欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Java线程池的理论与实践,java线程理论与实践

来源: javaer 分享于  点击 48501 次 点评:38

Java线程池的理论与实践,java线程理论与实践


前段时间公司里有个项目需要进行重构,目标是提高吞吐量和可用性,在这个过程中对原有的线程模型和处理逻辑进行了修改,发现有很多基础的多线程的知识已经模糊不清,如底层线程的运行情况、现有的线程池的策略和逻辑、池中线程的健康状况的监控等,这次重新回顾了一下,其中涉及大量java.util.concurrent包中的类。本文将会包含以下内容:

一、问题描述

这个项目所在系统的软件架构(从开发到运维)基本上采用的是微服务架构,微服务很好地解决了我们系统的复杂性问题,但是随之也带来了一些问题,比如在此架构中大部分的服务都拥有自己单独的数据库,而有些(很重要的)业务需要做跨库查询。相信这种「跨库查询」的问题很多实践微服务的公司都碰到过,通常这类问题有以下几种解决方案(当然,还有更多其他的方案,这里就不一一叙述了):

指令与查询分离

结合实际情况,我们使用的是第3种方案。然而随着越来越多的业务依赖读库,甚至依赖其中一些状态的变化,所以读库的数据同步如果出现高延时,则会直接影响业务的进行。出了几次这种事情后,于是下决心要改善这种情况。首先想到的就是使用线程池来进行消息的消费(写入读库),JDK自从1.5开始提供了实用而强大的线程池工具——Executor框架。

二、Executor框架

Executor框架在Java1.5中引入,大部分的类都在包java.util.concurrent中,由大神Doug Lea写成,其中常用到的有以下几个类和接口:

三、为什么要用线程池

Java从最开始就是基于线程的,线程在Java里被封装成一个类java.lang.Thread。在面试中很多面试官都会问一个很基础的关于线程问题:

Java中有几种方法新建一个线程?

所有人都知道,标准答案是两种:继承Thread或者实现Runnable,在JDK源代码中Thread类的注释中也是这么写的。

然而在我看来这两种方法根本就是一种,所有想要开启线程的操作,都必须生成了一个Thread类(或其子类)的实例,执行其中的native方法start0()

Java中的线程

Java中将线程抽象为一个普通的类,这样带来了很多好处,譬如可以很简单的使用面向对象的方法实现多线程的编程,然而这种程序写多了容易会忘记,这个对象在底层是实实在在地对应了一个OS中的线程。

操作系统中的线程和进程

上图中的进程(Process)可以看做一个JVM,可以看出,所有的进程有自己的私有内存,这块内存会在主存中有一段映射,而所有的线程共享JVM中的内存。在现代的操作系统中,线程的调度通常都是集成在操作系统中的,操作系统能通过分析更多的信息来决定如何更高效地进行线程的调度,这也是为什么Java中会一直强调,线程的执行顺序是不会得到保证的,因为JVM自己管不了这个,所以只能认为它是完全无序的。

另外,类java.lang.Thread中的很多属性也会直接映射为操作系统中线程的一些属性。Java的Thread中提供的一些方法如sleep和yield其实依赖于操作系统中线程的调度算法。

关于线程的调度算法可以去读操作系统相关的书籍,这里就不做太多叙述了。

线程的开销

通常来说,操作系统中线程之间的上下文切换大约要消耗1到10微秒

从上图中可以看出线程中包含了一些上下文信息:

  • CPU栈指针(Stack)、
  • 一组寄存器的值(Registers),
  • 指令计数器的值(PC)等,

它们都保存在此线程所在的进程所映射的主存中,而对于Java来说,这个进程就是JVM所在的那个进程,JVM的运行时内存可以简单的分为如下几部分:

其中#1中的栈可以认为是这个线程的上下文,创建线程要申请相应的栈空间,而栈空间的大小是一定的,所以当栈空间不够用时,会导致线程申请不成功。在Thread的源代码中可以看到,启动线程的最后一步是执行一个本地方法private native void start0(),代码1是OpenJDK中start0最终调用的方法:

//代码1
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;
  bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event
  // in Thread::start.
  {
    MutexLocker mu(Threads_lock);

    //省略一些代码

      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);
  }

  if (native_thread->osthread() == NULL) {
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }

  Thread::start(native_thread);

JVM_END

从代码1中可以看到,线程的创建首先需要栈空间,所以过多的线程创建可能会导致OOM。

同时,线程的切换会有以下开销:

根据以上的描述,所以通常建议尽可能创建较少的线程,减少锁的使用(尤其是synchronized),尽量使用JDK提供的同步工具。而为了减少线程上下文切换带来的开销,通常使用线程池是一个有效的方法。

Java中的线程池

Executor框架中最常用的大概就是java.util.concurrent.ThreadPoolExecutor了,对于它的描述,简单的说就是「它维护了一个线程池,对于提交到此Executor中的任务,它不是创建新的线程而是使用池内的线程进行执行」。对于「数量巨大但执行时间很小」的任务,可以显著地减少对于任务执行的开销。java.util.concurrent.ThreadPoolExecutor中包含了很多属性,通过这些属性开发者可以定制不同的线程池行为,大致如下:

1. 线程池的大小:corePoolSizemaximumPoolSize

ThreadPoolExecutor中线程池的大小由这两个属性决定,前者指当线程池正常运行起来后的最小(核心)线程数,当一个任务到来时,若当前池中线程数小于corePoolSize,则会生成新的线程;后者指当等待队列满了之后可生成的最大的线程数。在例1中返回的对象中这两个值相等,均等于用户传入的值。

2. 用户可以通过调用java.util.concurrent.ThreadPoolExecutor上的实例方法来启动核心线程(core pool)

3. 可定制化的线程生成方式:threadFactory

默认线程由方法Executors.defaultThreadFactory()返回的ThreadFactory进行创建,默认创建的线程都不是daemon,开发者可以传入自定义的ThreadFactory进行对线程的定制化。

5. 非核心线程的空闲等待时间:keepAliveTime

6. 任务等待队列:workQueue

这个队列是java.util.concurrent.BlockingQueue<E>的一个实例。当池中当前没有空闲的线程来执行任务,就会将此任务放入等待队列,根据其具体实现类的不同,又可分为3种不同的队列策略:

7. 任务拒绝处理器:defaultHandler

如果任务被拒绝执行,则会调用这个对象上的RejectedExecutionHandler.rejectedExecution()方法,JDK定义了4种处理策略,用户可以自定义自己的任务处理策略。

8. 允许核心线程过期:allowCoreThreadTimeOut

上面说的所有情况都是基于这个变量为false(默认值)来说的,如果你的线程池已经不使用了(不被引用),但是其中还有活着的线程时,这个线程池是不会被回收的,这种情况就造成了内存泄漏——一块永远不会被访问到的内存却无法被GC回收。
用户可以通过在抛弃线程池引用的时候显式地调用shutdown()来释放它,或者将allowCoreThreadTimeOut设置为true,则在过期时间后,核心线程会被释放,则其会被GC回收。

四、如果线程死掉了怎么办

几乎所有Executors中生成线程池的方法的注释上,都有代表相同意思的一句话,表示如果线程池中的某个线程死掉了,线程池会生成一个新的线程代替它。下面是方法java.util.concurrent.Executors.newFixedThreadPool(int)上的注释。

If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

线程死亡的原因

我们都知道守护线程(daemon)会在所有的非守护线程都死掉之后也死掉,除此之外导致一个非守护线程死掉有以下几种可能:

线程池要保证其高可用性,就必须保证线程的可用。如一个固定容量的线程池,其中一个线程死掉了,它必须要能监控到线程的死亡并生成一个新的线程来代替它。ThreadPoolExecutor中与线程相关的有这样几个概念:

ThreadPoolExecutor中如何保障线程的可用

在ThreadPoolExecutor中使用了一个很巧妙的方法实现了对线程池中线程健康状况的监控,代码2是从ThreadPoolExecutor类源码中截取的一段代码,它们在一起说明了其对线程的监控。

可以看到,在ThreadPoolExecutor中的线程被封装成一个对象Worker,而将其中的run()代理到ThreadPoolExecutor中的runWorker(),在runWorker()方法中是一个获取任务并执行的死循环。如果任务的运行出了什么问题(如抛出未捕获异常),processWorkerExit()方法会被执行,同时传入的completedAbruptly参数为true,会重新添加一个初始任务为null的Worker,并随之启动一个新的线程。

//代码2
//ThreadPoolExecutor的动态内部类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    /** 对象中封装的线程 */
    final Thread thread;
    /** 第一个要运行的任务,可能为null. */
    Runnable firstTask;
    /** 任务计数器 */
    volatile long completedTasks;

    //省略其他代码

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

五、回到我的问题

由于各种各样的原因,我们并没有使用数据库自带的主从机制来做数据的复制,而是将主库的所有DML语句作为消息发送到读库(DTS),同时自己实现了数据的重放。第一版的数据同步服务十分简单,对于主库的DML消息处理和消费(写入读库)都是在一个线程内完成的.这么实现的优点是简单,但缺点是直接导致了表与表之间的数据同步会受到影响,如果有一个表A忽然来了很多的消息(往往是批量修改数据造成的),则会占住消息处理通道,影响其他业务数据的及时同步,同时单线程写库吞吐太小。

上文说到,首先想到的是使用线程池来做消息的消费,但是不能直接套用上边说的Executor框架,由于以下几个原因:

重复造轮子是没有意义的,但是在我们这种场景下JDK中现有的Executor框架不符合要求,只能自己造轮子。

我的实现

首先把线程抽象成「DML语句的执行器(Executor)」。其中包含了一个Thread的实例,维护了自己的等待队列(限定容量的阻塞队列),和对应的消息执行逻辑。

除此之外还包含了一些简单的统计、线程健康监控、合并事务等处理。

Executor的对象实现了Thread.UncaughtExceptionHandler接口,并绑定到其工作线程上。同时ExecutorGroup也会再生成一个守护线程专门来守护池内所有线程,作为额外的保险措施。

把线程池的概念抽象成执行器组(ExecutorGroup),其中维护了执行器的数组,并维护了目标表到特定执行器的映射关系,并对外提供执行消息的接口,其主要代码如下:

//代码3
public class ExecutorGroup {

    Executor[] group = new Executor[NUM];
    Thread boss = null;
    Map<String, Integer> registeredTables = new HashMap<>(32);
//    AtomicInteger cursor = new AtomicInteger();
    volatile int cursor = 0;

    public ExecutorGroup(String name) {
        //init group
        for(int i = 0; i < NUM; i++) {
            logger.debug("启动线程{},{}", name, i);
            group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS);

        }
        startDaemonBoss(String.format("sync-executor-%s-boss", name));
    }

    //额外的保险
    private void startDaemonBoss(String name) {
        if (boss != null) {
            boss.interrupt();
        }
        boss = new Thread(() -> {
            while(true) {
                //休息一分钟。。。

                if (this.group != null) {
                    for (int i = 0; i < group.length; i++) {
                        Executor executor = group[i];
                        if (executor != null) {
                            executor.checkThread();
                        }
                    }
                }
            }

        });
        boss.setName(name);
        boss.setDaemon(true);
        boss.start();
    }
    public void execute(Message message){
        logger.debug("执行消息");

        //省略消息合法性验证

        if (!registeredTables.containsKey(taskKey)) {
            //已注册
//          registeredTables.put(taskKey, cursor.getAndIncrement());
            registeredTables.put(taskKey, cursor++ % NUM);
        }
        int index = registeredTables.get(taskKey);
        logger.debug("执行消息{},注册索引{}", taskKey, index);
        try {
            group[index].schedule(message);
        } catch (InterruptedException e) {
            logger.error("准备消息出错", e);
        }

    }

}

完成后整体的线程模型如下图所示:

新的线程模型

Java1.7新加入的TransferQueue

Java1.7中提供了新的队列类型TransferQueue,但只提供了一个它的实现java.util.concurrent.LinkedTransferQueue<E>,它有更好的性能表现,可它是一个无容量限制的队列,而在我们的这个场景下必须要限制队列的容量,所以要自己实现一个有容量限制的队列。

相关栏目:

用户点评