Java并发的四种口味:Thread、Executor、ForkJoin、Actor,executorforkjoin
Java并发的四种口味:Thread、Executor、ForkJoin、Actor,executorforkjoin
这篇文章讨论了Java(Scala)应用中的并行处理的多种方法。从简单的自己管理Java线程,到各种提供的已实现的解决方案:Executor(Java)、ForkJoin(Java 7)、Actor(Akka)。
在这篇文章中,将对同一个任务,用不同的代码实现并发的解决方案;然后,讨论这些方案有哪些优缺点,并且告诉你将会产生什么样的陷阱。
我们将介绍下面几种并发处理和异步代码:
- 裸线程
- Executors & Services
- ForkJoin 并行框架 & 并行流
- Actor 模型
任务
任务:实现一个方法-接受一条消息和一组字符串作为参数;字符串与搜索引擎的查询页面对应。对于每个字符串,这个方法都发出一个HTTP请求来查询消息,并返回第一条可用结构。
方法一:“原滋原味”裸线程
线程是并发的最基本单元。Java线程本质上是操作系统的线程,每个线程对象对应一个计算机底层线程。
每个线程有自己的栈空间。
线程的接口相当简明:只需要提供一个Runnable对象,调用.start()。没有已准备好的API来结束线程-需要自己实现;通过类似Boolean类型的标记来通讯。
private static String getFirstResult(String question, List<String> engines) {
AtomicReference<String> result = new AtomicReference<>();
for(String base: engines) {
String url = base + question;
new Thread(() -> {
result.compareAndSet(null, WS.url(url).get());
}).start();
}
while(result.get() == null); // wait for some result to appear
return result.get();
}
上面的例子中,对每个被查询的搜索引擎创建一个线程。查询结果返回到AtomicReference;不需要锁或者其他机制来保证只出现一次write操作。
优点:很接近并发计算的操作系统/硬件模型;结构简单。多个线程并行,通过共享内存通讯。
缺点:需要关注线程的数量。线程是很昂贵的对象-需要大量的内存和时间。这是一个矛盾-线程太少,不能获得良好的并发性;线程太多,开销太大,调度复杂。
方法二:“认真对待”Executor & ExecutorService
使用Java提供的API:Executor接口。
Executor接口定义非常简单:
public interface Executor {
void execute(Runnable command);
Executor隐藏了处理Runnable的细节;我们仅需要提供一个Runnable,它会处理。
Executors 类提供了一组方法,能够创建拥有完善配置的线程池和executor。
我们将使用newFixedThreadPool(),它创建预定义数量的线程,并不允许线程数量超过这个预定义值。这意味着,如果所有的线程都被使用的话,提交的命令将会被放到一个队列中等待;当然这是由executor来管理的。在它的上层,有ExecutorService管理executor的生命周期,以及CompletionService会抽象掉更多细节,作为已完成任务的队列。得益于此,我们不必担心只会得到第一个结果。
private static String getFirstResultExecutors(String question, List<String> engines) {
ExecutorCompletionService<String> service = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(4));
for(String base: engines) {
String url = base + question;
service.submit(() -> {
return WS.url(url).get();
});
}
try {
return service.take().get();
}
catch(InterruptedException | ExecutionException e) {
return null;
}
}
适用场景:如果你需要精确的控制程序产生的线程数量,以及它们的精确行为,那么Executor和ExecutorService将是正确的选择。例如:线程池满时:增加线程池数量?不做数量限制?把任务放入队列?队列满时:无限队列?
线程和服务的生命周期也可以通过选项来配置,使资源可以在恰当的时间关闭。
Executor API都非常直观的给出了解决方法-配置各种实现好的类。
对于大型、复杂系统,个人认为使用Executor最合适。
方法 三:并行流,ForkJoinPool(FJP)
Java 8 中加入了并行流(Stream),从此我们有了一个并行处理集合的简单方法。它和lambda一起,构成了并发计算的一个强大工具。
如果你打算运用这种方法,那么有几点需要注意:首先,你必须掌握一些函数编程的概念,它实际上更有优势。其次,你很难知道并行流实际上是否使用了超过一个线程,这要由流的具体实现来决定。如果你无法控制流的数据源,你就无法确定它做了什么。
另外,你需要记住,默认情况下是通过ForkJoinPool.commonPool() 实现并行的。这个通用池由JVM来管理,并且被JVM进程内的所有线程共享。这简化了配置项,因此你不用担心。
private static String getFirstResult(String question, List<String> engines) {
// get element as soon as it is available
Optional<String> result = engines.stream().parallel().map((base) -> {
String url = base + question;
return WS.url(url).get();
}).findAny();
return result.get();
}
上面例子中,我们不关心单独的任务是在哪里完成的、由谁完成的。这也意味着,可能存在一些停滞的任务,我们却不知道。另一篇文章Java Parallel Streams Are Bad for Your Health 中,描述了 这个问题;并且有一个变通的解决方案,虽然并不直观。
ForkJoin是一个很好的并行框架-由一些聪明人编写和预先配置。
适用场景:当需要一个包含并行的小型程序时,是第一选择。(简单;已预先配置好。)
缺点:必须预先想到可能的并发问题;这很难,需要经验。
备注:Scala中,集合都用一个par() -等于Java 8 的parallel() 方法;返回一个并行计算的集合。Scala的建议是,对于可自由的结合的,是可被并行化的。如:reduceLeft()、foldLeft()不可;reduce()、fold()可并行化。
方法 四:雇佣 Actor
JDK中并没有Actor,这种方法其实是引入第三方类库,如:Akka-Scala中Actor的默认类库。
在Actor模型中,一切都是Actor。一个Actor是一个计算实体;它可以从其他Actor那里接受消息-接受消息;可以发送消息给其他Actor、或创建新Actor、或改变自己内部状态-应答消息。
这是一个非常强大的概念。生命周期和消息传递由你的框架来管理,你只需要指定计算单元是什么就可以了。另外,Actor模型强调避免全局状态,这会带来很多便利。你可以应用监督策略,例如免费重试,更简单的分布式系统设计,错误容忍度等等。
Akka对于Java和Scala都有很好的API和文档,是最流行的JVM Actor库。
我们开始一个Akka Actor的例子。
static class Message {
String url;
Message(String url) {this.url = url;}
}
static class Result {
String html;
Result(String html) {this.html = html;}
}
static class UrlFetcher extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
Message work = (Message) message;
String result = WS.url(work.url).get();
getSender().tell(new Result(result), getSelf());
} else {
unhandled(message);
}
}
}
static class Querier extends UntypedActor {
private String question;
private List<String> engines;
private AtomicReference<String> result;
public Querier(String question, List<String> engines, AtomicReference<String> result) {
this.question = question;
this.engines = engines;
this.result = result;
}
@Override public void onReceive(Object message) throws Exception {
if(message instanceof Result) {
result.compareAndSet(null, ((Result) message).html);
getContext().stop(self());
}
else {
for(String base: engines) {
String url = base + question;
ActorRef fetcher = this.getContext().actorOf(Props.create(UrlFetcher.class), "fetcher-"+base.hashCode());
Message m = new Message(url);
fetcher.tell(m, self());
}
}
}
}
private static String getFirstResultActors(String question, List<String> engines) {
ActorSystem system = ActorSystem.create("Search");
AtomicReference<String> result = new AtomicReference<>();
final ActorRef q = system.actorOf(
Props.create((UntypedActorFactory) () -> new Querier(question, engines, result)), "master");
q.tell(new Object(), ActorRef.noSender());
while(result.get() == null);
return result.get();
}
Akka actor在内部使用ForkJoin框架来处理工作。
缺点:Actor的设计哲学是避免全局状态;消息通讯。因此设计程序时要时刻小心,遵循Actor思想-这就很不灵活;项目迁移也很复杂,必须注意。
总结
四种Java并发编程方式:原始的Java线程、Executor、ForkJoin、Actor。
并发编程的几条心得:
不可变的一定是线程安全的;线程不安全的原因是状态可变。
并发编程不是指一个方法、一个类、一个框架;不能说使用java.concrrent包中的类写的代码就是线程安全的,不能说使用了上面四种方式编码就是线程安全的;并发编程是一种思想、一种处理并发的策略,不使用线程安全的类也可设计出线程安全的实现。
原文链接: Oleg Shelajev 翻译: ImportNew.com - shenggordon
译文链接: http://www.importnew.com/14506.html
相关文章
- 暂无相关文章
用户点评