欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

java基于事件驱动的单线程异步框架,java单线程异步框架,Node.Js的基于事件

来源: javaer 分享于  点击 16235 次 点评:64

java基于事件驱动的单线程异步框架,java单线程异步框架,Node.Js的基于事件


Node.js 的基于事件驱动的单线程异步编程模型让我印象深刻,其实任何一种语言都能实现这个模型。下面我就试着用 Java 语言来实现一个简单的事件驱动的单线程异步框架。设计思想很简单:只要设置一个存放任务的阻塞队列,让一个工作线程每次从该队列中取出一个任务并执行,执行完毕再取下一个任务。由于是单线程多任务,因此每个任务的粒度要足够小,否则一个大任务会阻塞其他任务的执行。解决的办法是将一个大任务切分成很多小任务,每个小任务都能快速执行(其实和 CPU 时间分片的原理一样)。由于在运行过程中会创建大量瞬时(instant)的任务对象,可以预见在内存空间不充裕的情况下 GC 会比较频繁(当然这和并发量也有关系,但即使只实现一个普通的计算需求,比如计算一个级数的前 100 万项,也会产生至少 100 万个计算任务对象)。代码中给出了三个例子:一个是利用级数求 PI 的近似值;一个是读取一个文本文件并统计每行文本的长度和单词数;最后一个例子是将前两个例子合并起来,观察单线程异步的并发效果。

EventEmitter.java

package event;

/*
 * User: 刘永健
 * Date: 12-10-2
 * Time: 10:53 PM
 */

/**
 * An EventEmitter can register listeners for a named event and can send out
 * a notification that such an event occurred.
 */
public interface EventEmitter {

    /**
     * Registers a listener for an event.
     *
     * @param eventName name of the event
     * @param handler   listener to attach to the event
     */
    void on(String eventName, EventHandler handler);

    /**
     * Sends out a notification of an event.
     *
     * @param eventName name of the event
     * @param args      values accompanying the notification
     */
    void emit(String eventName, Object... args);

    /**
     * Removes every listener of the given event.
     *
     * @param eventName name of the event
     */
    void remove(String eventName);
}

EventHandler.java

package event;

/*
 * User: 刘永健
 * Date: 12-10-2
 * Time: 9:30 PM
 */

/**
 * Listener callback: receives the {@link EventObject} describing an event.
 */
public interface EventHandler {

    /**
     * Handles one event.
     *
     * @param event the event being delivered
     */
    void handle(EventObject event);
}

EventObject.java

package event;/** * User: 刘永健 * Date: 12-10-2 * Time: 下午9:32 * To change this template use File | Settings | File Templates. *//** * 表示一个事件 */public class EventObject {    final private String eventName; // 事件名    final private Object source;  // 事件源    final private Object args[];  // 可选参数    public EventObject(String eventName, Object source) {        this(eventName, source, null);    }    public EventObject(String eventName, Object source, Object[] args) {        this.eventName = eventName;        this.source = source;        this.args = args;    }    public String getEventName() {        return eventName;    }    public Object getSource() {        return source;    }    public Object[] getArgs() {        return args;    }}

Task.java

package event;/** * User: 刘永健 * Date: 12-10-2 * Time: 下午9:30 * To change this template use File | Settings | File Templates. *//** * 表示一个任务 */public interface Task {    /**     * 执行一个具体任务     */    public void execute();}

TaskController.java

package event;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午1:48 * To change this template use File | Settings | File Templates. *//** * 任务控制器 */public interface TaskController {    /**     * 启动任务管理器     */    public void start();    /**     * 停止任务管理器。当调用这个方法后,任务管理器不再接收新提交的任务,但会继续执行已提交的任务     */    public void stop();    /**     * 立即关闭任务管理,已提交且未开始执行的任务将会被丢弃     */    public void shutdown();    /**     * 任务管理器是否停止     * @return     */    public boolean isStop();    /**     * 任务管理器     * @return     */    public boolean isShutdown();    public boolean isRunning();}

TaskEventEmitter.java

package event;import java.io.IOException;import java.util.HashMap;import java.util.LinkedList;import java.util.List;import java.util.Map;/** * User: 刘永健 * Date: 12-10-2 * Time: 下午11:34 * To change this template use File | Settings | File Templates. */abstract public class TaskEventEmitter implements Task, EventEmitter {    private Map<String, List<EventHandler>> eventHandlerMap = new HashMap<String, List<EventHandler>>();    final private TaskExecutor executor;    public TaskEventEmitter(TaskExecutor executor) {        this.executor = executor;        addDefaultExceptionHandlers();    }    protected void addDefaultExceptionHandlers() {        EventHandler eh = new EventHandler() {            @Override            public void handle(EventObject event) {                ((Exception) event.getSource()).printStackTrace();            }        };        on(Exception.class.getName(), eh);        on(IOException.class.getName(), eh);        on(RuntimeException.class.getName(), eh);        on(NullPointerException.class.getName(), eh);        on(IndexOutOfBoundsException.class.getName(), eh);    }    @Override    public void on(String eventName, EventHandler handler) {        if (!eventHandlerMap.containsKey(eventName)) {            List<EventHandler> eventHandlerList = new LinkedList<EventHandler>();            eventHandlerMap.put(eventName, eventHandlerList);        }        eventHandlerMap.get(eventName).add(handler);    }    @Override    public void remove(String eventName) {        eventHandlerMap.remove(eventName);    }    @Override    public void emit(final String eventName, final Object... 
args) {        if (eventHandlerMap.containsKey(eventName)) {            List<EventHandler> eventHandlerList = eventHandlerMap.get(eventName);            for (final EventHandler handler : eventHandlerList) {                executor.submit(new Task() {                    @Override                    public void execute() {                        handler.handle(new EventObject(eventName, TaskEventEmitter.this, args));                    }                });            }        } else {            System.out.println("No event handler listen this event: " + eventName);            if (args[0] instanceof Exception) {                ((Exception) args[0]).printStackTrace();            }        }    }    @Override    public void execute() {        try {            run();        } catch (Exception e) {            emit(e.getClass().getName(), e);        }    }    abstract protected void run() throws Exception;}

TaskExecutor.java

package event;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/*
 * User: 刘永健
 * Date: 12-10-2
 * Time: 9:37 PM
 */

/**
 * Task executor: a {@link Task} to which other tasks can be submitted.
 */
public interface TaskExecutor extends Task {

    /**
     * Submits a task.
     *
     * @param task the task to submit
     */
    void submit(Task task);
}

TaskManager.java

package event;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.LinkedBlockingQueue;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午1:47 * To change this template use File | Settings | File Templates. *//** * 表示一个任务管理器 */public class TaskManager implements TaskController {    private TaskExecutor executor;  // 任务执行器    private Thread executorThread;  // 用于执行任务的工作线程    private int lenOfTaskQueue = Integer.MAX_VALUE;   // 任务队列的长度,默认为无限大    public TaskManager() {    }    public TaskManager(int lenOfTaskQueue) {        this.lenOfTaskQueue = lenOfTaskQueue;    }    public TaskManager(TaskExecutor executor) {        this.executor = executor;    }    public TaskExecutor getExecutor() {        if (executor == null){            init();        }        return executor;    }    private class DefaultTaskExecutor implements TaskExecutor {        final private BlockingQueue<Task> taskQueue; // 任务队列,所有待执行的任务都会被放入这个队列,等待执行        public DefaultTaskExecutor() {            this(Integer.MAX_VALUE);        }        public DefaultTaskExecutor(int len) {            taskQueue = new LinkedBlockingQueue<Task>(len);        }        @Override        public void submit(Task task) {            if (!isStop) {                try {                    taskQueue.put(task);                } catch (InterruptedException e) {                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.                }            } else {                throw new IllegalStateException("This executor is not alive.");            }        }        @Override        public void execute() {            isRunning = true;            taskQueue.clear();            System.out.println("Start to process task ... 
");            while (!isShutdown && (!isStop || taskQueue.size() > 0)) {                try {                    Task task = taskQueue.take();//                    System.out.println("Tak a task to execute , now the length of  task queue is " + taskQueue.size());                    task.execute();                } catch (InterruptedException e) {                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.                }            }            System.out.println("Stop to process task ... ");            isRunning = false;        }    }    private boolean isStop = true;  // 标识执行器是否接收新的任务    private boolean isShutdown = true; // 标识执行器是否立即结束    private boolean isRunning = false;  // 标识执行器是否正在运行    public void start() {        if (executor == null) {            init();        }        if (isStop && !isRunning) {            isStop = false;            isShutdown = false;            executorThread = new Thread() {                public void run() {                    executor.execute();                }            };            executorThread.start();        } else {            throw new IllegalStateException("This executor is still alive or running, please stop or shutdown immediately and try it again later");        }    }    /**     * 初始化一个任务执行器     */    private void init() {        executor = new DefaultTaskExecutor(lenOfTaskQueue);    }    public void stop() {        isStop = true;        executorThread.interrupt();    }    public void shutdown() {        isStop = true;        isShutdown = true;        executorThread.interrupt();    }    @Override    public boolean isStop() {        return this.isStop;    }    @Override    public boolean isShutdown() {        return this.isShutdown;    }    @Override    public boolean isRunning() {        return this.isRunning;    }}

IODemo.java

package event;import java.io.*;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午3:54 * To change this template use File | Settings | File Templates. */public class IODemo {    public static void main(String[] args) throws InterruptedException {        String fileName = "info.txt";        final TaskManager manager = new TaskManager();        manager.start();        TaskExecutor executor = manager.getExecutor();        Task ioTask = TaskHelper.createIOTask(executor, fileName);        executor.submit(ioTask);        Thread.sleep(5000L);        manager.stop();    }}class IOTask extends TaskEventEmitter {    final private String fileName;    final private String encoding;    public IOTask(TaskExecutor executor, String fileName, String encoding) {        super(executor);        this.fileName = fileName;        this.encoding = encoding;    }    public String getFileName() {        return fileName;    }    public String getEncoding() {        return encoding;    }    @Override    protected void run() throws Exception {        InputStream fis = this.getClass().getResourceAsStream("/" + fileName);        if (fis != null) {            BufferedReader reader = new BufferedReader(new InputStreamReader(fis, encoding));            emit("open", getFileName());            emit("next", reader);        }else{            throw new FileNotFoundException(fileName);        }    }}

PICalcDemo.java

package event;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午5:01 * To change this template use File | Settings | File Templates. * π/4 = 1 - 1/3 + 1/5 - 1/7 + 1/9 - 1/11 + … + (-1)^(n-1)/(2*n-1) */public class PICalcDemo {    public static void main(String[] args) {        final TaskManager manager = new TaskManager();        manager.start();        TaskExecutor executor = manager.getExecutor();         Task piTask = TaskHelper.createPiTask(executor, 10000);        executor.submit(piTask);    }}class PICalcTask extends TaskEventEmitter {    private final int N;    PICalcTask(TaskExecutor executor, int n) {        super(executor);        if (n < 1) throw new IllegalArgumentException("n must be larger than 0");        this.N = n;    }    public int getN() {        return N;    }    @Override    protected void run() throws Exception {        emit("next", 1);    }}

CompoundTaskDemo.java

package event;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午5:27 * To change this template use File | Settings | File Templates. */public class CompoundTaskDemo {    public static void main(String[] args) {        TaskManager manager = new TaskManager();        TaskExecutor executor = manager.getExecutor();        manager.start();        TaskEventEmitter ioTask = TaskHelper.createIOTask(executor, "info.txt");        TaskEventEmitter piTask = TaskHelper.createPiTask(executor, 100);        final TaskEventEmitter guardTask = new GuardTask(manager, 2);        EventHandler handler = new EventHandler() {            @Override            public void handle(EventObject event) {                guardTask.emit("end");            }        };        ioTask.on("close",handler);        piTask.on("finish", handler);        executor.submit(ioTask);        executor.submit(piTask);        executor.submit(guardTask);    }}

TaskHelper.java

package event;import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.IOException;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午5:28 * To change this template use File | Settings | File Templates. */public class TaskHelper {    public static TaskEventEmitter createIOTask(TaskExecutor executor, String fileName){        final IOTask task = new IOTask(executor, fileName, "UTF-8");        task.on("open", new EventHandler() {            @Override            public void handle(EventObject event) {                String fileName = (String) event.getArgs()[0];                System.out.println(Thread.currentThread() + " - " + fileName + " has been opened.");            }        });        task.on("next", new EventHandler() {            @Override            public void handle(EventObject event) {                BufferedReader reader = (BufferedReader) event.getArgs()[0];                try {                    String line = reader.readLine();                    if (line != null) {                        task.emit("ready", line);                        task.emit("next", reader);                    } else {                        task.emit("close", task.getFileName());                    }                } catch (IOException e) {                    task.emit(e.getClass().getName(), e, task.getFileName());                    try {                        reader.close();                        task.emit("close", task.getFileName());                    } catch (IOException e1) {                        e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.                    
}                }            }        });        task.on("ready", new EventHandler() {            @Override            public void handle(EventObject event) {                String line = (String) event.getArgs()[0];                int len = line.length();                int wordCount = line.split("[\\s+,.]+").length;                System.out.println(Thread.currentThread()+" - word count: "+wordCount+" length: "+len);            }        });        task.on(IOException.class.getName(), new EventHandler() {            @Override            public void handle(EventObject event) {                Object[] args = event.getArgs();                IOException e = (IOException) args[0];                String fileName = (String) args[1];                System.out.println(Thread.currentThread()+ " - An IOException occurred while reading " + fileName + ", error: " + e.getMessage());            }        });        task.on("close", new EventHandler() {            @Override            public void handle(EventObject event) {                String fileName = (String) event.getArgs()[0];                System.out.println(Thread.currentThread() + " - " + fileName + " has been closed.");            }        });        task.on(FileNotFoundException.class.getName(), new EventHandler() {            @Override            public void handle(EventObject event) {                FileNotFoundException e = (FileNotFoundException) event.getArgs()[0];                e.printStackTrace();                System.exit(1);            }        });        return task;    }    public static TaskEventEmitter createPiTask(TaskExecutor executor, int n) {        final PICalcTask task = new PICalcTask(executor, n);        // 计算下一个级数项        task.on("next", new EventHandler() {            @Override            public void handle(EventObject event) {                int n = ((Integer) event.getArgs()[0]).intValue();                double xn = Math.pow(-1, n - 1) / (2 * n - 1);                task.emit("sum", xn);   
         }        });        // 将每一个级数项加起来        task.on("sum", new EventHandler() {            private int i = 0;            private double sum = 0;            @Override            public void handle(EventObject event) {                double xn = ((Double) event.getArgs()[0]).doubleValue();                sum += xn;                i++;                System.out.println(Thread.currentThread()+" - sum = "+ sum);                if (i >= task.getN()) {                    task.emit("finish", sum * 4);                } else {                    task.emit("next", i + 1);                }            }        });        // 完成PI的近似计算        task.on("finish", new EventHandler() {            @Override            public void handle(EventObject event) {                Double sum = (Double) event.getArgs()[0];                System.out.println("pi=" + sum);            }        });        return task;    }}

GuardTask.java

package event;

/*
 * User: 刘永健
 * Date: 12-10-3
 * Time: 6:35 PM
 */

/**
 * Guard task.
 *
 * <p>Once all concrete tasks have finished, it tells the task manager to
 * stop. Concretely: it counts {@code "end"} events and calls
 * {@link TaskManager#stop()} for every {@code "end"} event from the n-th one
 * onwards.
 */
public class GuardTask extends TaskEventEmitter {
    private final int required;

    /**
     * @param manager manager to stop; its executor dispatches the handlers
     * @param n       number of {@code "end"} events to wait for
     */
    public GuardTask(final TaskManager manager, int n) {
        super(manager.getExecutor());
        this.required = n;

        EventHandler countdown = new EventHandler() {
            private int arrivals = 0;

            @Override
            public void handle(EventObject event) {
                arrivals += 1;
                if (arrivals < required) {
                    return;
                }
                manager.stop();
            }
        };
        on("end", countdown);
    }

    /** The guard has no work of its own; it only reacts to events. */
    @Override
    protected void run() throws Exception {
    }
}
相关栏目:

用户点评