欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Java nginx 监控服务程序调度算法实现

来源: javaer 分享于  点击 33306 次 点评:98

Java nginx 监控服务程序调度算法实现


package com.wole.monitor;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Set;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.PriorityBlockingQueue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicLong;import org.eclipse.jetty.util.ConcurrentHashSet;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.wole.monitor.dao.ServiceDao;import com.wole.servicemonitor.util.ServiceUtils;/** * 管理并调度某一个服务数据源的监控池 * @author yzygenuine * */public class MonitorsManage {    private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);    private ServiceDao dao;    /**     * 执行的一个并发池     */    private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,            new SynchronousQueue<Runnable>());    /**     *      */    private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);    /**     * 正在执行中的MonitorService集合     */    private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();    /**     * 等待优先级队列     */    private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();    /**     * 执行队列     */    private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();    /**     * 是否关闭     */    private AtomicBoolean isClose = 
new AtomicBoolean(false);    /**     * 生产者启动时间     */    private AtomicLong startTime = new AtomicLong(0);    /**     * 相对于启动的间隔时间     */    private AtomicLong intervalTime = new AtomicLong(0);    public void close() {        logger.info("closing................");        isClose.compareAndSet(false, true);    }    public void init() {        logger.info("初始化");    }    public void work() {        logger.info("开始工作");        // 生产者启动工作        Thread productThread = new Thread(new ProductMonitor(1000));        // 消费者启动工作        Thread consumerThread = new Thread(new ConsumerMonitor(1000));        // 回收者启动工作        Thread recoverThread = new Thread(new RecoverMonitor(1000));        // 启动定时加载数据工作        Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));        productThread.start();        consumerThread.start();        recoverThread.start();        refreshThread.start();    }    /**     * 生产者     *      * @author yzygenuine     *      */    class ProductMonitor implements Runnable {        long sleepTime = 1000;        public ProductMonitor(long sleepTime) {            this.sleepTime = sleepTime;        }        @Override        public void run() {            logger.info("生产者开启工作");            // 开始进行定时监控            long now = System.currentTimeMillis();            long lastTime = now;            startTime.addAndGet(now);            try {                do {                    Thread.sleep(sleepTime);                    logger.debug("生产者休息{}ms", sleepTime);                    now = System.currentTimeMillis();                    intervalTime.addAndGet(now - lastTime);                    while (sleepQueue.size() > 0) {                        MonitorService service = sleepQueue.peek();                        if (service.getCurrentTime() - intervalTime.get() < 1) {                            service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列                            if (!currentSet.contains(service)) {                                
logger.info("service {} 已被删除,不加入执行队列了", service.toString());                                continue;                            }                            executeQueue.add(service);                        } else {                            logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());                            break;                        }                    }                    if (sleepQueue.size() <= 0) {                        logger.debug("生产队列为空");                    }                    lastTime = now;                } while (!isClose.get());            } catch (Exception e) {                logger.error("", e);            }        }    }    /**     * 消费者     *      * @author yzygenuine     *      */    class ConsumerMonitor implements Runnable {        long sleepTime = 1000;        public ConsumerMonitor(long sleepTime) {            this.sleepTime = sleepTime;            if (sleepTime < 1000) {                throw new RuntimeException("请配置sleepTime值大一些");            }        }        @Override        public void run() {            logger.info("消费者开启工作");            try {                do {                    Thread.sleep(sleepTime);                    logger.debug("消费者休息{}ms", sleepTime);                    while (executeQueue.size() > 0) {                        final MonitorService service = executeQueue.poll();                        completionService.submit(new ExecuteCallable(service));                    }                    logger.debug("消费队列为空");                } while (!isClose.get());            } catch (Exception e) {                logger.error("", e);            }        }    }    /**     * 执行回调类     *      * @author yzygenuine     *      */    class ExecuteCallable implements Callable<Response> {        final MonitorService service;        public ExecuteCallable(MonitorService service) {            this.service = service;        }        @Override        public Response call() throws Exception {            
logger.debug("执行");            Map<String, String> r = new HashMap<String, String>();            Response response = new Response();            response.service = service;            response.response = r;            Monitor m = MonitorFactory.getMonitor(service);            response.isNeedWarn = m.isNeedWarnging(service, r);            if (response.isNeedWarn) {                response.isSucToNotify = m.sendNotify(service, r);            }            return response;        }    }    /**     * 回收者     *      * @author yzygenuine     *      */    class RecoverMonitor implements Runnable {        private long sleepTime = 1000;        private long count = 0;        public RecoverMonitor(long sleepTime) {            this.sleepTime = sleepTime;            if (sleepTime < 1000) {                throw new RuntimeException("请配置sleepTime值大一些");            }        }        @Override        public void run() {            logger.info("回收者开启工作");            try {                do {                    // Thread.sleep(sleepTime);                    Future<Response> response = completionService.take();                    // 重置后进入休眠队列                    MonitorService s = response.get().service;                    if (!currentSet.contains(s)) {                        logger.info("service {} 已被删除,不回收了", s.toString());                        continue;                    }                    // 当前程序已运动的时间+相对间隔时间=绝对的间隔时间                    s.setCurrentTime(s.getIntervalTime() + intervalTime.get());                    sleepQueue.add(s);                    count++;                    logger.info("回收,当前回收数量:" + count);                } while (!isClose.get());            } catch (Exception e) {                logger.error("", e);            }        }    }    /**     * 加载新的数据     *      * @author yzygenuine     *      */    class RefreshMonitorService implements Runnable {        private long sleepTime = 1000;        private ServiceDao dao;        public RefreshMonitorService(long 
sleepTime, ServiceDao dao) {            this.sleepTime = sleepTime;            if (sleepTime < 60000) {                logger.warn("刷新加载数据的间隔时间不能太短");                throw new RuntimeException("刷新加载数据的间隔时间不能太短");            }            this.dao = dao;        }        private void firstLoad() {            List<MonitorService> monitorService = dao.getService();            logger.info("加载记录:" + monitorService.size());            // 将被监控服务加入优先级队列里            for (int j = 0; j < monitorService.size(); j++) {                MonitorService service = monitorService.get(j);                // 初始化好时间                service.setCurrentTime(service.getIntervalTime() + intervalTime.get());                currentSet.add(service);                sleepQueue.add(service);            }        }        @Override        public void run() {            logger.info("读取新的service开启工作");            firstLoad();            try {                do {                    logger.info("定时加载新的数据监听者休息{}ms", sleepTime);                    Thread.sleep(sleepTime);                    logger.info("##########开始执行更新数据############");                    // 加载新的所有所数据 ,与当前的数据比较                    List<MonitorService> deleteList = dao.deleteService();                    List<MonitorService> addList = dao.incrementalService();                    logger.info("删除旧的数据共:{}", deleteList.size());                    currentSet.removeAll(deleteList);                    logger.info("增加新的数据共:{}", addList.size());                    currentSet.addAll(addList);                    logger.info("更新后的currentSet size:{}", currentSet.size());                    for (MonitorService service : addList) {                        // 初始化绝对间隔时间                        service.setCurrentTime(service.getIntervalTime() + intervalTime.get());                        sleepQueue.add(service);                    }                    logger.info("########这一轮更新结束");                } while (!isClose.get());            } catch (Exception e) {         
       logger.error("", e);            }        }    }    /**     * 响应的封装类     *      * @author yzygenuine     *      */    class Response {        public Map<String, String> response;        public MonitorService service;        public boolean isNeedWarn;        public boolean isSucToNotify;    }    public void setDao(ServiceDao dao) {        this.dao = dao;    }}
相关栏目:

用户点评