欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Java如何优雅关闭异步中的ExecutorService,

来源: javaer 分享于  点击 48528 次 点评:98

Java如何优雅关闭异步中的ExecutorService,


目录
  • 1.ExecutorService的核心价值
  • 2.关闭机制的必要性
  • 3.shutdown()方法详解
    • 3.1 方法特性
    • 3.2 内部运作机制
  • 4.shutdownNow()方法剖析
    • 4.1 方法定义
    • 4.2 中断处理要点
  • 5.对比分析
    • 6.最佳实践代码示例
      • 6.1 标准关闭模板
      • 6.2 带回调的增强实现
    • 7.关键注意事项
      • 8.高级应用场景
        • 9.性能优化建议
          • 10.总结建议

            1.ExecutorService的核心价值

            在并发编程领域,Java的ExecutorService(位于java.util.concurrent包)是线程池管理的关键接口。作为Executor框架的核心组件,它通过解耦任务提交与执行机制,为开发者提供了:

            • 线程生命周期管理自动化
            • 任务队列智能调度
            • 资源复用优化机制
            • 异步执行结果追踪能力

            2.关闭机制的必要性

            不正确的线程池关闭会导致:

            • 内存泄漏(滞留线程无法回收)
            • 应用无法正常终止(非守护线程保持活跃)
            • 任务状态不一致(突然中断导致数据问题)
            • 系统资源耗尽(无限制线程创建)

            3.shutdown()方法详解

            3.1 方法特性

            void shutdown()
            

            状态转换

            将线程池状态设置为SHUTDOWN,触发以下行为:

            • 拒绝新任务提交(触发RejectedExecutionHandler)
            • 继续执行已存在的任务:
              • 正在执行的任务(running tasks)
              • 等待队列中的任务(queued tasks)

            典型应用场景

            ExecutorService executor = Executors.newFixedThreadPool(4);
            // 提交多个任务...
            executor.shutdown();
            
            try {
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("仍有任务未在时限内完成");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            

            3.2 内部运作机制

            • 原子性状态更新:CAS操作修改线程池控制状态
            • 中断空闲线程:仅中断等待任务的Worker线程
            • 队列消费保证:完全处理BlockingQueue中的剩余任务

            4.shutdownNow()方法剖析

            4.1 方法定义

            List<Runnable> shutdownNow()
            

            状态转换

            将线程池状态设置为STOP,触发:

            • 立即拒绝新任务
            • 中断所有工作线程(无论是否在执行任务)
            • 清空任务队列,返回未执行任务列表

            4.2 中断处理要点

            executor.shutdownNow();
            // 典型返回值处理
            List<Runnable> unprocessed = executor.shutdownNow();
            if (!unprocessed.isEmpty()) {
                logger.warn("丢弃{}个未执行任务", unprocessed.size());
            }
            

            任务中断条件

            只有当任务代码正确处理中断时才能被终止:

            class InterruptibleTask implements Runnable {
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        // 执行可中断的操作
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // 重置中断状态
                            break;
                        }
                    }
                }
            }
            

            5.对比分析

            特性shutdown()shutdownNow()
            新任务接受立即拒绝立即拒绝
            运行中任务处理等待完成尝试中断
            队列任务处理全部执行清除并返回
            返回值void未执行任务列表
            适用场景优雅关闭紧急终止
            线程中断策略仅中断空闲线程强制中断所有线程

            6.最佳实践代码示例

            6.1 标准关闭模板

            public class GracefulShutdownExample {
                // 定义超时时间和时间单位(30秒)
                private static final int TIMEOUT = 30;
                private static final TimeUnit UNIT = TimeUnit.SECONDS;
            
                // 执行任务的方法,接收一个任务列表并将其提交给线程池执行
                public void executeTasks(List<Runnable> tasks) {
                    // 创建一个固定大小的线程池,大小为系统可用处理器核心数
                    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
                    
                    try {
                        // 将任务列表中的每个任务提交到线程池
                        tasks.forEach(executor::submit);
                    } finally {
                        // 在所有任务提交完后,禁用线程池接收新任务,开始优雅关闭线程池
                        executor.shutdown(); // 禁止再提交新任务
                        try {
                            // 等待线程池中的任务在指定超时内完成,如果超时未完成,则强制关闭线程池
                            if (!executor.awaitTermination(TIMEOUT, UNIT)) {
                                // 如果未能在超时内完成,则调用 shutdownNow() 强制终止所有活动任务
                                List<Runnable> unfinished = executor.shutdownNow();
                                // 处理未完成的任务,例如记录日志或重新提交
                                handleUnfinishedTasks(unfinished);
                            }
                        } catch (InterruptedException e) {
                            // 如果在等待终止时被中断,恢复中断状态并强制关闭线程池
                            Thread.currentThread().interrupt();
                            executor.shutdownNow();
                        }
                    }
                }
                
                // 处理未完成任务的方法,这里我们打印未完成任务的数量
                private void handleUnfinishedTasks(List<Runnable> tasks) {
                    // 如果有未完成的任务,打印任务数量并执行额外的处理
                    if (!tasks.isEmpty()) {
                        System.out.println("未完成任务数: " + tasks.size());
                        // 可在此处记录日志、重新排队未完成的任务等
                    }
                }
            
            }

            构造线程池: Executors.newFixedThreadPool() 创建一个固定大小的线程池,大小为系统可用的处理器核心数,这样可以更高效地利用 CPU 资源。

            提交任务: 使用 tasks.forEach(executor::submit) 提交每个任务到线程池中执行。

            优雅关闭线程池:

            • executor.shutdown() 禁用线程池接收新任务,但仍会执行已经提交的任务。
            • awaitTermination() 方法用于等待所有任务执行完成。如果超时后任务未完成,则调用 shutdownNow() 强制关闭线程池,停止所有正在运行的任务,并返回未完成的任务。

            处理中断: 如果在等待终止过程中发生 InterruptedException,线程会恢复中断状态,并且强制关闭线程池。

            处理未完成任务: handleUnfinishedTasks() 方法会处理未完成的任务,比如记录日志或者重新排队未完成的任务。

            6.2 带回调的增强实现

            public class EnhancedExecutorManager {
                // 定义线程池对象
                private final ExecutorService executor;
                // 定义超时时间及单位
                private final long timeout;
                private final TimeUnit unit;
            
                // 构造函数,初始化线程池并设置超时时间和单位
                public EnhancedExecutorManager(int corePoolSize, long timeout, TimeUnit unit) {
                    // 创建一个核心池大小为 corePoolSize,最大池大小为 corePoolSize * 2,最大空闲时间 60秒的线程池
                    this.executor = new ThreadPoolExecutor(
                        corePoolSize,                             // 核心线程池大小
                        corePoolSize * 2,                         // 最大线程池大小
                        60L, TimeUnit.SECONDS,                    // 空闲线程的存活时间
                        new LinkedBlockingQueue<>(1000),          // 使用容量为 1000 的队列来缓存任务
                        new CustomThreadFactory(),                // 自定义线程工厂
                        new ThreadPoolExecutor.CallerRunsPolicy() // 当任务无法提交时,调用者线程执行该任务
                    );
                    this.timeout = timeout;                     // 设置超时时间
                    this.unit = unit;                           // 设置超时时间单位
                }
                
                // 优雅关闭线程池的方法
                public void shutdown() {
                    executor.shutdown(); // 首先尝试正常关闭线程池,不再接收新的任务
                    
                    try {
                        // 如果线程池未能在指定的超时时间内终止,则强制关闭
                        if (!executor.awaitTermination(timeout, unit)) {
                            System.out.println("强制终止线程池...");
                            // 强制停止所有正在执行的任务并返回丢弃的任务列表
                            List<Runnable> droppedTasks = executor.shutdownNow();
                            System.out.println("丢弃任务数: " + droppedTasks.size());
                        }
                    } catch (InterruptedException e) {
                        // 如果在等待过程中线程池关闭操作被中断,立即强制关闭并恢复中断状态
                        executor.shutdownNow();
                        Thread.currentThread().interrupt();
                    }
                }
                
                // 自定义线程工厂类,用于创建线程
                private static class CustomThreadFactory implements ThreadFactory {
                    private static final AtomicInteger poolNumber = new AtomicInteger(1); // 线程池编号,用于生成线程名
                    private final ThreadGroup group; // 线程组
                    private final AtomicInteger threadNumber = new AtomicInteger(1); // 线程编号
                    private final String namePrefix; // 线程名称前缀
                
                    CustomThreadFactory() {
                        // 获取当前系统的安全管理器,如果没有,则使用当前线程的线程组
                        SecurityManager s = System.getSecurityManager();
                        group = (s != null) ? s.getThreadGroup() :
                                              Thread.currentThread().getThreadGroup();
                        // 设置线程池的名称前缀
                        namePrefix = "pool-" +
                                      poolNumber.getAndIncrement() + // 线程池编号递增
                                     "-thread-";
                    }
                
                    // 创建新线程的方法
                    public Thread newThread(Runnable r) {
                        // 创建新的线程,线程组、名称及优先级均已设置
                        Thread t = new Thread(group, r,
                                              namePrefix + threadNumber.getAndIncrement(),
                                              0); // 默认优先级和daemon设置
                        // 如果线程是守护线程,则将其设置为非守护线程
                        if (t.isDaemon())
                            t.setDaemon(false);
                        // 设置线程优先级为默认
                        if (t.getPriority() != Thread.NORM_PRIORITY)
                            t.setPriority(Thread.NORM_PRIORITY);
                        return t; // 返回新创建的线程
                    }
                }
            }

            线程池初始化:

            • EnhancedExecutorManager 的构造方法使用 ThreadPoolExecutor 创建一个线程池,线程池大小通过 corePoolSize 参数传递。线程池的最大线程数是核心线程数的两倍。
            • LinkedBlockingQueue 用作任务队列,大小为 1000。若任务量超过队列容量,则使用 CallerRunsPolicy 策略,即由提交任务的线程执行该任务。
            • 使用自定义的 CustomThreadFactory 来创建线程。

            优雅关闭线程池:

            • shutdown() 方法首先调用 executor.shutdown() 来拒绝接受新的任务,然后等待线程池在指定的超时时间内关闭。
            • 如果线程池在超时时间内未能正常关闭,则调用 shutdownNow() 强制关闭并丢弃未执行的任务,同时输出丢弃任务的数量。
            • 如果在等待关闭过程中发生 InterruptedException,会强制关闭线程池,并恢复中断状态。

            自定义线程工厂:

            • CustomThreadFactory 通过实现 ThreadFactory 接口来定义创建线程的行为,主要包括线程组、线程名称、守护线程状态和线程优先级的配置。
            • 每个线程的名称遵循 pool-编号-thread-编号 的格式。线程池的编号是递增的,每个线程有自己的编号。

            7.关键注意事项

            • 守护线程问题:默认创建的是非守护线程,需显式关闭
            • 中断策略一致性:任务必须实现正确的中断处理逻辑
            • 拒绝策略配合:合理配置RejectedExecutionHandler
            • 资源释放顺序:数据库连接等资源应先于线程池关闭
            • 监控机制:建议集成线程池监控(如JMX)

            8.高级应用场景

            分级关闭策略

            public class TieredShutdownManager { 
                // 定义三个优先级的线程池列表:高优先级、中优先级、低优先级
                private final List<ExecutorService> highPriority; 
                private final List<ExecutorService> normalPriority; 
                private final List<ExecutorService> lowPriority; 
              
                // 公共方法用于优雅关闭所有线程池
                public void gracefulShutdown() { 
                    // 依次关闭高、中、低优先级的线程池
                    shutdownTier(highPriority, 10, TimeUnit.SECONDS); 
                    shutdownTier(normalPriority, 30, TimeUnit.SECONDS); 
                    shutdownTier(lowPriority, 60, TimeUnit.SECONDS); 
                } 
              
                // 私有方法,用于优雅关闭指定优先级的线程池
                private void shutdownTier(List<ExecutorService> tier, long timeout, TimeUnit unit) { 
                    // 对指定的线程池列表执行关闭操作
                    tier.forEach(ExecutorService::shutdown); 
              
                    // 对每个线程池执行等待终止的操作,指定超时时间
                    tier.forEach(executor -> { 
                        try { 
                            // 如果线程池未在超时时间内终止,则调用 shutdownNow 强制关闭
                            if (!executor.awaitTermination(timeout, unit)) { 
                                executor.shutdownNow(); 
                            } 
                        } catch (InterruptedException e) { 
                            // 如果在等待终止过程中线程被中断,恢复中断状态并强制关闭线程池
                            Thread.currentThread().interrupt(); 
                            executor.shutdownNow(); 
                        } 
                    }); 
                } 
            }

            gracefulShutdown 方法按照优先级顺序依次关闭高、中、低优先级的线程池。

            shutdownTier 方法首先尝试正常关闭每个线程池(调用 shutdown),然后通过 awaitTermination 方法等待线程池在指定的时间内结束,如果未成功结束,则调用 shutdownNow 强制关闭。

            在关闭过程中,如果发生中断,则会捕获 InterruptedException 异常,并且中断当前线程,同时强制关闭线程池。

            9.性能优化建议

            根据任务类型选择队列策略:

            • CPU密集型:有界队列(ArrayBlockingQueue)
            • IO密集型:无界队列(LinkedBlockingQueue)

            监控关键指标:

            ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
            System.out.println("活跃线程数: " + executor.getActiveCount());
            System.out.println("完成任务数: " + executor.getCompletedTaskCount());
            System.out.println("队列大小: " + executor.getQueue().size());
            

            动态调整参数:

            executor.setCorePoolSize(newSize);
            executor.setMaximumPoolSize(newMaxSize);
            

            10.总结建议

            根据Oracle官方文档建议,在大多数生产场景中推荐以下关闭流程:

            • 优先调用shutdown()
            • 设置合理的awaitTermination超时
            • 必要时调用shutdownNow()
            • 始终处理返回的未完成任务
            • 记录完整的关闭日志

            正确选择关闭策略需要综合考量:

            • 任务重要性等级
            • 系统资源限制
            • 业务连续性需求
            • 数据一致性要求

            到此这篇关于Java如何优雅关闭异步中的ExecutorService的文章就介绍到这了,更多相关Java关闭ExecutorService内容请搜索3672js教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持3672js教程!

            您可能感兴趣的文章:
            • Java中ScheduledExecutorService介绍和使用案例(推荐)
            • java线程池ExecutorService超时处理小结
            • java开发ExecutorService监控实现示例详解
            • Java ScheduledExecutorService的具体使用
            • 一文搞懂Java ScheduledExecutorService的使用
            • java多线程开发ScheduledExecutorService简化方式
            相关栏目:

            用户点评