并发编程--下篇,通过CAS操作来保证
并发编程--下篇,通过CAS操作来保证
Java并发探索--下篇
承接上文:
博客园【上篇】:https://www.cnblogs.com/jackjavacpp/p/18852416
csdn:【上篇】:https://blog.csdn.net/okok__TXF/article/details/147595101
1. AQS实现锁
AQS前传
网址:https://www.cnblogs.com/jackjavacpp/p/18787832
1) aqs分析
AQS 的核心原理是通过一个 int
类型的状态变量 state
来表示同步状态,使用一个 FIFO 队列来管理等待的线程。通过 CAS 操作来保证状态的原子性更新,同时提供了独占模式和共享模式的获取与释放方法。子类可以通过重写 tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
等方法来实现具体的同步逻辑。
// 关键的属性:
// 同步状态,0 表示未锁定,大于 0 表示已锁定[>1表示可重入锁的重入次数]
private volatile int state;
// 队列的头节点
private transient volatile Node head;
// 队列的尾节点
private transient volatile Node tail;
// 其中Node的重要变量
//节点已取消: 表示该节点关联的线程已放弃等待(如超时、被中断),需从队列中移除
static final int CANCELLED = 1;
//需唤醒后继节点: 当前节点的线程释放锁或取消时,必须唤醒其后继节点。
//节点入队后需确保前驱节点的waitStatus为SIGNAL,否则需调整前驱状态。
static final int SIGNAL = -1;
//节点在条件队列中: 表示节点处于条件队列(如Condition的等待队列),而非同步队列(CLH队列)。
//状态转换:当调用Condition.signal()时,节点从条件队列转移到同步队列,状态重置为0
static final int CONDITION = -2;
//共享模式下唤醒需传播: 在共享锁(如Semaphore)释放时,确保唤醒动作传播给所有后续节点。
static final int PROPAGATE = -3;
//通过状态值控制线程的阻塞、唤醒和队列管理
volatile int waitStatus;
aqs独占锁、共享锁的获取和释放分析
- 独占锁
独占锁的获取:
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
:尝试直接获取锁,这是一个需要子类实现的方法, 尝试直接获取锁(如CAS修改state)。【非公平锁:直接尝试CAS抢占资源;公平锁:先检查队列中是否有等待线程(hasQueuedPredecessors()
),避免插队】
如果第一步返回false, 则进入第二步: addWaiter
:将当前线程封装成一个 Node
节点,并添加到等待队列的尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速插入到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速插入失败,使用 enq 方法插入
enq(node);//enq 方法会通过循环和 CAS 操作确保节点成功插入到队列尾部。
return node;
}
private Node enq(final Node node) {
// 死循环
for (;;) {
Node t = tail;
if (t == null) {
// 使用CAS设置头结点 -- 这里是设置了一个普通的node
// 下次循环才会把传进来的node放到队列尾部
if (compareAndSetHead(new Node()))
// 首尾指向同一个节点
tail = head;
} else { // 尾部tail不为空,说明队列中有节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
然后,结点添加到队列尾部之后,acquireQueued
:让当前线程在同步队列中阻塞,然后在被其他线程唤醒时去获取锁;【让线程在同步队列中阻塞,直到它成为头节点的下一个节点,被头节点对应的线程唤醒,然后开始获取锁,若获取成功才会从方法中返回】。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前线程节点的前一个节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 前驱是头节点且获取锁成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 说明前驱节点会在释放锁时唤醒当前节点,当前线程可以安全地阻塞
return true;
// 如果前驱节点的等待状态大于 0,即 CANCELLED 状态
if (ws > 0) {
// 前驱节点已取消,需要跳过这些已取消的节点
do {
// 将当前节点的前驱节点指向前驱节点的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 更新跳过已取消节点后的前驱节点的后继节点为当前节点
pred.next = node;
} else {
// 前驱节点的等待状态为 0 或 PROPAGATE,将其状态设置为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 当前线程不应该被阻塞,需要再次尝试获取锁
return false;
}
shouldParkAfterFailedAcquire():确保前驱节点的waitStatus
为SIGNAL
(表示会唤醒后继节点),否则清理已取消的节点;它通过检查前驱节点的等待状态,决定当前线程在获取锁失败后是否应该被阻塞。它处理了前驱节点的不同状态,确保等待队列的正确性和线程的正确阻塞与唤醒,
parkAndCheckInterrupt() :让当前线程阻塞,并且在被唤醒之后检查该线程是否被中断 【里面用到了LockSupport,见后面并发工具】
独占锁的释放:
public final boolean release(int arg) {
//调用tryRelease【子类里面实现】
//尝试修改state释放锁,若成功,将返回true,否则false
if (tryRelease(arg)) {
Node h = head;
// 检查头节点不为空且头节点的等待状态不为 0
if (h != null && h.waitStatus != 0)
// 唤醒头节点的后继节点
unparkSuccessor(h);
return true; // 释放成功,返回 true
}
return false;// 释放失败,返回 false
}
private void unparkSuccessor(Node node) {
// 获取节点的等待状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取节点的后继节点
Node s = node.next;
// 如果后继节点为空或者后继节点的等待状态大于 0(已取消)
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列尾部开始向前查找,找到第一个等待状态小于等于 0 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到了合适的后继节点,唤醒该节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
- 共享锁
共享锁的获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//这个方法子类实现
//若返回值小于0,表示获取共享锁失败,则线程需要进入到同步队列中等待
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 以SHARED加入一个结点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取共享锁成功
setHeadAndPropagate(node, r); // 传播给其他线程
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断是否阻塞,唤醒后是否被中断--同上
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录旧的头节点
setHead(node); // 将当前节点设置为新的头节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
setHeadAndPropagate(node, r);
先把当前获取到同步状态的节点设置为新的头节点,接着根据不同条件判断是否要将共享状态的获取传播给后续节点。要是满足传播条件,就会调用 doReleaseShared
方法去唤醒后续等待的共享节点。
共享锁的释放
public final boolean release(int arg) {
// 调用tryRelease:【子类实现】
if (tryRelease(arg)) {
// 若释放锁成功,需要将当前线程移出同步队列
Node h = head;
// 若head不是null,且waitStatus不为0,表示它是一个装有线程的正常节点,
// 在之前提到的addWaiter方法中,若同步队列为空,则会创建一个默认的节点放入head
// 这个默认的节点不包含线程,它的waitStatus就是0,所以不能释放锁
if (h != null && h.waitStatus != 0)
// 若head是一个正常的节点,则调用unparkSuccessor唤醒它的下一个节点所对应的线程
unparkSuccessor(h);
// 释放成功
return true;
}
// 释放锁失败
return false;
}
2) 自定义锁
- 自定义一个读写锁
学一下jdk源码,写一个内置的Sync同步器,低位16位记录写锁重入次数,高位16位记录读锁获取次数
// 内置同步器
private static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 写锁方法(tryAcquire/tryRelease)-- 独占
protected boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int state = getState();
int writeCount = getWriteCount(state);
// 如果存在读锁或写锁(且持有者不是当前线程),获取失败
if (state != 0) {
// writeCount是0,但是state不是0,说明有线程获取到了读锁
if (writeCount == 0 || current != getExclusiveOwnerThread())
return false;
}
// 检查是否超过最大重入次数(低16位是否溢出)
if (writeCount + acquires > EXCLUSIVE_MASK)
throw new Error("超出最大重入次数");
// CAS更新写锁状态
if (compareAndSetState(state, state + acquires)) {
setExclusiveOwnerThread(current);
return true;
}
return false;
}
protected boolean tryRelease(int releases) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
int newState = getState() - releases;
boolean free = (getWriteCount(newState) == 0);
if (free)
setExclusiveOwnerThread(null);
setState(newState);
return free;
}
// 读锁方法(tryAcquireShared/tryReleaseShared)
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
int state = getState();
// 如果有其他线程持有写锁,且不是当前线程(允许锁降级),则获取失败
if (getWriteCount(state) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 计算读锁数量
int readCount = getReadCount(state);
if (readCount == (1 << SHARED_SHIFT) - 1)
throw new Error("超出最大读锁数量");
// CAS增加读锁计数(高16位)
if (compareAndSetState(state, state + (1 << SHARED_SHIFT))) {
return 1; // 成功获取读锁
}
return -1; // 需要进入队列等待
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int state = getState();
int readCount = getReadCount(state);
if (readCount == 0)
throw new IllegalMonitorStateException();
// CAS减少读锁计数
int newState = state - (1 << SHARED_SHIFT);
if (compareAndSetState(state, newState)) {
return readCount == 1; // 最后一个读锁释放时可能触发唤醒
}
}
}
// 其他辅助方法
int getReadCount(int state) { return state >>> SHARED_SHIFT; }
int getWriteCount(int state) { return state & EXCLUSIVE_MASK; }
}
上面的同步器中,需要注意的点如下:
高16位和低16位是啥情况?
1. 从state获取写重入次数 和 读锁持有数====================
先说低16位,我们都知道int是32位的整数,用低16位的二进制位表示写锁的重入次数,如下:
32位二进制:
[高位16位]11111111 11111111 | [低位16位]11111111 11111111
16位二进制全部是1,那么其表示的数字就是 2^16 - 1 = 65535【也就是说最大可重入次数是65535次】
既然现在是用的state的低位16位来记录的写锁重入次数,我们要怎么获取state的低位16位表示的数字呢?
很明显: state & ( 65535 ) 就行了: 也就是上面的 state & EXCLUSIVE_MASK
高位16位呢?【读锁获取的次数】
是不是state无符号右移16位就行了,剩下的不就是高位的16位了吗
也就是上面的:state >>> SHARED_SHIFT
2. 增加/减少重入次数 和 读锁持有数====================
写锁的话,直接state加减就可以了,因为直接加减就是从最低位开始的;
读呢? 因为需要把数字加到高位部分的那16位去,所以把需要加的数左移16位就好了;减的话同理。
知道了这些然后就好理解了
public class TReadWriteLock {
private final Sync sync;
private final ReadLock readLock;
private final WriteLock writeLock;
public TReadWriteLock() {
sync = new Sync();
readLock = new ReadLock(sync);
writeLock = new WriteLock(sync);
}
// 对外暴露读写锁
public Lock readLock() {return readLock;}
public Lock writeLock() {return writeLock;}
// 同步器Sync
....
// 读锁(共享)
public static class ReadLock implements Lock {
private final Sync sync;
public ReadLock(Sync sync) { this.sync = sync; }
public void lock() { sync.acquireShared(1); }
public void unlock() { sync.releaseShared(1); }
// 其他方法(略)
}
// 写锁(独占)
public static class WriteLock implements Lock {
private final Sync sync;
public WriteLock(Sync sync) { this.sync = sync; }
public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); }
// 其他方法(略)
}
}
这样自定义了一个简单的读写锁就完成了, 然后测试一下
public class CustomLockTest {
private TReadWriteLock readWriteLock;
private Lock readLock;
private Lock writeLock;
private Map<String, String> data;
public CustomLockTest() {
readWriteLock = new TReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
data = new HashMap<>();
}
public static void main(String[] args) {
CustomLockTest obj = new CustomLockTest();
// 两个线程写
new Thread(() -> obj.write("key", "value"), "写Thread-1").start();
new Thread(() -> obj.write("key", "value5"), "写Thread-2").start();
// 4个线程读
for (int i = 0; i < 4; i++)
new Thread(() -> System.out.println(obj.read("key")), "读" + i).start();
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("main线程结束");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void write(String key, String value) {
writeLock.lock();
try {
System.out.println( Thread.currentThread().getName() + "写入中~~~");
TimeUnit.SECONDS.sleep(1);
data.put(key, value);
System.out.println( Thread.currentThread().getName() + "写入ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock();
}
}
public String read(String key) {
readLock.lock();
try {
System.out.println( Thread.currentThread().getName() + "读取中~~~");
TimeUnit.SECONDS.sleep(2);
System.out.println( Thread.currentThread().getName() + "读取ok~~~" + data.get(key));
return data.get(key);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
readLock.unlock();
}
}
}
2. 探索并发工具
①ConcurrentHashMap
- jdk1.8
Java 提供了一个多线程版本的ConcurrentHashMap。不仅线程安全,还能保持一定的性能。普通版本的HashMap看这里:
普通的Map --网址:https://www.cnblogs.com/jackjavacpp/p/18787832
本文这里主要看其put方法和get方法: 我这里就写在注释里面了
先看put方法:
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 熟悉HashMap的都知道,HashMap是允许key为null的!
// 这里key、value都不能为null!!!!
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;// 用于记录链表或红黑树中节点的数量
// 熟悉HashMap的都知道,HashMap.put东西最外层是没有循环的
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化底层table
// hash计算出的index上的位置是空的
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// cas修改这个位置--那么看到这里应该很清楚了,外面为什么会有for循环了
// 这一看就是cas的自旋锁嘛
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // cas修改ok,就break了
}
// 如果该位置的节点的哈希值为 MOVED,说明正在进行扩容操作,当前线程协助进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else { // hash计算出的index上的位置不是空的
V oldVal = null;
// f是table[(n - 1) & hash]的元素,
// 可以理解为f表示某一个桶,这里锁某一个桶,减小了锁的粒度
synchronized (f) {
// 判断一下该位置是不是被别人动过了
if (tabAt(tab, i) == f) {
// fh是f的hash值
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 检查当前节点的键是否与要插入的键相同
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;// 记录旧值
if (!onlyIfAbsent)// 如果 onlyIfAbsent 为 false,更新节点的值
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果遍历到链表末尾,将新节点插入到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果该位置的节点是 TreeBin 类型,说明该位置是一个红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果 binCount 不为 0,说明已经完成插入或更新操作
if (binCount != 0) {
// 如果链表长度达到树化阈值,将链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新元素数量并检查是否需要扩容
addCount(1L, binCount);
return null;
}
值得注意的是:tabAt方法是以原子操作的方式获取 ConcurrentHashMap
底层数组中指定索引位置的节点,以此保证数据的一致性和线程安全。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
从上面put元素的过程可以知道:ConcurrentHashMap的put是 synchronized锁 + cas自旋来达到线程安全的效果的。【这是jdk1.8】
下面看jdk8的get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 计算哈希值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 先判空
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
可以看到get方法并没有加锁。
ConcurrentHashMap的新方法:
putlfAbsent(K key,Vvalue):只有当key不存在时才插入。
此外,ConcurrentHashMap中map.put(key, map.get(key) + 1);并不会保证原子性。为了保证复合操作的原子性,ConcurrentHashMap在1.8中还有HashMap里面没有的新方法:
compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) :
对 key 对应的值进行重新计算。
merge(K key, V value, BiFunction<? super V, ? super V, ?extends V> remappingFunction) : 如果 key 已存在,就用新的 value 与l日值进行合并。
@FunctionalInterface
public interface BiFunction<T, U, R> {
// 接受两个参数,返回一个值就行了
R apply(T t, U u);
}
public static void testConcurrentHashMap() {
ConcurrentHashMap<String, Object> m = new ConcurrentHashMap<>();
m.put("1", "##");
printMap(m);
m.put("1", "2");
printMap(m);
//===================================== 【1】putIfAbsent
m.putIfAbsent("2", "3");
m.putIfAbsent("1", "3"); // 这一个并没有覆盖原来的喔
printMap(m);
//===================================== 【2】compute
// 把key为"1"的value变成 k + v + "hahah"
m.compute("1", (k, v) -> k + v + "hahah");
// key不存在,则添加 --- value就变成: 3nullhahah
m.compute("3", (k, v) -> k + v + "hahah");
printMap(m);
//===================================== 【3】merge
m.merge("1", "3ppp", (oldVal, newVal) -> oldVal.toString() + newVal); // oldVal:旧值 newVal:新值[传进去的value]
m.merge("4", "3", (oldVal, newVal) -> "hahah" + oldVal + newVal); // 不存在4这个key,那么就put("4", "3")
printMap(m);
}
- jdk1.7
在 JDK 1.7 中,ConcurrentHashMap
的 put
方法实现是基于分段锁机制的。它将整个哈希表分成多个 Segment
,每个 Segment
类似于一个小的 HashMap
,并且每个 Segment
都有自己的锁,不同 Segment
之间的操作可以并发进行,从而提高了并发性能。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)//空值检查
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;//确定 Segment 索引
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);//调用 Segment 的 put 方法
}
//Segment 的 put 方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);//获取锁
V oldValue;
try {// 查找或插入节点
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;//计算 key 在 Segment 内部数组中的索引 index
HashEntry<K,V> first = entryAt(tab, index);//获取该索引位置的第一个 HashEntry 节点 first。
for (HashEntry<K,V> e = first;;) {//遍历链表
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
//如果找到相同的 key,根据 onlyIfAbsent 参数决定是否覆盖原有的值。
if (!onlyIfAbsent) {
e.value = value;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
②流程控制
在JDK的并发包里面提供了几个非常有用的并发工具,CountDownLatch、CyclicBarrier、Semaphore,他们提供了一种并发控制流程的手段。
1) CountDownLatch
核心功能是让一个或多个线程等待其他线程完成操作后再继续执行,它的使用步骤:
1.初始化计数器:创建时指定计数值(如 new CountDownLatch(3)),表示需要等待的事件数量。
2.任务完成减计数:每个线程完成任务后调用 countDown(),计数器减 1。
3.等待计数归零:调用 await() 的线程会阻塞,直到计数器变为 0,所有等待线程被唤醒。
它的大致应用场景:
- 主线程等待子线程完成:如启动服务时,主线程需等待多个组件初始化完成,这些组件又是并行执行的。
- 多阶段任务协调:多个线程完成当前阶段后,共同进入下一阶段(如数据处理流水线)。
- 最大并行性控制:多个线程同时开始执行任务
下面给一个“火箭发射”的例子:
public class RocketLaunchDemo {
public static void main(String[] args) throws InterruptedException {
// 初始化计数器为3(3个检查任务)
CountDownLatch latch = new CountDownLatch(3);
// 创建检查线程
Thread fuelCheck = new Thread(new CheckTask(latch, "燃料检查"), "Thread-1");
Thread engineCheck = new Thread(new CheckTask(latch, "引擎检查"), "Thread-2");
Thread navigationCheck = new Thread(new CheckTask(latch, "导航检查"), "Thread-3");
// 启动检查线程
fuelCheck.start();
engineCheck.start();
navigationCheck.start();
// 主线程等待所有检查完成
System.out.println("等待所有检查完成...");
latch.await(); // 阻塞直到计数器归零
System.out.println("所有检查完成,火箭点火发射!");
}
static class CheckTask implements Runnable {
private final CountDownLatch latch;
private final String taskName;
public CheckTask(CountDownLatch latch, String taskName) {
this.latch = latch;
this.taskName = taskName;
}
@Override
public void run() {
try {
// 模拟检查耗时
Thread.sleep((long) (Math.random() * 2000));
System.out.println("【" + Thread.currentThread().getName() + "】" + taskName + "通过!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 无论是否异常,必须减少计数器
latch.countDown();
}
}
}
}
这好用啊!!但是需要注意的事:
- 它是一次性的【计数器归零后无法重置,若需重复使用,需创建新实例或改用
CyclicBarrier
】; - 对于
countDown()
来说,尽量把它放在finally中,因为如果出现异常导致不能执行countDown()
,从而计数器无法归零,一直等待咯;所以可以使用await(long timeout, TimeUnit unit)
可防止无限等待。同时,计数器值需与实际任务数一致,否则可能导致主线程提前或永久阻塞,如果上述案例中new CountDownLatch(3)
变成了new CountDownLatch(4)
,少一个任务就会永久阻塞的。
底层原理:
其底层是基于aqs的共享模式实现的
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count); // 初始化内部的同步器
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // 设置aqs的state
}
}
// 当我们调用countDown()时:实际是同步器的releaseShared
public void countDown() {
sync.releaseShared(1);
}
// 同步器的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 递减1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 调用await()的时候: 实际是sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 如果小于0
doAcquireSharedInterruptibly(arg);// 就等待去吧
}
// 同步器的tryAcquireShared
protected int tryAcquireShared(int acquires) {
// 如果state不是0,说明有不能把等待的线程唤醒,那就等待吧
return (getState() == 0) ? 1 : -1;
}
2) CyclicBarrier
CyclicBarrier 是 Java 并发包中基于 ReentrantLock 和 Condition 实现的同步工具类,它的使用步骤:
1. new屏障计数器:初始化时指定参与同步的线程数(parties),每个线程调用 await() 时计数器减 1。
2. 屏障动作:当计数器归零时,执行构造时传入的 Runnable 任务(屏障动作),随后所有等待线程被唤醒。
3. 可重用:屏障释放后,计数器自动重置为初始值,支持重复使用
方法 | 作用 |
---|---|
CyclicBarrier(int parties) |
初始化屏障,指定参与同步的线程数 |
CyclicBarrier(int parties, Runnable barrierAction) |
指定屏障动作(所有线程到达后执行) |
int await() |
线程等待,直到所有线程到达屏障点 |
int await(long timeout, TimeUnit unit) |
超时等待,返回超时状态 |
void reset() |
重置屏障为初始状态(清除已等待线程) |
boolean isBroken() |
检查屏障是否处于损坏状态(如线程中断导致屏障失效) |
下面再举一个例子:模拟 数据处理流水线
有三个阶段步骤:加载数据,计算数据,存储数据;有三个线程,每个线程里面都执行这三个阶段,但是必须要等该阶段所有线程执行完才能进行下一阶段
public class PipelineDemo {
private static final int THREAD_COUNT = 3;
private static final CyclicBarrier barrier = new CyclicBarrier(
THREAD_COUNT,
() -> System.out.println("【阶段同步完成】所有线程进入下一阶段")
);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new PipelineTask(i)).start();
}
}
static class PipelineTask implements Runnable {
private final int taskId;
public PipelineTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
try {
// 阶段1:加载数据
System.out.println("【线程" + taskId + "】加载数据中...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("【线程" + taskId + "】数据加载完成");
// 等待其他线程完成阶段1----在这里等着了,直到CyclicBarrier中parties达到了设置的3,才会唤醒
barrier.await();
// 阶段2:计算
System.out.println("【线程" + taskId + "】计算中...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("【线程" + taskId + "】计算完成");
barrier.await(); // 等待其他线程完成阶段2
// 阶段3:存储
System.out.println("【线程" + taskId + "】存储中...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("【线程" + taskId + "】存储完成");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
从上面可以看出来,CyclicBarrier的计数是递增到设定值后归零,这样才进行下一阶段的。
CyclicBarrier的构造函数第二个参数是屏障动作:在最后一个线程到达屏障时执行,常用于汇总结果或触发下一阶段(如示例中的阶段同步提示)
与上一个工具比较一下:
CyclicBarrier | CountDownLatch | |
---|---|---|
线程角色 | 所有线程对等,互相等待 | 主线程等待子线程完成 |
重用性 | 可循环使用(自动重置) | 一次性使用 |
计数方向 | 也是减到0,但是会重置 | 递减到 0 |
典型场景 | 多阶段同步(如流水线) | 单次汇总(如等待初始化完成) |
屏障动作 | 支持(最后一个线程触发) | 不支持 |
思考一下,上面的三阶段例子我们用CountDownLatch也实现一下?交给读者了
底层原理:
// 构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties; // 把parties赋值给count
this.barrierCommand = barrierAction;
}
// 调用await时
public int await() throws InterruptedException, BrokenBarrierException {
...
return dowait(false, 0L);
..
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock; // 底层是用的lock可重入锁-实际上还是aqs
lock.lock(); // 加锁
try {
final Generation g = generation;
....
// count先减一
int index = --count;
if (index == 0) { // 如果减到0了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 运行传进来的runnable, 这里只是调用run方法
if (command != null) command.run();
ranAction = true;
/*
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); // 唤醒所有的
// 重置次数!!!
count = parties;
generation = new Generation();
}
*/
nextGeneration();
return 0;
} finally {
if (!ranAction) breakBarrier();
}
}
// 如果没有减到0
for (;;) {
try {
if (!timed) trip.await(); // 等待吧
else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 有时限等待吧
} catch (InterruptedException ie) {
.....
}
....
}
} finally {
lock.unlock(); // 解锁
}
}
3) Semaphore
Semaphore(信号量)是 Java 并发包中用于 控制并发访问资源数量 的同步工具,他的核心机制
许可证模型:通过计数器(permits
)表示可用资源数量,线程需获取许可证(acquire()
)才能访问资源,释放时归还许可证(release()
线程协作:当许可证耗尽时,新线程进入等待队列,资源释放后按策略(公平/非公平)唤醒等待线程
底层实现:基于 AbstractQueuedSynchronizer
(AQS)的共享模式,通过 state
字段记录可用许可证数量
核心 API 与参数:
方法/参数 | 作用 |
---|---|
Semaphore(int permits) |
初始化信号量,指定最大并发访问数(许可证总数) |
Semaphore(int permits, boolean fair) |
设置公平模式(true 按请求顺序分配许可证) |
acquire() |
获取许可证(阻塞直到可用) |
release() |
释放许可证(唤醒等待线程) |
tryAcquire() |
非阻塞尝试获取许可证,立即返回结果 |
availablePermits() |
查询当前可用许可证数量 |
看了出来,好像有那么个限流的意思吼,所以它的应用场景,我们可以用它
资源池管理:比如数据库连接池限制最大连接数;限流控制:限制接口并发请求线程数;多线程分阶段协作:控制同时执行任务的线程数量
下面举个例子:停车场管理,模拟一个停车场,最多允许5辆车同时停放。车辆(线程)需获取停车位(许可证)才能进入,离开时释放许可证。
// 停车场管理,模拟一个停车场,最多允许5辆车同时停放。车辆(线程)需获取停车位(许可证)才能进入,离开时释放许可证。
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
// 创建几辆车
new Thread(new Car("迈巴赫1", semaphore)).start();
new Thread(new Car("保时捷2", semaphore)).start();
new Thread(new Car("奔驰3", semaphore)).start();
new Thread(new Car("宝马4", semaphore)).start();
new Thread(new Car("法拉利5", semaphore)).start();
new Thread(new Car("五菱6", semaphore)).start();
new Thread(new Car("本田7", semaphore)).start();
}
static class Car implements Runnable {
private final String name;
private final Semaphore semaphore;
public Car(String name, Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
System.out.println("car " + Thread.currentThread().getId() + ":" + name + " 获取停车位中~~~");
semaphore.acquire();
System.out.println("car " + Thread.currentThread().getId() + ":" + name + " 停车了");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println("car " + Thread.currentThread().getId() + ":" + name + " 离开车位");
semaphore.release();
}
}
}
}
底层原理:
// 默认非公平锁,初始化内部同步器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//
Sync(int permits) {
setState(permits); // 设置state
}
// 调用acquire时【尝试将state-1】
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 实际上是
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires; // 尝试减1
if (remaining < 0 ||
compareAndSetState(available, remaining))
// 1.如果小于0,就可以直接返回了
// 2.如果cas设置成功了,也直接返回[这个时候肯定是大于0的嘛]--那线程就等待去吧
return remaining;
}
}
// release【尝试将state+1】
略过了。。读者自行查看把
了解到上面三个工具类的实现,读者可以自己实现上面三个类吗?实际上通俗说就是加减state,state为正数就等待着,小于等于0就唤醒所有的等待线程了
③CompletableFuture
优雅的任务编排工具!由于这个工具用法很多,我这里只给出部分场景及使用CompletableFuture解决的方案
场景一:查询A库,查询B库,然后汇总结果
public class Test1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 10, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
long start = System.currentTimeMillis();
// 第二个参数是可选的,我这里加上了线程池
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(FutureJdk8Tools::queryA, pool);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(FutureJdk8Tools::queryB, pool);
// 2.合并两个future
CompletableFuture<String> res = futureB
.thenCombine(futureA, (b, a) -> "A库" + a + "--B库" + b) // 合并两个future的结果
.exceptionally(e -> "出现异常了"); // 处理异常
String resStr = res.get();
System.out.println("【汇总结果】" + resStr);
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) + "毫秒");
pool.shutdown();
}
public static String queryA() {
System.out.println(Thread.currentThread().getName() + "正在查询A库");
try {
Thread.sleep(1500);
// int a = 1 / 0; // 模拟一个异常
} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println(Thread.currentThread().getName() + "查询A库 ok");
return "A库信息";
}
public static String queryB() {
System.out.println(Thread.currentThread().getName() + "正在查询B库");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println(Thread.currentThread().getName() + "查询B库 ok");
return "B库信息";
}
}
supplyAsync: 执行任务,支持返回值; 当然有runAsync:执行任务,没有返回值;
thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值;
当出现异常时,exceptionally 中会捕获该异常
那么,如果不是两个库呢?比如说是n个库
//====================== 查询多个库汇总结果
public static void queryTotal2() throws Exception {
// 1.线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(5, 10, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
List<String> tasks = Arrays.asList("A库:1000", "B库:2000", "C库:3000", "D库:1500");
// 构建任务列表
List<CompletableFuture<Integer>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> queryDataBase(task), pool))
.collect(Collectors.toList());
// allOf返回这个CompletableFuture<Void>
CompletableFuture<List<Integer>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
//thenApplyAsync()用于串行化另一个CompletableFuture,将任务提交到独立线程池执行,避免阻塞当前线程链
//接收前序任务的结果作为输入,返回新的 CompletableFuture 对象
.thenApplyAsync(
v -> // v: Void
futures.stream()
.mapToInt(f -> {
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}) // intStream
.boxed() // 需要变成包装类
.collect(Collectors.toList())
, pool2);
List<Integer> integers = result.get();
//[1000, 2000, 3000, 1500]
System.out.println(integers);
pool.shutdown();
pool2.shutdown();
}
public static Integer queryDataBase( String db ) {
String[] s1 = db.split(":");
String dbName = s1[0];
int time = Integer.parseInt(s1[1]);
try {
System.out.println(Thread.currentThread().getName() + "正在查询" + dbName);
Thread.sleep(time);
System.out.println(Thread.currentThread().getName() + "查询" + dbName + " ok");
} catch (InterruptedException e) {throw new RuntimeException(e);}
return time;
}
场景二:货比多家,查询多家商店同一商品价格,返回最便宜的
//====================== 货比多家
public static void queryHuoBi() throws Exception {
List<String> tasks = Arrays.asList("A店:100", "B店:200", "C店:300", "D店:50"); // 每个店的价格信息
// 构建任务列表
List<CompletableFuture<Integer>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> taskComputePrice(task)))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
// 遍历得到最小值
int minPrice = futures.stream()
.mapToInt(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.min()
.orElseThrow(() -> new IllegalStateException("No price found"));
System.out.println("最小价格:" + minPrice);
}
public static int taskComputePrice(String msg ) {
String[] split = msg.split(":");
String store = split[0];
int price = Integer.parseInt(split[1]);
System.out.println(Thread.currentThread().getName() + "正在查询" + store + "价格");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println(Thread.currentThread().getName() + "查询" + store + "价格 ok");
return price;
}
allOf:等待所有任务完成
场景三:在多个网站搜索结果,任一一个返回结果就结束
//====================== 搜索结果
public static void queryWebsite() throws Exception {
List<String> tasks = Arrays.asList("百度:1200", "CSDN:2000", "博客园:1500", "谷歌:3000");
// 构建任务列表
List<CompletableFuture<String>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> searchRes(task)))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
Object result = anyOf.get();
System.out.println(result + "最先返回了结果");
}
public static String searchRes( String webSite ) {
String[] s1 = webSite.split(":");
String webSiteName = s1[0];
int time = Integer.parseInt(s1[1]);
System.out.println(Thread.currentThread().getName() + "正在查询" + webSite);
try {
Thread.sleep(time);
} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println(Thread.currentThread().getName() + "查询" + webSite + " ok");
return webSiteName;
}
anyOf:只要有一个任务完成
④volatile
在了解这个关键字之前,我们需要知道并发编程的三个基本概念:
分别是 原子性、可见性、有序性:
【原子性】:一个操作或者多个操作要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行
(对于基本数据类型(像 int、long 等),单个变量的读写操作通常具有原子性。)
(使用 synchronized 关键字:synchronized 关键字能够保证在同一时刻只有一个线程可以执行被它修饰的代码块或者方法,从而确保操作的原子性。)
(原子类,例如 AtomicInteger、AtomicLong )
【可见性】:当一个线程修改了共享变量的值时,其他线程能够立即得知这个修改。
在多线程环境下,每个线程都有自己的工作内存,共享变量会被存储在主内存中。
线程在操作共享变量时,会先将变量从主内存拷贝到自己的工作内存,操作完成后再将结果写回主内存。
这就可能导致一个线程对共享变量的修改不能及时被其他线程看到。
【有序性】:程序按照代码的先后顺序执行。
但在 Java 中,为了提高性能,编译器和处理器可能会对指令进行重排序。
重排序分为三种类型:编译器重排序、指令级并行重排序和内存系统重排序。
虽然重排序可以提高程序的性能,但在多线程环境下可能会导致程序出现错误。
volatile
是 Java 并发编程中用于 保证变量可见性和禁止指令重排序 的关键字,它通过内存屏障和禁止指令重排来保证线程间的正确交互, 他可以:
可见性:
- 当一个线程修改了
volatile
变量的值,其他线程能立即看到该变化。 - 原理:写操作时,JVM 会强制将变量值刷新到主内存;读操作时,直接从主内存读取最新值(而非线程本地缓存)
禁止指令重排序
- 编译器和 CPU 可能对指令进行重排序以优化性能,
volatile
通过插入内存屏障(Memory Barrier)阻止这种优化。
举个例子:
int a = 0;
volatile boolean flag = false;
// 写操作~~~
a = 1;
flag = true; // 写volatile变量后,a=1的写入不会被重排序到flag之后
// 读操作
if (flag) { // 读volatile变量时,会强制从主内存读取a的值
System.out.println(a); // 保证输出1
}
举个例子:
public class VisibilityProblemDemo {
private static boolean flag = false; // 非volatile变量
// private static volatile boolean flag = false; volatile变量---这个就不会出现问题了
public static void main(String[] args) {
// 线程1:持续检查flag是否变为 true
new Thread(() -> {
System.out.println("【线程1】开始等待flag变为true...");
while (!flag) {
// 空循环,无其他操作
}
System.out.println("【线程1】检测到flag已变为 true,退出循环");
}).start();
// 主线程休眠1秒,确保线程1已启动
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 线程2:修改flag为true
new Thread(() -> {
try {
Thread.sleep(500); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true; // 修改共享变量
System.out.println("【线程2】已将flag设为 true");
}).start();
}
}
上述会出现线程无法停止的情况!
但是它不保证原子性!volatile
仅保证单个读/写操作的原子性,复合操作(如 i++
)不保证原子性。【用synchronized和lock可以保证原子性,那用这俩可以保证可见性吗?】
public class SyncVolatileCompare {
private boolean flag = false; // 非volatile变量
/*
当线程进入 synchronized 代码块时,会清空本地内存中所有共享变量的副本,强制从主内存重新读取。
当线程退出 synchronized 代码块时,会将本地内存中修改的共享变量立即刷新到主内存。
*/
public synchronized void setFlag(boolean flag) {
this.flag = flag;
}
public synchronized boolean getFlag() {
return flag;
}
public static void main(String[] args) throws InterruptedException {
SyncVolatileCompare syncVolatileCompare = new SyncVolatileCompare();
new Thread(() -> {
System.out.println("【线程1】开始等待 flag 变为 true...");
while (!syncVolatileCompare.getFlag()) {
// ...
}
System.out.println("【线程1】检测到 flag 已变为 true,退出循环");
}).start();
Thread.sleep(1000);
Thread thread = new Thread(() -> {
System.out.println("【线程2】开始修改 flag...");
try {
Thread.sleep(500); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
syncVolatileCompare.setFlag(true);
});
thread.start();
}
}
lock也是同理,lock()
方法会清空本地内存,强制从主内存读取共享变量。unlock()
方法会将修改的共享变量刷新到主内存。
那么,有序性呢?
synchronized 的有序性保证:
synchronized 代码块内的代码不会被编译器或 CPU 指令重排,因为锁的获取和释放会插入内存屏障(Memory Barrier)。
内存屏障会强制代码按顺序执行,防止重排优化。
Lock 的有序性保证:
Lock 的实现(如 ReentrantLock)在 lock() 和 unlock() 方法中会插入内存屏障,确保锁内代码的顺序性。
锁外的代码可能被重排,但锁内的代码有序的。
原子性就不用说了。
3. 虚拟线程简介
jdk21带来的重磅内容就是虚拟线程(虚拟线程在jdk19 仅为预览特性,在jdk21才转正),它是一种轻量级线程。此前,很多语言都有类似于“虚拟线程”的技术,比如Go、C#、Erlang等,他们称之为“协程”。
多个虚拟线程共享同一个操作系统的线程,故虚拟线程的数量是可以远大于操作系统线程的数量的,同时资源占用极低,虚拟线程的栈空间默认KB为单位的;还是由JVM 管理,由 JVM 直接调度,无需绑定操作系统线程,突破传统线程数量限制(可支持数百万甚至上亿虚拟线程);阻塞操作自动挂起,当虚拟线程执行 I/O(如网络请求io、文件读写io)时,JVM 会将其挂起并释放底层平台线程,供其他任务使用。
这么牛逼吗?下面与普通线程比较一下
维度 | 传统线程(平台线程) | 虚拟线程 |
---|---|---|
资源占用 | 相比而言:高(依赖 OS 调度) | 极低(JVM 管理) |
并发能力 | 受限于 OS 线程数(通常数千) | 支持百万级并发 |
阻塞影响 | 阻塞操作占用 OS 线程,降低吞吐量 | 阻塞时自动释放线程,提升资源利用率 |
适用场景 | CPU 密集型任务 | I/O 密集型任务(如网络请求、数据库查询) |
小小试用一下:
怎么创建虚拟线程呢?
public static void createVirtualThread() {
// 创建虚拟线程 -- 写法一
Thread.startVirtualThread(() -> System.out.println("Hello, world!"));
// 创建虚拟线程 -- 写法二
Thread.ofVirtual().start(() -> System.out.println("Hello, world!"));
// 线程工厂创建 -- 写法三
ThreadFactory factory = Thread.ofVirtual().factory();
factory.newThread(() -> System.out.println("Hello, world!"));
// 线程池
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
service.shutdown();
}
// 实际上
public static Builder.OfVirtual ofVirtual() {
return new ThreadBuilders.VirtualThreadBuilder();
}
// 通过VirtualThreadBuilder来创建的
// VirtualThreadBuilder.java 【内部类】
public Thread unstarted(Runnable task) {
Objects.requireNonNull(task);
var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task);
UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
if (uhe != null)
thread.uncaughtExceptionHandler(uhe);
return thread;
}
static Thread newVirtualThread(Executor scheduler,
String name,
int characteristics,
Runnable task) {
if (ContinuationSupport.isSupported()) {
// new的VirtualThread
return new VirtualThread(scheduler, name, characteristics, task);
} else {
if (scheduler != null)
throw new UnsupportedOperationException();
return new BoundVirtualThread(name, characteristics, task);
}
}
对比一下普通线程
public class TestVirtualThread {
public static void main(String[] args) {
testNormal();
testVirtual();
}
// 测试虚拟线程
public static void testVirtual() {
try (ExecutorService service = Executors.newVirtualThreadPerTaskExecutor()) {
// 开始时间
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
int finalI = i;
service.submit(() -> {
int t = finalI + 1;
});
}
// 结束时间
long end = System.currentTimeMillis();
System.out.println("虚拟线程耗时:" + (end - start));
} catch (Exception e) {
e.printStackTrace();
}
}
// 测试普通线程
public static void testNormal() {
try {
// 开始时间
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
int finalI = i;
new Thread(() -> {
int t = finalI + 1;
}).start();
}
// 结束时间
long end = System.currentTimeMillis();
System.out.println("普通线程耗时:" + (end - start));
} catch (Exception e){
e.printStackTrace();
}
}
}
/*
普通线程耗时:13989
虚拟线程耗时:389
*/
// 案例二对比
public class VirtualThreadDemo {
public static void main(String[] args) {
// 任务数量
final int TASK_COUNT = 10000;
// 使用普通线程池(固定 100 个线程)
runWithPlatformThreads(TASK_COUNT);
// 使用虚拟线程池
runWithVirtualThreads(TASK_COUNT);
}
// 普通线程池测试
private static void runWithPlatformThreads(int taskCount) {
long start = System.currentTimeMillis();
// 固定线程池(普通线程)
try (ExecutorService executor = Executors.newFixedThreadPool(100)) {
for (int i = 0; i < taskCount; i++) {
executor.submit(() -> {
Thread.sleep(1000); // 模拟 I/O 阻塞
return null;
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
long duration = System.currentTimeMillis() - start;
System.out.println("[普通线程] 总耗时: " + duration + " ms");
}
// 虚拟线程池测试
private static void runWithVirtualThreads(int taskCount) {
long start = System.currentTimeMillis();
// 虚拟线程池(每个任务一个虚拟线程)
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < taskCount; i++) {
executor.submit(() -> {
Thread.sleep(1000); // 模拟 I/O 阻塞
return null;
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
long duration = System.currentTimeMillis() - start;
System.out.println("[虚拟线程] 总耗时: " + duration + " ms");
}
}
// 为啥普通线程池不设置1万个,你这对比不公平。。
虚拟线程限制
- CPU 密集型任务:虚拟线程无法提升计算性能,此时仍需普通线程
- 库兼容性:部分旧库可能未适配虚拟线程(如依赖
ThreadLocal
的库)
现在有什么好的虚拟线程应用场景呢?【多IO】
文件读写密集的应用(文件io)、微服务调用|数据库查询(网络io)。
end. 参考
- https://www.cnblogs.com/tuyang1129/p/12670014.html 【博客园 】
- https://blog.csdn.net/chenwendangding/article/details/99065623 【HashMap与ConcurrentHashMap工作原理、区别和总结】
- https://blog.csdn.net/weixin_46119595/article/details/139124887【ConcurrentHashMap 复合操作下丢失原子性】
- https://zhuanlan.zhihu.com/p/680274968 【知乎-】
- https://blog.csdn.net/u012723673/article/details/80682208 【csdn - volatile】
- 【微信公众号里面的:并发合集】https://mp.weixin.qq.com/mp/appmsgalbum?__biz=MzkxODI2MDMzMA==&action=getalbum&album_id=2263501677771161601
用户点评