Zookeeper入门实战(5)-分布式锁
分享于 点击 33892 次 点评:184
Zookeeper入门实战(5)-分布式锁:在分布式环境中,当需要控制不同进程对某一资源的并发访问时,就需要使用分布式锁。可以使用 ZooKeeper + Curator 来实现分布式锁;本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、Curator 5.4.0。
1、引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
2、使用样例
2.1、可重入锁
/**
 * Reentrant-lock sample: five threads contend for an {@code InterProcessMutex} on
 * {@code /test/lock}. Each thread acquires the lock twice, holds it for three
 * seconds and then releases it once per successful acquisition.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for the worker threads
 */
@Test
public void interProcessMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    try {
        cf.start();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                // Counts successful acquisitions so that only those are released.
                int held = 0;
                try {
                    lock.acquire();
                    held++;
                    // The same thread may acquire the lock again.
                    lock.acquire();
                    held++;
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    logger.error(Thread.currentThread().getName() + " failed while using the lock", e);
                } finally {
                    try {
                        // Release exactly as many times as the lock was acquired.
                        for (int n = 0; n < held; n++) {
                            release(lock);
                        }
                        if (held > 0) {
                            logger.info(Thread.currentThread().getName() + "释放了锁");
                        }
                    } finally {
                        // Always signal completion so the waiting test thread is released.
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
    } finally {
        // Close the client even when the wait above is interrupted.
        cf.close();
    }
}
2.2、不可重入锁
/**
 * Non-reentrant-lock sample: five threads contend for an
 * {@code InterProcessSemaphoreMutex} on {@code /test/lock2}. Each thread acquires
 * the lock once, holds it for three seconds and releases it.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for the worker threads
 */
@Test
public void interProcessSemaphoreMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    try {
        cf.start();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                // Set only after acquire() returned, so a failed acquire is never released.
                boolean acquired = false;
                try {
                    lock.acquire();
                    acquired = true;
                    // This lock cannot be acquired again by the thread that already holds it.
                    // lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    logger.error(Thread.currentThread().getName() + " failed while using the lock", e);
                } finally {
                    try {
                        if (acquired) {
                            release(lock);
                            logger.info(Thread.currentThread().getName() + "释放了锁");
                        }
                    } finally {
                        // Always signal completion so the waiting test thread is released.
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
    } finally {
        // Close the client even when the wait above is interrupted.
        cf.close();
    }
}
2.3、读写锁(可重入)
/**
 * Read/write-lock sample on {@code /test/lock3}: each of five threads first
 * acquires the read lock twice and releases it, then acquires and releases the
 * write lock. The write phase runs even if the read phase failed.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for the worker threads
 */
@Test
public void interProcessReadWriteLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    try {
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();
        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    // Read phase: count acquisitions so only those are released.
                    int readHeld = 0;
                    try {
                        readLock.acquire();
                        readHeld++;
                        readLock.acquire();
                        readHeld++;
                        logger.info(Thread.currentThread().getName() + "获得了读锁");
                        Thread.sleep(1000 * 2);
                    } catch (Exception e) {
                        logger.error(Thread.currentThread().getName() + " failed while using the read lock", e);
                    } finally {
                        // Release exactly as many times as the read lock was acquired.
                        for (int n = 0; n < readHeld; n++) {
                            release(readLock);
                        }
                        if (readHeld > 0) {
                            logger.info(Thread.currentThread().getName() + "释放了读锁");
                        }
                    }
                    // Write phase.
                    boolean writeAcquired = false;
                    try {
                        writeLock.acquire();
                        writeAcquired = true;
                        logger.info(Thread.currentThread().getName() + "获得了写锁");
                        Thread.sleep(1000 * 2);
                    } catch (Exception e) {
                        logger.error(Thread.currentThread().getName() + " failed while using the write lock", e);
                    } finally {
                        if (writeAcquired) {
                            release(writeLock);
                            logger.info(Thread.currentThread().getName() + "释放了写锁");
                        }
                    }
                } finally {
                    // Always signal completion so the waiting test thread is released.
                    countDownLatch.countDown();
                }
            }).start();
        }
        countDownLatch.await();
    } finally {
        // Close the client even when the wait above is interrupted.
        cf.close();
    }
}
2.4、信号量
信号量用于控制对资源同时访问的进程或线程数。
/**
 * Semaphore sample: five threads share an {@code InterProcessSemaphoreV2} on
 * {@code /test/lock4} that is created with three leases. Each thread acquires one
 * lease, holds it for three seconds and returns it.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for the worker threads
 */
@Test
public void interProcessSemaphoreV2() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    try {
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);
        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                Lease lease = null;
                try {
                    // Acquire one lease.
                    lease = semaphore.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了许可");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    logger.error(Thread.currentThread().getName() + " failed while using the semaphore", e);
                } finally {
                    try {
                        // Return the lease only if one was actually obtained.
                        if (lease != null) {
                            semaphore.returnLease(lease);
                            logger.info(Thread.currentThread().getName() + "释放了许可");
                        }
                    } finally {
                        // Always signal completion so the waiting test thread is released.
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
    } finally {
        // Close the client even when the wait above is interrupted.
        cf.close();
    }
}
2.5、多个锁作为单个实体管理
InterProcessMultiLock 的主要功能是将多个锁合并为一个对象来操作,从而简化代码。
/**
 * Multi-lock sample: an {@code InterProcessMultiLock} wraps a reentrant mutex on
 * {@code /test/lock} and a non-reentrant mutex on {@code /test/lock2}, so both are
 * acquired and released through a single object. Five threads contend for it.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for the worker threads
 */
@Test
public void InterProcessMultiLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    try {
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));
        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                // Set only after acquire() returned, so a failed acquire is never released.
                boolean acquired = false;
                try {
                    // Equivalent to lock.acquire() followed by lock2.acquire().
                    multiLock.acquire();
                    acquired = true;
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    logger.error(Thread.currentThread().getName() + " failed while using the multi-lock", e);
                } finally {
                    try {
                        if (acquired) {
                            release(multiLock);
                            logger.info(Thread.currentThread().getName() + "释放了锁");
                        }
                    } finally {
                        // Always signal completion so the waiting test thread is released.
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
    } finally {
        // Close the client even when the wait above is interrupted.
        cf.close();
    }
}
分布式锁使用样例的完整代码如下:
package com.inspur.demo.general.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
/**
 * JUnit samples for the Apache Curator distributed-lock recipes: reentrant mutex,
 * non-reentrant mutex, read/write lock, semaphore and multi-lock.
 *
 * <p>Every sample starts {@code WORKER_COUNT} threads that contend for the same
 * ZooKeeper path, waits for all of them and then closes the client.
 */
public class CuratorLockCase {
    private static final Logger logger = LoggerFactory.getLogger(CuratorLockCase.class);
    /** ZooKeeper server address passed to the client builder. */
    private static final String CONNECT_STRING = "10.49.196.33:2181";
    /** Session timeout in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 40 * 1000;
    /** Connection timeout in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 60 * 1000;
    /** Number of worker threads started by every sample. */
    private static final int WORKER_COUNT = 5;

    /** Body of one worker thread; it may throw any exception. */
    @FunctionalInterface
    private interface Worker {
        void run() throws Exception;
    }

    /**
     * Reentrant lock: each worker acquires the mutex twice and releases it once
     * per successful acquisition.
     */
    @Test
    public void interProcessMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        runWorkers(cf, () -> {
            int held = 0;
            try {
                lock.acquire();
                held++;
                // The same thread may acquire the lock again.
                lock.acquire();
                held++;
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } finally {
                release(lock, held, "释放了锁");
            }
        });
    }

    /**
     * Non-reentrant lock: each worker acquires the mutex exactly once.
     */
    @Test
    public void interProcessSemaphoreMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        runWorkers(cf, () -> {
            int held = 0;
            try {
                lock.acquire();
                held++;
                // This lock cannot be acquired again by the thread that already holds it.
                // lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } finally {
                release(lock, held, "释放了锁");
            }
        });
    }

    /**
     * Read/write lock (reentrant): each worker takes the read lock twice, releases
     * it, and then takes the write lock. The write phase runs even if the read
     * phase failed.
     */
    @Test
    public void interProcessReadWriteLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();
        runWorkers(cf, () -> {
            int readHeld = 0;
            try {
                readLock.acquire();
                readHeld++;
                readLock.acquire();
                readHeld++;
                logger.info(Thread.currentThread().getName() + "获得了读锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                // Logged here rather than propagated so the write phase still runs.
                logger.error(Thread.currentThread().getName() + " failed while using the read lock", e);
            } finally {
                release(readLock, readHeld, "释放了读锁");
            }
            int writeHeld = 0;
            try {
                writeLock.acquire();
                writeHeld++;
                logger.info(Thread.currentThread().getName() + "获得了写锁");
                Thread.sleep(1000 * 2);
            } finally {
                release(writeLock, writeHeld, "释放了写锁");
            }
        });
    }

    /**
     * Semaphore: limits how many workers hold a lease at the same time (three
     * leases for five workers).
     */
    @Test
    public void interProcessSemaphoreV2() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);
        runWorkers(cf, () -> {
            Lease lease = null;
            try {
                // Acquire one lease.
                lease = semaphore.acquire();
                logger.info(Thread.currentThread().getName() + "获得了许可");
                Thread.sleep(1000 * 3);
            } finally {
                // Return the lease only if one was actually obtained.
                if (lease != null) {
                    semaphore.returnLease(lease);
                    logger.info(Thread.currentThread().getName() + "释放了许可");
                }
            }
        });
    }

    /**
     * Multi-lock: several locks managed as a single entity.
     */
    @Test
    public void InterProcessMultiLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));
        runWorkers(cf, () -> {
            int held = 0;
            try {
                // Equivalent to lock.acquire() followed by lock2.acquire().
                multiLock.acquire();
                held++;
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } finally {
                release(multiLock, held, "释放了锁");
            }
        });
    }

    /**
     * Starts the client, runs {@code worker} on {@code WORKER_COUNT} threads, waits
     * for all of them and closes the client, even when the wait is interrupted.
     *
     * @param cf     client that is started here and closed before returning
     * @param worker body executed by every thread; its exceptions are logged
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void runWorkers(CuratorFramework cf, Worker worker) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(WORKER_COUNT);
        try {
            cf.start();
            for (int i = 0; i < WORKER_COUNT; i++) {
                new Thread(() -> {
                    try {
                        worker.run();
                    } catch (Exception e) {
                        logger.error(Thread.currentThread().getName() + " failed", e);
                    } finally {
                        // Always signal completion so the waiting test thread is released.
                        done.countDown();
                    }
                }).start();
            }
            done.await();
        } finally {
            cf.close();
        }
    }

    /**
     * Builds (but does not start) a client from the connection settings above,
     * using an exponential-backoff retry policy.
     */
    private CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        return CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .retryPolicy(retryPolicy)
                .build();
    }

    /**
     * Releases {@code lock} {@code times} times and, if anything was released,
     * logs the current thread name followed by {@code message}.
     *
     * @param lock    lock to release
     * @param times   number of successful acquisitions to undo; nothing happens for 0
     * @param message suffix of the log line written after releasing
     */
    private void release(InterProcessLock lock, int times, String message) {
        if (times <= 0) {
            return;
        }
        for (int i = 0; i < times; i++) {
            release(lock);
        }
        logger.info(Thread.currentThread().getName() + message);
    }

    /**
     * Releases {@code lock} once; a {@code null} lock is ignored and a failure is
     * logged instead of propagated.
     */
    private void release(InterProcessLock lock) {
        if (lock != null) {
            try {
                lock.release();
            } catch (Exception e) {
                logger.error("Failed to release lock", e);
            }
        }
    }
}
CuratorLockCase.java
在分布式环境中,当需要控制不同进程对某一资源的并发访问时,就需要使用分布式锁。可以使用 ZooKeeper + Curator 来实现分布式锁;本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、Curator 5.4.0。
1、引入依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency>
2、使用样例
2.1、可重入锁
@Test public void interProcessMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中可重复获取 lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(lock); release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.2、不可重入锁
@Test public void interProcessSemaphoreMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中不可重复获取 //lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.3、读写锁(可重入)
@Test public void interProcessReadWriteLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3"); InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock(); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { readLock.acquire(); readLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了读锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(readLock); release(readLock); logger.info(Thread.currentThread().getName() + "释放了读锁"); } try { writeLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了写锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { release(writeLock); logger.info(Thread.currentThread().getName() + "释放了写锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.4、信号量
信号量用于控制对资源同时访问的进程或线程数。
@Test public void interProcessSemaphoreV2() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { Lease lease = null; try { //获取一个许可 lease = semaphore.acquire(); logger.info(Thread.currentThread().getName() + "获得了许可"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //释放一个许可 semaphore.returnLease(lease); logger.info(Thread.currentThread().getName() + "释放了许可"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.5、多个锁作为单个实体管理
InterProcessMultiLock 的主要功能是将多个锁合并为一个对象来操作,从而简化代码。
@Test public void InterProcessMultiLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2"); InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2)); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { //相当于 lock.acquire() 和 lock2.acquire() multiLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(multiLock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
分布式锁使用样例的完整代码如下:
package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

/**
 * JUnit samples for the Apache Curator distributed-lock recipes: reentrant mutex,
 * non-reentrant mutex, read/write lock, semaphore and multi-lock.
 *
 * <p>Every sample starts {@code WORKER_COUNT} threads that contend for the same
 * ZooKeeper path, waits for all of them and then closes the client.
 */
public class CuratorLockCase {
    private static final Logger logger = LoggerFactory.getLogger(CuratorLockCase.class);
    /** ZooKeeper server address passed to the client builder. */
    private static final String CONNECT_STRING = "10.49.196.33:2181";
    /** Session timeout in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 40 * 1000;
    /** Connection timeout in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 60 * 1000;
    /** Number of worker threads started by every sample. */
    private static final int WORKER_COUNT = 5;

    /** Body of one worker thread; it may throw any exception. */
    @FunctionalInterface
    private interface Worker {
        void run() throws Exception;
    }

    /**
     * Reentrant lock: each worker acquires the mutex twice and releases it once
     * per successful acquisition.
     */
    @Test
    public void interProcessMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        runWorkers(cf, () -> {
            int held = 0;
            try {
                lock.acquire();
                held++;
                // The same thread may acquire the lock again.
                lock.acquire();
                held++;
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } finally {
                release(lock, held, "释放了锁");
            }
        });
    }

    /**
     * Non-reentrant lock: each worker acquires the mutex exactly once.
     */
    @Test
    public void interProcessSemaphoreMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        runWorkers(cf, () -> {
            int held = 0;
            try {
                lock.acquire();
                held++;
                // This lock cannot be acquired again by the thread that already holds it.
                // lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } finally {
                release(lock, held, "释放了锁");
            }
        });
    }

    /**
     * Read/write lock (reentrant): each worker takes the read lock twice, releases
     * it, and then takes the write lock. The write phase runs even if the read
     * phase failed.
     */
    @Test
    public void interProcessReadWriteLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();
        runWorkers(cf, () -> {
            int readHeld = 0;
            try {
                readLock.acquire();
                readHeld++;
                readLock.acquire();
                readHeld++;
                logger.info(Thread.currentThread().getName() + "获得了读锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                // Logged here rather than propagated so the write phase still runs.
                logger.error(Thread.currentThread().getName() + " failed while using the read lock", e);
            } finally {
                release(readLock, readHeld, "释放了读锁");
            }
            int writeHeld = 0;
            try {
                writeLock.acquire();
                writeHeld++;
                logger.info(Thread.currentThread().getName() + "获得了写锁");
                Thread.sleep(1000 * 2);
            } finally {
                release(writeLock, writeHeld, "释放了写锁");
            }
        });
    }

    /**
     * Semaphore: limits how many workers hold a lease at the same time (three
     * leases for five workers).
     */
    @Test
    public void interProcessSemaphoreV2() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);
        runWorkers(cf, () -> {
            Lease lease = null;
            try {
                // Acquire one lease.
                lease = semaphore.acquire();
                logger.info(Thread.currentThread().getName() + "获得了许可");
                Thread.sleep(1000 * 3);
            } finally {
                // Return the lease only if one was actually obtained.
                if (lease != null) {
                    semaphore.returnLease(lease);
                    logger.info(Thread.currentThread().getName() + "释放了许可");
                }
            }
        });
    }

    /**
     * Multi-lock: several locks managed as a single entity.
     */
    @Test
    public void InterProcessMultiLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));
        runWorkers(cf, () -> {
            int held = 0;
            try {
                // Equivalent to lock.acquire() followed by lock2.acquire().
                multiLock.acquire();
                held++;
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } finally {
                release(multiLock, held, "释放了锁");
            }
        });
    }

    /**
     * Starts the client, runs {@code worker} on {@code WORKER_COUNT} threads, waits
     * for all of them and closes the client, even when the wait is interrupted.
     *
     * @param cf     client that is started here and closed before returning
     * @param worker body executed by every thread; its exceptions are logged
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void runWorkers(CuratorFramework cf, Worker worker) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(WORKER_COUNT);
        try {
            cf.start();
            for (int i = 0; i < WORKER_COUNT; i++) {
                new Thread(() -> {
                    try {
                        worker.run();
                    } catch (Exception e) {
                        logger.error(Thread.currentThread().getName() + " failed", e);
                    } finally {
                        // Always signal completion so the waiting test thread is released.
                        done.countDown();
                    }
                }).start();
            }
            done.await();
        } finally {
            cf.close();
        }
    }

    /**
     * Builds (but does not start) a client from the connection settings above,
     * using an exponential-backoff retry policy.
     */
    private CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        return CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .retryPolicy(retryPolicy)
                .build();
    }

    /**
     * Releases {@code lock} {@code times} times and, if anything was released,
     * logs the current thread name followed by {@code message}.
     *
     * @param lock    lock to release
     * @param times   number of successful acquisitions to undo; nothing happens for 0
     * @param message suffix of the log line written after releasing
     */
    private void release(InterProcessLock lock, int times, String message) {
        if (times <= 0) {
            return;
        }
        for (int i = 0; i < times; i++) {
            release(lock);
        }
        logger.info(Thread.currentThread().getName() + message);
    }

    /**
     * Releases {@code lock} once; a {@code null} lock is ignored and a failure is
     * logged instead of propagated.
     */
    private void release(InterProcessLock lock) {
        if (lock != null) {
            try {
                lock.release();
            } catch (Exception e) {
                logger.error("Failed to release lock", e);
            }
        }
    }
}
CuratorLockCase.java
用户点评