Java线程池,java线程
Java线程池,java线程
前言
Java语言支持多线程编程。每个线程的创建和销毁都需要一定的性能开销。因此,实际使用多线程时多采用线程池。使用线程池有一些优点:可以重用线程,减少线程创建和销毁带来的性能开销;可以控制程序的并发数,避免系统资源耗尽;可以对线程进行管理,比如定时和定期执行等。
基础知识
Java 5新增了一个 java.util.concurrent 包,这个并发工具包提供了一个Executor框架,引入了线程池机制。
java.util.concurrent包中定义了三个执行器接口:
- Executor接口。一个支持执行新任务的简单接口。
- ExecutorService接口。一个继承自Executor的接口,增加了管理任务和执行器的生命周期的功能。
- ScheduledExecutorService接口。一个继承自ExecutorService的接口,增加了定时和定期执行任务的功能。
通常,引用执行器对象的变量被声明为这三种接口类型之一,而不是执行器类类型。
Executor接口
Executor接口的定义如下所示:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
Executor接口提供了一个execute()方法,该方法用来执行提交的Runnable任务。该接口提供了一种将任务的提交与每个任务如何执行相分离的方法。通常,使用Executor而再不是显式地创建线程。例如,如果runnable是一个Runnable对象,executor是一个Executor对象,那么你可以将下面的代码:
(new Thread(runnable)).start();
替换为:
executor.execute(runnable);
根据Executor接口的具体实现,execute()方法可以在当前线程中直接运行runnable,或者创建一个新线程来运行runnable,或者使用一个现有的线程来运行runnable,或者将runnable放在一个队列中等待一个线程可用。
ExecutorService接口
ExecutorService接口的定义如下所示:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService接口增加了一个与execute()方法类似,但更通用的submit()方法。与execute()方法一样,submit()方法可以接受一个Runnable对象,但也可以接受一个Callable对象,该对象允许任务返回一个值。submit()方法返回一个Future对象,该对象可以用来获取Callable任务的返回值,也可以用来管理Callable或者Runnable任务的状态。
ExecutorService接口也增加了管理执行器关闭的方法,增加了提交Callable对象的集合的方法。
ScheduledExecutorService接口
ScheduledExecutorService接口的定义如下所示:
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
ScheduledExecutorService接口增加了schedule()方法,该方法可以在指定的延时之后执行一个Runnable或者Callable任务。
ScheduledExecutorService接口也增加了一个scheduleAtFixedRate()和一个scheduleWithFixedDelay()方法,这两个方法可以按照指定的间隔重复地执行Runnable任务。
线程池
ThreadPoolExecutor 类是线程池的核心实现类,我们可以通过创建该类来创建一个线程池。ThreadPoolExecutor类提供了4个构造方法,其中,参数相对较少的构造方法最终都是调用参数最多的那个构造方法。该构造方法如下所示:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
...
}
方法的各个参数说明如下所示:
- corePoolSize 核心线程数。当有新任务提交时,如果当前的线程数小于核心线程数,那么创建一个新的线程来执行该任务,即使其它的线程是空闲的。如果当前的线程数等于或者大于核心线程数,那么该任务将被添加到工作队列中。
- maximumPoolSize 最大线程数。当有新任务提交同时工作队列满了时,如果当前的线程数小于最大线程数,那么创建一个新的线程来执行该任务。如果当前的线程数已经等于最大线程数,那么该任务将被拒绝,同时将执行RejectedExecutionHandler。
- keepAliveTime 存活时间。如果当前的线程数大于核心线程数,同时非核心线程的空闲时间超过了keepAliveTime,那么这些线程会被终止。调用allowCoreThreadTimeOut(true)方法可以将存活时间也作用在核心线程上。默认情况下,核心线程一旦被创建将会一直存在,直到线程池被关闭。
- unit 存活时间的单位。可选的单位有NANOSECONDS(纳秒)、MICROSECONDS(微秒)、MILLISECONDS(毫秒)、SECONDS(秒)、MINUTES(分钟)、HOURS(小时)、DAYS(天)。
- workQueue 工作队列。任何的阻塞队列都可以被用来传输和存储提交的任务。
- threadFactory 线程工厂。线程工厂用来创建新的线程。可以使用默认的线程工厂Executors.defaultThreadFactory(),也可以使用自定义的线程工厂。
- handler 拒绝执行处理器。当线程池已经关闭或者线程池和队列都满了时,新提交的任务将被拒绝,同时将执行RejectedExecutionHandler。默认的RejectedExecutionHandler是ThreadPoolExecutor.AbortPolicy,当新提交的任务被拒绝时,它将抛出一个运行时异常RejectedExecutionException。
提交一个新任务到线程池,会经过如下过程:
示例代码如下所示:
public class MyThreadPool {
// 核心线程数
private static final int CORE_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors() - 1, 2);
// 最大线程数
private static final int MAXIMUM_POOL_SIZE = CORE_POOL_SIZE * 2;
// 存活时间
private static final int KEEP_ALIVE_SECONDS = 60;
// 工作队列
private static final BlockingQueue<Runnable> sWorkQueue = new LinkedBlockingQueue<>(100);
// 线程工厂
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MyThreadPool #" + mCount.getAndIncrement());
}
};
public static void main(String[] args) {
// 创建线程池
Executor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);
// 提交任务
System.out.println(new Date() + ", main(), the number of tasks is " + MAXIMUM_POOL_SIZE);
for (int i = 0; i < MAXIMUM_POOL_SIZE; i++) {
executor.execute(new MyRunnableTask());
}
}
private static class MyRunnableTask implements Runnable {
@Override
public void run() {
System.out.println(new Date() + ", run(), current thread is " + Thread.currentThread().getName());
// 模拟任务执行
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行程序,打印的信息如下所示:
Thu Dec 14 12:40:30 CST 2017, main(), the number of tasks is 4
Thu Dec 14 12:40:30 CST 2017, run(), current thread is MyThreadPool #1
Thu Dec 14 12:40:30 CST 2017, run(), current thread is MyThreadPool #2
Thu Dec 14 12:40:32 CST 2017, run(), current thread is MyThreadPool #1
Thu Dec 14 12:40:32 CST 2017, run(), current thread is MyThreadPool #2
可以看到,提交的任务数为4个,线程池的核心线程数为2个。前面两个任务提交时创建了两个核心线程来执行,后面两个任务提交时先被添加到工作队列中。当核心线程空闲时从工作队列中取出后面两个任务来执行。
常用的线程池
Executors工具类提供了4种常用的线程池:
- FixedThreadPool线程池。
- SingleThreadExecutor线程池。
- CachedThreadPool线程池。
- ScheduledThreadPool线程池。
这4种线程池也是通过配置ThreadPoolExecutor类生成的。
FixedThreadPool线程池
FixedThreadPool线程池的定义如下所示:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到,FixedThreadPool线程池的核心线程数等于最大线程数。因此,该线程池只有核心线程,没有非核心线程,存活时间无效。工作队列采用的是无界的阻塞队列LinkedBlockingQueue。当提交一个新任务到线程池时,如果线程数小于核心线程数,那么创建一个新的核心线程来执行该任务。如果线程数等于核心线程数,那么该任务将被添加到工作队列中。线程池中空闲的核心线程将会从工作队列中取出任务来执行。被创建的核心线程将会一直存在,直到线程池被关闭。
总结: FixedThreadPool是一个可以重用固定数量线程的线程池。
SingleThreadExecutor线程池
SingleThreadExecutor线程池的定义如下所示:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看到,SingleThreadExecutor线程池与FixedThreadPool线程池很像,唯一的区别是它只有一个核心线程。当提交一个新任务到线程池时,如果还没有线程,那么创建一个新的核心线程来执行该任务。如果有一个核心线程,那么该任务将被添加到工作队列中。等该核心线程空闲了,它会从工作队列中取出任务来执行。因此,所有的任务在一个核心线程中以串行的方式顺序执行。
总结: SingleThreadExecutor是一个可以确保所有的任务在一个线程中以串行的方式顺序执行的线程池。
CachedThreadPool线程池
CachedThreadPool线程池的定义如下所示:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,CachedThreadPool线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE。因此,该线程池没有核心线程,只有无界的非核心线程。存活时间为60s。工作队列采用的是阻塞队列SynchronousQueue,该队列不存储任何元素,每个插入操作必须等待另一个线程相应的移除操作。当提交一个新任务到线程池时,将会重用以前创建的可用的线程。如果没有可用的线程,那么会创建一个新的线程来执行该任务。空闲60s的线程将被终止并从线程池中移除。
总结: CachedThreadPool是一个可以根据需要创建新线程,但同时可以重用以前创建的可用的线程的线程池。该线程池可以提高那些执行大量短期异步任务的程序的性能。
ScheduledThreadPool线程池
ScheduledThreadPool线程池的定义如下所示:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
可以看到,newScheduledThreadPool()方法返回的是ScheduledExecutorService接口的对象,它可以定时或者定期地执行任务。
ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类。它的定义如下所示:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
0, NANOSECONDS,
new DelayedWorkQueue());
}
...
}
可以看到,ScheduledThreadPoolExecutor类的构造方法最终调用了ThreadPoolExecutor类的构造方法。ScheduledThreadPool线程池的核心线程数为固定值,最大线程数为Integer.MAX_VALUE。因此,该线程池有固定数量的核心线程,有无界的非核心线程。
总结: ScheduledThreadPool是一个可以调度任务在给定的延时之后执行,或者定期执行的线程池。
总结
每个线程的创建和销毁都需要一定的性能开销。因此,实际使用多线程时多采用线程池。线程池的创建可以使用Executors工具类提供的常用线程池,也可以使用ThreadPoolExecutor类来自定义线程池。
参考
- Java 8
- https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html
相关文章
- 暂无相关文章
用户点评