Java 线程池,
Java 线程池,
线程池使用:
ExecutorService pool = Executors.newFixedThreadPool(3);
1. newCachedThreadPool
如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
// SynchronousQueue,每个插入操作必须等待另一个线程的对应移除操作
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
// LinkedBlockingQueue是无界的
new LinkedBlockingQueue<Runnable>());
}
ThreadPoolExecutor参数
corePoolSize
核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。
核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。
maxPoolSize
当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
keepAliveTime
当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
allowCoreThreadTimeout
是否允许核心线程空闲退出,默认值为false。
queueCapacity
任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorDemo {
public static void main(String[] args) {
ExecutorService executors = Executors.newCachedThreadPool();
executors.execute(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
});
}
}
线程池添加线程:
当线程数小于核心线程数时,创建线程。
当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程
若线程数等于最大线程数,抛出异常,拒绝任务
Executor
public interface Executor {
void execute(Runnable command);
}
ExecutorService
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Executors
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SynchronousQueue
/**
* Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
* 不公平锁
*/
public SynchronousQueue() {
this(false);
}
ThreadPoolExecutor
ctl:是一个原子整数包装两个概念字段workerCount、指示的有效数量的线程runState,指示是否运行,关闭等。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果工作线程的个数小于核心池的个数
if (workerCountOf(c) < corePoolSize) {
// 1...
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果进入工作队列成功:BlockingQueue<Runnable> workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 可增加worker
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...//check
// CAS失败重试
for (;;) {
// 获取当前工作线程数目
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 1...如果当前工作线程数目大于core线程数目,返回失败
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 1...否则当前工作线程自增,表示又创建了新的工作线程
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 工作线程实例化
w = new Worker(firstTask);
final Thread t = w.thread;
// 如果firstTask==null,直接工作线程数自减
if (t != null) {
// 上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加入到工作线程集合里面
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 标志成功添加
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 成功添加后开始线程调度
t.start(); // worker 以线程形式启动
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 添加失败操作
addWorkerFailed(w);
}
return workerStarted;
}
// 增加worker=>thread
private boolean addWorker(Runnable firstTask, boolean core) {
...
// 前置的校验
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
// worker 对应一个thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// worker 运行,thread调用worker的start,然后调用到这
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环调度,获取一个runnable对象,执行调度它
while (task != null || (task = getTask()) != null) {
w.lock();
...
try {
// 调度前
beforeExecute(wt, task);
...
try {
task.run();
}
...// catch
} finally {
// 执行后
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 负责调度线程执行的实体类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
....
public void run() {
runWorker(this);
}
....
}
获取runnable:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
... // check
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 注意,线程池由这个队列实现
// private final BlockingQueue<Runnable> workQueue;
// newFixedThreadPool => LinkedBlockingQueue
// newCachedThreadPool => SynchronousQueue<Runnable> //同步阻塞队列
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
execute执行时:
1.调用workCountOf()方法获取当前线程池中线程数量,判断当前线程数量是否超过了核心线程数,若未超过,则直接添加一个核心线程,并用此线程完成当前提交的任务。
2.若当前线程数已经超过核心线程数且ctl状态等于RUNNING且工作成功进入工作等待队列,则我们进一步复查ctl,若ctl状态依旧为RUNNING,且调用workCountOf()方法检查发现此时的工作线程数为0时,将添加一个工作线程;若此时已经不为RUNNING,则尝试移除任务,并调用拒绝任务方法:reject(command)。(这里之所以需要复查ctl状态是由于在执行workQueue.offer(command)方法时,ctl状态随时可能由于调用shutDown方法或者shutDownNow方法而发生变化)。
3.如果上述两种情况都不吻合,即此时已经有超过核心线程数的的线程在工作,且任务队列也已堆满,则尝试增加一个工作线程(如果此时线程数达到限定最大线程数,则会失败),若失败则调用拒绝任务方法reject(command).
在execute的方法中添加工作线程的所调用的法为addWorker(Runnable runnable,Boolean core).该方法接受两个参数,runnable对象为这个新建线程的第一个工作任务,可以为空。core指代新建的任务是否是核心线程。
参考
1 当前所说的计算机 “有几个CPU”准确的说法是“CPU有几个核心”,任务管理器里面看到的是CPU的线程。即CPU物理上的个数为一个,但是有多个核心。
2 双核处理器即是基于单个半导体的一个处理器上拥有两个一样功能的处理器核心。换句话说,将两个物理处理器核心整合入一个核中。
线程池涉及到的设计模式
线程池模式:针对于频繁open和close的资源来说,为了减小这两个操作的开销,可以使用池对资源进行管理和复用
例如:
1. 线程池
2. 连接池
工厂模式:newCachedThreadPool()等工厂方法
线程池的调优
工作队列排队策略:
排队策略
排队有三种通用策略:
1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
2. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
线程池里对线程状态进行监控
不存在管理类或者后台线程对线程状态进行扫描
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
public void run() {
runWorker(this);
}
// run方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task不为空
while (task != null || (task = getTask()) != null) {
w.lock();
...
try {
beforeExecute(wt, task);
...
task.run();
...
afterExecute(task, thrown);
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//线程尝试退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试中断
tryTerminate();
...
}
线程池关闭
设置线程池大小
扩展线程池监控功能,如计时和日志
相关文章
- 暂无相关文章
用户点评