java基于事件驱动的单线程异步框架,java单线程异步框架,Node.Js的基于事件
分享于 点击 16235 次 点评:64
java基于事件驱动的单线程异步框架,java单线程异步框架,Node.Js的基于事件
Node.js 的基于事件驱动的单线程异步编程模型让我印象深刻,其实任何一种语言都能实现这个模型。下面我就试着用 Java 语言来实现一个简单的事件驱动的单线程异步框架。设计思想很简单:只要设置一个存放任务的阻塞队列,让一个工作线程每次从该队列中取出一个任务并执行,执行完毕再取下一个任务。由于是单线程多任务,因此每个任务的粒度要充分小,否则一个大任务会阻塞其他任务的执行。解决的办法是将一个大任务切分成很多小任务,每个小任务都能快速执行(其实和 CPU 时间分片的原理一样)。由于在运行过程中会创建大量瞬时(instant)的任务对象,可以预见在内存空间不充裕的情况下 GC 会比较频繁(当然这和并发量也有关系,但即使只实现一个普通的计算需求,比如计算一个级数的前 100 万项,也会产生至少 100 万个计算任务对象)。代码中给出了三个例子:一个是利用级数求 PI 的近似值,一个是读一个文本文件并统计每行文本的长度和单词数,最后一个例子是将前两个例子合并起来,观察单线程异步的并发效果。
EventEmitter.java
package event;

/*
 * Author: Liu Yongjian
 * Date: 12-10-2, 10:53 PM
 */

/**
 * Something that lets listeners subscribe to named events and that can announce those events.
 */
public interface EventEmitter {

    /**
     * Registers a listener for the given event.
     *
     * @param eventName name of the event to listen for
     * @param handler   listener to register for that event
     */
    void on(String eventName, EventHandler handler);

    /**
     * Announces that the given event has occurred.
     *
     * @param eventName name of the event
     * @param args      optional arguments accompanying the event
     */
    void emit(String eventName, Object... args);

    /**
     * Removes every listener registered for the given event.
     *
     * @param eventName name of the event whose listeners are removed
     */
    void remove(String eventName);
}
EventHandler.java
package event;

/*
 * Author: Liu Yongjian
 * Date: 12-10-2, 9:30 PM
 */

/**
 * Callback that reacts to a single event.
 */
public interface EventHandler {

    /**
     * Handles the given event.
     *
     * @param event the event being delivered
     */
    void handle(EventObject event);
}
EventObject.java
package event;/** * User: 刘永健 * Date: 12-10-2 * Time: 下午9:32 * To change this template use File | Settings | File Templates. *//** * 表示一个事件 */public class EventObject { final private String eventName; // 事件名 final private Object source; // 事件源 final private Object args[]; // 可选参数 public EventObject(String eventName, Object source) { this(eventName, source, null); } public EventObject(String eventName, Object source, Object[] args) { this.eventName = eventName; this.source = source; this.args = args; } public String getEventName() { return eventName; } public Object getSource() { return source; } public Object[] getArgs() { return args; }}
Task.java
package event;/** * User: 刘永健 * Date: 12-10-2 * Time: 下午9:30 * To change this template use File | Settings | File Templates. *//** * 表示一个任务 */public interface Task { /** * 执行一个具体任务 */ public void execute();}
TaskController.java
package event;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午1:48 * To change this template use File | Settings | File Templates. *//** * 任务控制器 */public interface TaskController { /** * 启动任务管理器 */ public void start(); /** * 停止任务管理器。当调用这个方法后,任务管理器不再接收新提交的任务,但会继续执行已提交的任务 */ public void stop(); /** * 立即关闭任务管理,已提交且未开始执行的任务将会被丢弃 */ public void shutdown(); /** * 任务管理器是否停止 * @return */ public boolean isStop(); /** * 任务管理器 * @return */ public boolean isShutdown(); public boolean isRunning();}
TaskEventEmitter.java
package event;import java.io.IOException;import java.util.HashMap;import java.util.LinkedList;import java.util.List;import java.util.Map;/** * User: 刘永健 * Date: 12-10-2 * Time: 下午11:34 * To change this template use File | Settings | File Templates. */abstract public class TaskEventEmitter implements Task, EventEmitter { private Map<String, List<EventHandler>> eventHandlerMap = new HashMap<String, List<EventHandler>>(); final private TaskExecutor executor; public TaskEventEmitter(TaskExecutor executor) { this.executor = executor; addDefaultExceptionHandlers(); } protected void addDefaultExceptionHandlers() { EventHandler eh = new EventHandler() { @Override public void handle(EventObject event) { ((Exception) event.getSource()).printStackTrace(); } }; on(Exception.class.getName(), eh); on(IOException.class.getName(), eh); on(RuntimeException.class.getName(), eh); on(NullPointerException.class.getName(), eh); on(IndexOutOfBoundsException.class.getName(), eh); } @Override public void on(String eventName, EventHandler handler) { if (!eventHandlerMap.containsKey(eventName)) { List<EventHandler> eventHandlerList = new LinkedList<EventHandler>(); eventHandlerMap.put(eventName, eventHandlerList); } eventHandlerMap.get(eventName).add(handler); } @Override public void remove(String eventName) { eventHandlerMap.remove(eventName); } @Override public void emit(final String eventName, final Object... 
args) { if (eventHandlerMap.containsKey(eventName)) { List<EventHandler> eventHandlerList = eventHandlerMap.get(eventName); for (final EventHandler handler : eventHandlerList) { executor.submit(new Task() { @Override public void execute() { handler.handle(new EventObject(eventName, TaskEventEmitter.this, args)); } }); } } else { System.out.println("No event handler listen this event: " + eventName); if (args[0] instanceof Exception) { ((Exception) args[0]).printStackTrace(); } } } @Override public void execute() { try { run(); } catch (Exception e) { emit(e.getClass().getName(), e); } } abstract protected void run() throws Exception;}
TaskExecutor.java
package event;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/*
 * Author: Liu Yongjian
 * Date: 12-10-2, 9:37 PM
 */

/**
 * Executes tasks; an executor is itself a {@link Task}.
 */
public interface TaskExecutor extends Task {

    /**
     * Hands a task over to this executor.
     *
     * @param task the task to submit
     */
    void submit(Task task);
}
TaskManager.java
package event;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.LinkedBlockingQueue;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午1:47 * To change this template use File | Settings | File Templates. *//** * 表示一个任务管理器 */public class TaskManager implements TaskController { private TaskExecutor executor; // 任务执行器 private Thread executorThread; // 用于执行任务的工作线程 private int lenOfTaskQueue = Integer.MAX_VALUE; // 任务队列的长度,默认为无限大 public TaskManager() { } public TaskManager(int lenOfTaskQueue) { this.lenOfTaskQueue = lenOfTaskQueue; } public TaskManager(TaskExecutor executor) { this.executor = executor; } public TaskExecutor getExecutor() { if (executor == null){ init(); } return executor; } private class DefaultTaskExecutor implements TaskExecutor { final private BlockingQueue<Task> taskQueue; // 任务队列,所有待执行的任务都会被放入这个队列,等待执行 public DefaultTaskExecutor() { this(Integer.MAX_VALUE); } public DefaultTaskExecutor(int len) { taskQueue = new LinkedBlockingQueue<Task>(len); } @Override public void submit(Task task) { if (!isStop) { try { taskQueue.put(task); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } else { throw new IllegalStateException("This executor is not alive."); } } @Override public void execute() { isRunning = true; taskQueue.clear(); System.out.println("Start to process task ... "); while (!isShutdown && (!isStop || taskQueue.size() > 0)) { try { Task task = taskQueue.take();// System.out.println("Tak a task to execute , now the length of task queue is " + taskQueue.size()); task.execute(); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } System.out.println("Stop to process task ... 
"); isRunning = false; } } private boolean isStop = true; // 标识执行器是否接收新的任务 private boolean isShutdown = true; // 标识执行器是否立即结束 private boolean isRunning = false; // 标识执行器是否正在运行 public void start() { if (executor == null) { init(); } if (isStop && !isRunning) { isStop = false; isShutdown = false; executorThread = new Thread() { public void run() { executor.execute(); } }; executorThread.start(); } else { throw new IllegalStateException("This executor is still alive or running, please stop or shutdown immediately and try it again later"); } } /** * 初始化一个任务执行器 */ private void init() { executor = new DefaultTaskExecutor(lenOfTaskQueue); } public void stop() { isStop = true; executorThread.interrupt(); } public void shutdown() { isStop = true; isShutdown = true; executorThread.interrupt(); } @Override public boolean isStop() { return this.isStop; } @Override public boolean isShutdown() { return this.isShutdown; } @Override public boolean isRunning() { return this.isRunning; }}
IODemo.java
package event;import java.io.*;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午3:54 * To change this template use File | Settings | File Templates. */public class IODemo { public static void main(String[] args) throws InterruptedException { String fileName = "info.txt"; final TaskManager manager = new TaskManager(); manager.start(); TaskExecutor executor = manager.getExecutor(); Task ioTask = TaskHelper.createIOTask(executor, fileName); executor.submit(ioTask); Thread.sleep(5000L); manager.stop(); }}class IOTask extends TaskEventEmitter { final private String fileName; final private String encoding; public IOTask(TaskExecutor executor, String fileName, String encoding) { super(executor); this.fileName = fileName; this.encoding = encoding; } public String getFileName() { return fileName; } public String getEncoding() { return encoding; } @Override protected void run() throws Exception { InputStream fis = this.getClass().getResourceAsStream("/" + fileName); if (fis != null) { BufferedReader reader = new BufferedReader(new InputStreamReader(fis, encoding)); emit("open", getFileName()); emit("next", reader); }else{ throw new FileNotFoundException(fileName); } }}
PICalcDemo.java
package event;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午5:01 * To change this template use File | Settings | File Templates. * π/4 = 1 - 1/3 + 1/5 - 1/7 + 1/9 - 1/11 + … + (-1)^(n-1)/(2*n-1) */public class PICalcDemo { public static void main(String[] args) { final TaskManager manager = new TaskManager(); manager.start(); TaskExecutor executor = manager.getExecutor(); Task piTask = TaskHelper.createPiTask(executor, 10000); executor.submit(piTask); }}class PICalcTask extends TaskEventEmitter { private final int N; PICalcTask(TaskExecutor executor, int n) { super(executor); if (n < 1) throw new IllegalArgumentException("n must be larger than 0"); this.N = n; } public int getN() { return N; } @Override protected void run() throws Exception { emit("next", 1); }}
CompoundTaskDemo.java
package event;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午5:27 * To change this template use File | Settings | File Templates. */public class CompoundTaskDemo { public static void main(String[] args) { TaskManager manager = new TaskManager(); TaskExecutor executor = manager.getExecutor(); manager.start(); TaskEventEmitter ioTask = TaskHelper.createIOTask(executor, "info.txt"); TaskEventEmitter piTask = TaskHelper.createPiTask(executor, 100); final TaskEventEmitter guardTask = new GuardTask(manager, 2); EventHandler handler = new EventHandler() { @Override public void handle(EventObject event) { guardTask.emit("end"); } }; ioTask.on("close",handler); piTask.on("finish", handler); executor.submit(ioTask); executor.submit(piTask); executor.submit(guardTask); }}
TaskHelper.java
package event;import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.IOException;/** * User: 刘永健 * Date: 12-10-3 * Time: 下午5:28 * To change this template use File | Settings | File Templates. */public class TaskHelper { public static TaskEventEmitter createIOTask(TaskExecutor executor, String fileName){ final IOTask task = new IOTask(executor, fileName, "UTF-8"); task.on("open", new EventHandler() { @Override public void handle(EventObject event) { String fileName = (String) event.getArgs()[0]; System.out.println(Thread.currentThread() + " - " + fileName + " has been opened."); } }); task.on("next", new EventHandler() { @Override public void handle(EventObject event) { BufferedReader reader = (BufferedReader) event.getArgs()[0]; try { String line = reader.readLine(); if (line != null) { task.emit("ready", line); task.emit("next", reader); } else { task.emit("close", task.getFileName()); } } catch (IOException e) { task.emit(e.getClass().getName(), e, task.getFileName()); try { reader.close(); task.emit("close", task.getFileName()); } catch (IOException e1) { e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
} } } }); task.on("ready", new EventHandler() { @Override public void handle(EventObject event) { String line = (String) event.getArgs()[0]; int len = line.length(); int wordCount = line.split("[\\s+,.]+").length; System.out.println(Thread.currentThread()+" - word count: "+wordCount+" length: "+len); } }); task.on(IOException.class.getName(), new EventHandler() { @Override public void handle(EventObject event) { Object[] args = event.getArgs(); IOException e = (IOException) args[0]; String fileName = (String) args[1]; System.out.println(Thread.currentThread()+ " - An IOException occurred while reading " + fileName + ", error: " + e.getMessage()); } }); task.on("close", new EventHandler() { @Override public void handle(EventObject event) { String fileName = (String) event.getArgs()[0]; System.out.println(Thread.currentThread() + " - " + fileName + " has been closed."); } }); task.on(FileNotFoundException.class.getName(), new EventHandler() { @Override public void handle(EventObject event) { FileNotFoundException e = (FileNotFoundException) event.getArgs()[0]; e.printStackTrace(); System.exit(1); } }); return task; } public static TaskEventEmitter createPiTask(TaskExecutor executor, int n) { final PICalcTask task = new PICalcTask(executor, n); // 计算下一个级数项 task.on("next", new EventHandler() { @Override public void handle(EventObject event) { int n = ((Integer) event.getArgs()[0]).intValue(); double xn = Math.pow(-1, n - 1) / (2 * n - 1); task.emit("sum", xn); } }); // 将每一个级数项加起来 task.on("sum", new EventHandler() { private int i = 0; private double sum = 0; @Override public void handle(EventObject event) { double xn = ((Double) event.getArgs()[0]).doubleValue(); sum += xn; i++; System.out.println(Thread.currentThread()+" - sum = "+ sum); if (i >= task.getN()) { task.emit("finish", sum * 4); } else { task.emit("next", i + 1); } } }); // 完成PI的近似计算 task.on("finish", new EventHandler() { @Override public void handle(EventObject event) { Double sum = (Double) 
event.getArgs()[0]; System.out.println("pi=" + sum); } }); return task; }}
GuardTask.java
package event;

/*
 * Author: Liu Yongjian
 * Date: 12-10-3, 6:35 PM
 */

/**
 * Guard task.
 * <p></p>
 * Counts "end" events and asks the task manager to stop once the configured number of them
 * has been received.
 */
public class GuardTask extends TaskEventEmitter {

    private final int expectedEnds; // number of "end" events that triggers the stop

    /**
     * @param manager manager to stop; its executor runs this task's handlers
     * @param n       number of "end" events to wait for
     */
    public GuardTask(final TaskManager manager, int n) {
        super(manager.getExecutor());
        this.expectedEnds = n;
        on("end", new EndCounter(manager));
    }

    /**
     * Handler that counts its invocations and calls stop on the manager whenever the count
     * has reached the expected number.
     */
    private class EndCounter implements EventHandler {

        private final TaskManager target;
        private int seen = 0;

        EndCounter(TaskManager target) {
            this.target = target;
        }

        @Override
        public void handle(EventObject event) {
            seen += 1;
            if (seen < expectedEnds) {
                return;
            }
            target.stop();
        }
    }

    /**
     * The guard does no work of its own; it only reacts to "end" events.
     */
    @Override
    protected void run() throws Exception {
    }
}
用户点评