欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Java—线程池ThreadPoolExecutor案例详解,高薪必备,

来源: javaer 分享于  点击 37871 次 点评:82

Java—线程池ThreadPoolExecutor案例详解,高薪必备,


引导

要求:线程资源必须通过线程池提供,不允许在应用自行显式创建线程; 说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗内存或者“过度切换”的问题。

特别要注意:光理论是不够的,记住:Java架构项目经验永远是核心,如果你没有最新JAVA架构实战教程及大厂30k+面试宝典,可以去小编的Java架构学习.裙 :七吧伞吧零而衣零伞 (数字的谐音)转换下可以找到了,里面很多新JAVA架构项目教程,还可以跟老司机交流讨教! 

线程池介绍

线程池概述

  线程池,顾名思义是一个放着线程的池子,这个池子的线程主要是用来执行任务的。当用户提交任务时,线程池会创建线程去执行任务,若任务超过了核心线程数的时候,会在一个任务队列里进行排队等待,这个详细流程,我们会后面细讲。   任务,通常是一些抽象的且离散的工作单元,我们会把应用程序的工作分解到多个任务中去执行。一般我们需要使用多线程执行任务的时候,这些任务最好都是相互独立的,这样有一定的任务边界供程序把控。   多线程,当使用多线程的时候,任务处理过程就可以从主线程中剥离出来,任务可以并行处理,同时处理多个请求。当然了,任务处理代码必须是线程安全的。

为何要使用线程池?

线程池的优势

线程池原理

线程池的参数类型

一共有7个:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,(5+2,前5个重要)

int corePoolSize:该线程池中核心线程数最大值

这边我们区分两个概念:

  • 核心线程:线程池新建线程的时候,当前线程总数< corePoolSize,新建的线程即为核心线程。
  • 非核心线程:线程池新建线程的时候,当前线程总数< corePoolSize,新建的线程即为核心线程。

核心线程默认情况下会一直存活在线程池中,即使这个核心线程不工作(空闲状态),除非ThreadPoolExecutor 的 allowCoreThreadTimeOut这个属性为 true,那么核心线程如果空闲状态下,超过一定时间后就被销毁。

int maximumPoolSize:线程总数最大值

线程总数 = 核心线程数 + 非核心线程数

long keepAliveTime:非核心线程空闲超时时间

  keepAliveTime即为空闲线程允许的最大的存活时间。如果一个非核心线程空闲状态的时长超过keepAliveTime了,就会被销毁掉。注意:如果设置allowCoreThreadTimeOut = true,就变成核心线程超时销毁了。

TimeUnit unit:是keepAliveTime 的单位

TimeUnit 是一个枚举类型,列举如下:

单位说明
NANOSECONDS 1微毫秒 = 1微秒 / 1000
MICROSECONDS 1微秒 = 1毫秒 / 1000
MILLISECONDS 1毫秒 = 1秒 /1000
SECONDS
MINUTES
HOURS 小时
DAYS

BlockingQueue workQueue:存放任务的阻塞队列

  当核心线程都在工作的时候,新提交的任务就会被添加到这个工作阻塞队列中进行排队等待;如果阻塞队列也满了,线程池就新建非核心线程去执行任务。workQueue维护的是等待执行的Runnable对象。 常用的 workQueue 类型:(无界队列、有界队列、同步移交队列)

注意: 只有当任务相互独立没有任何依赖的时候,线程池或工作队列设置有界是合理的;若任务之间存在依赖性,需要使用无界的线程池,如newCachedThreadPool,否则有可能会导致死锁问题。

ThreadFactory threadFactory

  创建线程的方式,这是一个接口,你 new 他的时候需要实现他的 Thread newThread(Runnable r) 方法,一般用不上,

RejectedExecutionHandler handler:饱和策略

抛出异常专用,当队列和最大线程池都满了之后的饱和策略。

线程池工作流程

一般流程即为:创建worker线程;添加任务入workQueue队列;worker线程执行任务。

" alt="在这里插入图片描述" data-src="https://user-gold-cdn.xitu.io/2020/5/3/171da139c2155b39?imageView2/0/w/1280/h/960/format/webp/ignore-error/1" data-width="1280" data-height="554">当一个任务被添加进线程池时:

 

这边,为了大家能够更好的去理解这块的流程,我们举一个例子。生活中我们经常会去打一些公司的咨询电话或者是一些特定机构的投诉电话,而那个公司或者机构的客服中心就是一个线程池,正式员工的客服小姐姐就好比是核心线程,比如有6个客服小姐姐。 5. 当用户的电话打进到公司的客服中心的时候(提交任务); 6. 客服中心会调度客服小姐姐去接听电话(创建线程执行任务),如果接听的电话超过了6个,6个客服小姐姐都在接听的工作状态了(核心线程池满了),这时客服中心会有一个电话接听等待通道(进入任务队列等待),就是我们经常听到的“您的通话在排队,前面排队n人。” 7. 当然,这个电话接听等待通道也是有上限的,当超过这个上限的时候(任务队列满了),客服中心就会立即安排外协员工(非核心线程),也就是非正式员工去接听额外的电话(任务队列满了,正式和非正式员工数量>总任务数,线程池创建非核心线程去执行任务)。 8. 当用户电话数激增,客服中心控制台发现这个时候正式员工和外协员工的总和已经满足不了这些用户电话接入了(总线程池满),就开始根据一些公司电话接听规则去拒绝这些电话(按照拒绝策略处理无法执行的任务)

线程池状态

 

" alt="在这里插入图片描述" data-src="https://user-gold-cdn.xitu.io/2020/5/3/171da139c220581a?imageView2/0/w/1280/h/960/format/webp/ignore-error/1" data-width="1264" data-height="434">

 

  • RUNNING:运行状态,指可以接受任务并执行队列里的任务。
  • SHUTDOWN:调用了 shutdown() 方法,不再接受新任务,但队列里的任务会执行完毕。
  • STOP:指调用了 shutdownNow() 方法,不再接受新任务,所有任务都变成STOP状态,不管是否正在执行。该操作会抛弃阻塞队列里的所有任务并中断所有正在执行任务。
  • TIDYING:所有任务都执行完毕,程序调用 shutdown()/shutdownNow() 方法都会将线程更新为此状态,若调用shutdown(),则等执行任务全部结束,队列即为空,变成TIDYING状态;调用shutdownNow()方法后,队列任务清空且正在执行的任务中断后,更新为TIDYING状态。
  • TERMINATED:终止状态,当线程执行 terminated() 后会更新为这个状态。

线程池源码

线程池核心接口

ThreadPoolExecutor,在java.util.concurrent下。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize, //核心线程数
                              int maximumPoolSize, //最大线程数
                              long keepAliveTime, //空闲线程存活时间
                              TimeUnit unit, //存活时间单位
                              BlockingQueue<Runnable> workQueue, //任务的阻塞队列
                              ThreadFactory threadFactory, //新线程的产生方式
                              RejectedExecutionHandler handler //拒绝策略) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

ThreadPoolExecutor 继承 AbstractExecutorService;AbstractExecutorService 实现 ExecutorService, ExecutorService 继承 Executor

" alt="在这里插入图片描述" data-src="https://user-gold-cdn.xitu.io/2020/5/3/171da139c3d42655?imageView2/0/w/1280/h/960/format/webp/ignore-error/1" data-width="310" data-height="469">

 

public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
复制代码

线程池构造方法

1)5参数构造器

// 5参数构造器
public ThreadPoolExecutor(int corePoolSize,
		             	int maximumPoolSize,
			long keepAliveTime,
			TimeUnit unit,
			BlockingQueue<Runnable> workQueue)

复制代码

2)6参数构造器-1

// 6参数构造器-1
public ThreadPoolExecutor(int corePoolSize,
		             	int maximumPoolSize,
			long keepAliveTime,
			TimeUnit unit,
			BlockingQueue<Runnable> workQueue,
			ThreadFactory threadFactory)

复制代码

3)6参数构造器-2

// 6参数构造器-2
public ThreadPoolExecutor(int corePoolSize,
		             	int maximumPoolSize,
			long keepAliveTime,
			TimeUnit unit,
			BlockingQueue<Runnable> workQueue,
			RejectedExecutionHandler handler)

复制代码

4)7参数构造器

// 7参数构造器
public ThreadPoolExecutor(int corePoolSize,
		             	int maximumPoolSize,
			long keepAliveTime,
			TimeUnit unit,
			BlockingQueue<Runnable> workQueue,
			ThreadFactory threadFactory,
			RejectedExecutionHandler handler)
复制代码

四种线程池

常规用法

//创建固定数目线程的线程池
Executors.newFixedThreadPool(200);

//创建一个无限线程的线程池,无需等待队列,任务提交即执行
Executors.newCachedThreadPool()

//创建有且仅有一个线程的线程池
Executors.newSingleThreadExecutor();

复制代码

newCachedThreadPool():可缓存线程池

介绍

newCachedThreadPool将创建一个可缓存的线程,如果当前线程数超过处理任务时,回收空闲线程;当需求增加时,可以添加新线程去处理任务。

创建方法

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

源码
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码

newFixedThreadPool():定长线程池

介绍

newFixedThreadPool创建一个固定长度的线程池,每次提交一个任务的时候就会创建一个新的线程,直到达到线程池的最大数量限制。

创建方法

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);

源码
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
复制代码

newScheduledThreadPool():定时线程池

介绍

newScheduledThreadPool创建一个固定长度的线程池,并且以延迟或者定时的方式去执行任务。

创建方法:

ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

源码
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
复制代码

newSingleThreadExecutor():单线程化的线程池

介绍

newSingleThreadExecutor顾名思义,是一个单线程的Executor,只创建一个工作线程执行任务,若这个唯一的线程异常故障了,会新建另一个线程来替代,newSingleThreadExecutor可以保证任务依照在工作队列的排队顺序来串行执行。

创建方法

ExecutorService singleThreadPool = Executors.newSingleThreadPool();

源码
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }
复制代码

execute()方法

介绍

ThreadPoolExecutor.execute(Runnable command)方法,即可向线程池内添加一个任务

execute源码
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
	//获取当前线程池的状态
        int c = ctl.get();
	//若当前线程数量小于corePoolSize,则创建一个新的线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
	//判断当前线程是否处于运行状态,且写入任务阻塞队列是否成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
	//再次获取线程状态进行双重检查;如果线程变成非运行状态,则从阻塞队列移除任务;
            if (! isRunning(recheck) && remove(command))
	//执行拒绝策略
                reject(command);
	//若当前线程池为空,则新建一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
	//当前线程为非运行状态并且尝试新建线程,若失败则执行拒绝策略。
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码
流程分析

1)若当前线程数小于corePoolSize,则调用addWorker()方法创建线程执行任务。 2)若当前线程不小于corePoolSize,则将任务添加到workQueue队列,等待空闲线程来执行。 3)若队列里的任务数到达上限,且当前运行线程小于maximumPoolSize,任务入workQueue队列失败,新建线程执行任务; 4)若创建线程也失败(队列任务数到达上限,且当前线程数达到了maximumPoolSize),对于新加入的任务,就会调用reject()(内部调用handler)拒绝接受任务。

Q&A

两种关闭线程池的区别

  • shutdown(): 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow(): 执行后停止接受新任务,但会中断所有的任务(不管是否正在执行中),将线程池状态变为 STOP状态。

拒绝策略有哪些?

ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。 拒绝策略如下:

  • CallerRunsPolicy : 调用线程处理任务
  • AbortPolicy : 抛出异常
  • DiscardPolicy : 直接丢弃
  • DiscardOldestPolicy : 丢弃队列中最老的任务,执行新任务

RejectedExecutionHandler rejected = null;

//默认策略,阻塞队列满,则丢任务、抛出异常
rejected = new ThreadPoolExecutor.AbortPolicy();

//阻塞队列满,则丢任务,不抛异常
rejected = new ThreadPoolExecutor.DiscardPolicy();

//删除队列中最旧的任务(最早进入队列的任务),尝试重新提交新的任务
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();

//队列满,不丢任务,不抛异常,若添加到线程池失败,那么主线程会自己去执行该任务
rejected = new ThreadPoolExecutor.CallerRunsPolicy();

复制代码

(1)AbortPolicy、DiscardPolicy和DiscardOldestPolicy   AbortPolicy默认的饱和策略,就是中止任务,该策略将抛出RejectedExecutionException。调用者可以捕获这个异常然后去编写代码处理异常。   当新提交的任务无法保存到队列中等待执行时,DiscardPolicy会悄悄的抛弃该任务。   DiscardOldestPolicy则会抛弃最旧的(下一个将被执行的任务),然后尝试重新提交新的任务。如果工作队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会导致优先级最高的那个任务被抛弃,所以两者不要组合使用。 (2)CallerRunsPolicy   CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了execute的线程中执行该任务。在线程满后,新任务将交由调用线程池execute方法的主线程执行,而由于主线程在忙碌,所以不会执行accept方法,从而实现了一种平缓的性能降低。   当工作队列被填满后,没有预定义的饱和策略来阻塞execute(除了抛弃就是中止还有去让调用者去执行)。然而可以通过Semaphore来限制任务的到达率。

最后要注意:光理论是不够的,记住:Java架构项目经验永远是核心,如果你没有最新JAVA架构实战教程及大厂30k+面试宝典,可以去小编的Java架构学习.裙 :七吧伞吧零而衣零伞 (数字的谐音)转换下可以找到了,里面很多新JAVA架构项目教程,还可以跟老司机交流讨教!
本文的文字及图片来源于网络加上自己的想法,仅供学习、交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理

相关文章

    暂无相关文章
相关栏目:

用户点评