如何正确的使用Java事件通知(1)(3)
同步
要再多线程的环境里使用 StateHolder ,它就必须是线程安全的。不过这也很容易实现,给我们类里面的每个方法加上 synchronized 就搞定了,不是吗?
- public class StateHolder {
- public synchronized void addStateListener( StateListener listener ) { [...]
- public synchronized void removeStateListener( StateListener listener ) { [...]
- public synchronized int getState() { [...]
- public synchronized void setState( int state ) { [...]
现在我们读写操作 一个 StateHolder 实例的时候都有了内置锁(Intrinsic Lock) 做保证,这使得公有方法具有了原子性,也确保了正确的状态对不同的线程都可见。任务完成!
才怪……尽管这样的实现是线程安全的,但一旦程序要调用它,就需要承担死锁的风险。
设想一下如下这种情形:线程 A 改变了 StateHolder 的状态 S,在向各个监听器(listener)广播这个状态 S 的时候,线程 B 视图访问状态 S ,然后被阻塞。如果 B 持有了一个对象的同步锁,这个对象又是关于状态 S的,并且本来是要广播给众多监听器当中的某一个的,这种情况下我们就会遇到一个死锁。
这就是为什么我们要缩小状态访问的同步性,在一个“保护通道”里面来广播这个事件:
- public class StateHolder {
- private final Set listeners = new HashSet<>();
- private int state;
- public void addStateListener( StateListener listener ) {
- synchronized( listeners ) {
- listeners.add( listener );
- }
- }
- public void removeStateListener( StateListener listener ) {
- synchronized( listeners ) {
- listeners.remove( listener );
- }
- }
- public int getState() {
- synchronized( listeners ) {
- return state;
- }
- }
- public void setState( int state ) {
- int oldState = this.state;
- synchronized( listeners ) {
- this.state = state;
- }
- if( oldState != state ) {
- broadcast( new StateEvent( oldState, state ) );
- }
- }
- private void broadcast( StateEvent stateEvent ) {
- Set snapshot;
- synchronized( listeners ) {
- snapshot = new HashSet<>( listeners );
- }
- for( StateListener listener : snapshot ) {
- listener.stateChanged( stateEvent );
- }
- }
- }
上面这段代码是在之前的基础上稍加改进来实现的,通过使用 Set 实例作为内部锁来提供合适(但也有些过时)的同步性,监听者的通知事件在保护块之外发生,这样就避免了一种死等的可能。
注意: 由于系统并发操作的天性,这个解决方案并不能保证变化通知按照他们产生的顺序依次到达监听器。如果观察者一侧对实际状态的准确性有较高要求,可以考虑把 StateHolder 作为你事件对象的来源。
如果事件顺序这在你的程序里显得至关重要,有一个办法就是可以考虑用一个线程安全的先入先出(FIFO)结构,连同监听器的快照一起,在 setState 方法的保护块里缓冲你的对象。只要 FIFO 结构不是空的,一个独立的线程就可以从一个不受保护的区域块里触发实际事件(生产者-消费者模式),这样理论上就可以不必冒着死锁的危险还能确保一切按照时间顺序进行。我说理论上,是因为到目前为止我也还没亲自这么试过。。
鉴于前面已经实现的,我们可以用诸如 CopyOnWriteArraySet 和 AtomicInteger 来写我们的这个线程安全类,从而使这个解决方案不至于那么复杂:
- public class StateHolder {
- private final Set listeners = new CopyOnWriteArraySet<>();
- private final AtomicInteger state = new AtomicInteger();
- public void addStateListener( StateListener listener ) {
- listeners.add( listener );
- }
- public void removeStateListener( StateListener listener ) {
- listeners.remove( listener );
- }
- public int getState() {
- return state.get();
- }
- public void setState( int state ) {
- int oldState = this.state.getAndSet( state );
- if( oldState != state ) {
- broadcast( new StateEvent( oldState, state ) );
- }
- }
- private void broadcast( StateEvent stateEvent ) {
- for( StateListener listener : listeners ) {
- listener.stateChanged( stateEvent );
- }
- }
- }
既然 CopyOnWriteArraySet 和 AtomicInteger 已经是线程安全的了,我们不再需要上面提到的那样一个“保护块”。但是等一下!我们刚刚不是在学到应该用一个快照来广播事件,来替代用一个隐形的迭代器在原集合(Set)里面做循环嘛?
这或许有些绕脑子,但是由 CopyOnWriteArraySet 提供的 Iterator(迭代器)里面已经有了一个“快照“。CopyOnWriteXXX 这样的集合就是被特别设计在这种情况下大显身手的——它在小长度的场景下会很高效,而针对频繁迭代和只有少量内容修改的场景也做了优化。这就意味着我们的代码是安全的。
随着 Java 8 的发布,broadcast 方法可以因为Iterable#forEach 和 lambdas表达式的结合使用而变得更加简洁,代码当然也是同样安全,因为迭代依然表现为在“快照”中进行:
- private void broadcast( StateEvent stateEvent ) {
- listeners.forEach( listener -> listener.stateChanged( stateEvent ) );
- }

用户点评