etcd 入门实战(3)-java 操作 etcd,1、引入依赖<dep
分享于 点击 49855 次 点评:156
etcd 入门实战(3)-java 操作 etcd,1、引入依赖本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。
1、引入依赖
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.7.7</version>
</dependency>
2、jetcd 使用
2.1、初始化客户端
@Before
public void before() {
client = Client.builder()
.endpoints("http://10.49.196.33:2379")
//.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
.connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
}
2.2、键值操作
A、新增/修改
@Test
public void kvPut() throws Exception {
KV kv = client.getKVClient();
ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
log.info("completableFuture={}", completableFuture.get());
}
B、查询
@Test
public void kvGet() throws Exception {
KV kv = client.getKVClient();
ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
CompletableFuture<GetResponse> completableFuture = kv.get(key);
GetResponse getResponse = completableFuture.get();
if (getResponse.getCount() > 0) {
log.info("value={}", getResponse.getKvs().get(0).getValue());
}
key = ByteSequence.from("key", StandardCharsets.UTF_8);
GetOption getOption = GetOption.builder().isPrefix(true).build();
completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据
for (KeyValue keyValue : completableFuture.get().getKvs()) {
log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
}
}
C、删除
@Test
public void kvDelete() throws Exception {
KV kv = client.getKVClient();
ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
log.info("completableFuture={}", completableFuture.get());
}
2.3、监控
@Test
public void watch() throws Exception {
Watch watch = client.getWatchClient();
watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
@Override
public void onNext(WatchResponse response) {
List<WatchEvent> events = response.getEvents();
for (WatchEvent watchEvent : events) {
log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
}
}
@Override
public void onError(Throwable throwable) {
log.error("发生异常:{}", throwable.getMessage());
}
@Override
public void onCompleted() {
log.info("complete");
}
});
Thread.sleep(1000 * 60 * 5);
}
2.4、租约
@Test
public void lease() throws Exception {
Lease lease = client.getLeaseClient();
//创建租约
LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
long leaseId = leaseGrantResponse.getID();
//租约与键值数据绑定
ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
client.getKVClient().put(key, value, putOption).get();
Thread.sleep(1000);
//查看租约剩余时间
LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);
//使租约一直有效
lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
}
@Override
public void onError(Throwable throwable) {
log.info("发生异常:{}", throwable.getMessage());
}
@Override
public void onCompleted() {
log.info("Complete");
}
});
Thread.sleep(1000 * 30);
//撤销租约
LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
log.info("leaseRevokeResponse={}", leaseRevokeResponse);
}
2.5、锁
@Test
public void lock() throws Exception {
ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
try {
LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
long leaseId = leaseGrantResponse.getID();
Lock lock = client.getLockClient();
//阻塞获取锁
LockResponse lockResponse = lock.lock(lockName, leaseId).get();
ByteSequence lockKey = lockResponse.getKey();
log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey());
Thread.sleep(3000);
//释放锁,租约撤销或到期也会释放锁
lock.unlock(lockKey).get();
} catch (Exception e) {
log.error("", e);
}
}).start();
}
Thread.sleep(1000 * 20);
}
2.6、选举
@Test
public void election() throws Exception {
ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};
for (ByteSequence proposal : proposals) {
new Thread(() -> {
try {
Election election = client.getElectionClient();
//监听选举事件(可选)
election.observe(electionName, new Election.Listener() {
@Override
public void onNext(LeaderResponse leaderResponse) {
log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
}
@Override
public void onError(Throwable throwable) {
log.error("发生异常:{}", throwable.getMessage());
}
@Override
public void onCompleted() {
log.info("complete");
}
});
LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
long leaseId = leaseGrantResponse.getID();
client.getLeaseClient().keepAlive(leaseId, null);
//获得领导权限或租约到期退出等待
CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
LeaderKey leaderKey = campaignResponse.getLeader();
log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
//获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException
LeaderResponse leaderResponse = election.leader(electionName).get();
log.info("领导者:{}", leaderResponse.getKv().getValue());
//TODO:业务处理
Thread.sleep(1000 * 6);
//释放领导权
election.resign(leaderKey).get();
client.getLeaseClient().revoke(leaseId);
} catch (Exception e) {
log.error("", e);
}
}).start();
}
Thread.sleep(1000 * 30);
}
2.7、完整代码
package com.abc.etcd;
import io.etcd.jetcd.*;
import io.etcd.jetcd.election.CampaignResponse;
import io.etcd.jetcd.election.LeaderKey;
import io.etcd.jetcd.election.LeaderResponse;
import io.etcd.jetcd.kv.DeleteResponse;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lease.LeaseRevokeResponse;
import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.LeaseOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class EtcdCase {
private Client client;
@Before
public void before() {
client = Client.builder()
.endpoints("http://10.49.196.33:2379")
//.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
.connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
}
@After
public void after() {
client.close();
}
@Test
public void kvPut() throws Exception {
KV kv = client.getKVClient();
ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
log.info("completableFuture={}", completableFuture.get());
}
@Test
public void kvGet() throws Exception {
KV kv = client.getKVClient();
ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
CompletableFuture<GetResponse> completableFuture = kv.get(key);
GetResponse getResponse = completableFuture.get();
if (getResponse.getCount() > 0) {
log.info("value={}", getResponse.getKvs().get(0).getValue());
}
key = ByteSequence.from("key", StandardCharsets.UTF_8);
GetOption getOption = GetOption.builder().isPrefix(true).build();
completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据
for (KeyValue keyValue : completableFuture.get().getKvs()) {
log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
}
}
@Test
public void kvDelete() throws Exception {
KV kv = client.getKVClient();
ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
log.info("completableFuture={}", completableFuture.get());
}
@Test
public void watch() throws Exception {
Watch watch = client.getWatchClient();
watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
@Override
public void onNext(WatchResponse response) {
List<WatchEvent> events = response.getEvents();
for (WatchEvent watchEvent : events) {
log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
}
}
@Override
public void onError(Throwable throwable) {
log.error("发生异常:{}", throwable.getMessage());
}
@Override
public void onCompleted() {
log.info("complete");
}
});
Thread.sleep(1000 * 60 * 5);
}
@Test
public void lease() throws Exception {
Lease lease = client.getLeaseClient();
//创建租约
LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
long leaseId = leaseGrantResponse.getID();
//租约与键值数据绑定
ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
client.getKVClient().put(key, value, putOption).get();
Thread.sleep(1000);
//查看租约剩余时间
LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);
//使租约一直有效
lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
}
@Override
public void onError(Throwable throwable) {
log.info("发生异常:{}", throwable.getMessage());
}
@Override
public void onCompleted() {
log.info("Complete");
}
});
Thread.sleep(1000 * 30);
//撤销租约
LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
log.info("leaseRevokeResponse={}", leaseRevokeResponse);
}
@Test
public void lock() throws Exception {
ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
try {
LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
long leaseId = leaseGrantResponse.getID();
Lock lock = client.getLockClient();
//阻塞获取锁
LockResponse lockResponse = lock.lock(lockName, leaseId).get();
ByteSequence lockKey = lockResponse.getKey();
log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey());
Thread.sleep(3000);
//释放锁,租约撤销或到期也会释放锁
lock.unlock(lockKey).get();
} catch (Exception e) {
log.error("", e);
}
}).start();
}
Thread.sleep(1000 * 20);
}
@Test
public void election() throws Exception {
ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};
for (ByteSequence proposal : proposals) {
new Thread(() -> {
try {
Election election = client.getElectionClient();
//监听选举事件(可选)
election.observe(electionName, new Election.Listener() {
@Override
public void onNext(LeaderResponse leaderResponse) {
log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
}
@Override
public void onError(Throwable throwable) {
log.error("发生异常:{}", throwable.getMessage());
}
@Override
public void onCompleted() {
log.info("complete");
}
});
LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
long leaseId = leaseGrantResponse.getID();
client.getLeaseClient().keepAlive(leaseId, null);
//获得领导权限或租约到期退出等待
CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
LeaderKey leaderKey = campaignResponse.getLeader();
log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
//获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException
LeaderResponse leaderResponse = election.leader(electionName).get();
log.info("领导者:{}", leaderResponse.getKv().getValue());
//TODO:业务处理
Thread.sleep(1000 * 6);
//释放领导权
election.resign(leaderKey).get();
client.getLeaseClient().revoke(leaseId);
} catch (Exception e) {
log.error("", e);
}
}).start();
}
Thread.sleep(1000 * 30);
}
}
EtcdCase.java
参考:
https://github.com/etcd-io/jetcd。
本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。
1、引入依赖
<dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-core</artifactId> <version>0.7.7</version> </dependency>
2、jetcd 使用
2.1、初始化客户端
@Before public void before() { client = Client.builder() .endpoints("http://10.49.196.33:2379") //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379") .connectTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); }
2.2、键值操作
A、新增/修改
@Test public void kvPut() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8); ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8); CompletableFuture<PutResponse> completableFuture = kv.put(key, value); log.info("completableFuture={}", completableFuture.get()); }
B、查询
@Test public void kvGet() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8); CompletableFuture<GetResponse> completableFuture = kv.get(key); GetResponse getResponse = completableFuture.get(); if (getResponse.getCount() > 0) { log.info("value={}", getResponse.getKvs().get(0).getValue()); } key = ByteSequence.from("key", StandardCharsets.UTF_8); GetOption getOption = GetOption.builder().isPrefix(true).build(); completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据 for (KeyValue keyValue : completableFuture.get().getKvs()) { log.info("key={},value={}", keyValue.getKey(), keyValue.getValue()); } }
C、删除
@Test public void kvDelete() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8); CompletableFuture<DeleteResponse> completableFuture = kv.delete(key); log.info("completableFuture={}", completableFuture.get()); }
2.3、监控
@Test public void watch() throws Exception { Watch watch = client.getWatchClient(); watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() { @Override public void onNext(WatchResponse response) { List<WatchEvent> events = response.getEvents(); for (WatchEvent watchEvent : events) { log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue()); } } @Override public void onError(Throwable throwable) { log.error("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("complete"); } }); Thread.sleep(1000 * 60 * 5); }
2.4、租约
@Test public void lease() throws Exception { Lease lease = client.getLeaseClient(); //创建租约 LeaseGrantResponse leaseGrantResponse = lease.grant(10).get(); long leaseId = leaseGrantResponse.getID(); //租约与键值数据绑定 ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8); ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8); PutOption putOption = PutOption.builder().withLeaseId(leaseId).build(); client.getKVClient().put(key, value, putOption).get(); Thread.sleep(1000); //查看租约剩余时间 LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build(); LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get(); log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse); //使租约一直有效 lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() { @Override public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) { log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL()); } @Override public void onError(Throwable throwable) { log.info("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("Complete"); } }); Thread.sleep(1000 * 30); //撤销租约 LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get(); log.info("leaseRevokeResponse={}", leaseRevokeResponse); }
2.5、锁
@Test public void lock() throws Exception { ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8); for (int i = 1; i <= 3; i++) { new Thread(() -> { try { LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get(); long leaseId = leaseGrantResponse.getID(); Lock lock = client.getLockClient(); //阻塞获取锁 LockResponse lockResponse = lock.lock(lockName, leaseId).get(); ByteSequence lockKey = lockResponse.getKey(); log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey()); Thread.sleep(3000); //释放锁,租约撤销或到期也会释放锁 lock.unlock(lockKey).get(); } catch (Exception e) { log.error("", e); } }).start(); } Thread.sleep(1000 * 20); }
2.6、选举
@Test public void election() throws Exception { ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8); ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8); ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8); ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8); ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3}; for (ByteSequence proposal : proposals) { new Thread(() -> { try { Election election = client.getElectionClient(); //监听选举事件(可选) election.observe(electionName, new Election.Listener() { @Override public void onNext(LeaderResponse leaderResponse) { log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue()); } @Override public void onError(Throwable throwable) { log.error("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("complete"); } }); LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get(); long leaseId = leaseGrantResponse.getID(); client.getLeaseClient().keepAlive(leaseId, null); //获得领导权限或租约到期退出等待 CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get(); LeaderKey leaderKey = campaignResponse.getLeader(); log.info("{},获得领导权,{}", proposal, leaderKey.getKey()); //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException LeaderResponse leaderResponse = election.leader(electionName).get(); log.info("领导者:{}", leaderResponse.getKv().getValue()); //TODO:业务处理 Thread.sleep(1000 * 6); //释放领导权 election.resign(leaderKey).get(); client.getLeaseClient().revoke(leaseId); } catch (Exception e) { log.error("", e); } }).start(); } Thread.sleep(1000 * 30); }
2.7、完整代码
package com.abc.etcd; import io.etcd.jetcd.*; import io.etcd.jetcd.election.CampaignResponse; import io.etcd.jetcd.election.LeaderKey; import io.etcd.jetcd.election.LeaderResponse; import io.etcd.jetcd.kv.DeleteResponse; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.kv.PutResponse; import io.etcd.jetcd.lease.LeaseGrantResponse; import io.etcd.jetcd.lease.LeaseKeepAliveResponse; import io.etcd.jetcd.lease.LeaseRevokeResponse; import io.etcd.jetcd.lease.LeaseTimeToLiveResponse; import io.etcd.jetcd.lock.LockResponse; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.LeaseOption; import io.etcd.jetcd.options.PutOption; import io.etcd.jetcd.watch.WatchEvent; import io.etcd.jetcd.watch.WatchResponse; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.concurrent.CompletableFuture; @Slf4j public class EtcdCase { private Client client; @Before public void before() { client = Client.builder() .endpoints("http://10.49.196.33:2379") //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379") .connectTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); } @After public void after() { client.close(); } @Test public void kvPut() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8); ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8); CompletableFuture<PutResponse> completableFuture = kv.put(key, value); log.info("completableFuture={}", completableFuture.get()); } @Test public void kvGet() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8); CompletableFuture<GetResponse> completableFuture = kv.get(key); GetResponse getResponse = completableFuture.get(); if (getResponse.getCount() > 0) { log.info("value={}", getResponse.getKvs().get(0).getValue()); } key = ByteSequence.from("key", StandardCharsets.UTF_8); GetOption getOption = GetOption.builder().isPrefix(true).build(); completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据 for (KeyValue keyValue : completableFuture.get().getKvs()) { log.info("key={},value={}", keyValue.getKey(), keyValue.getValue()); } } @Test public void kvDelete() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8); CompletableFuture<DeleteResponse> completableFuture = kv.delete(key); log.info("completableFuture={}", completableFuture.get()); } @Test public void watch() throws Exception { Watch watch = client.getWatchClient(); watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() { @Override public void onNext(WatchResponse response) { List<WatchEvent> events = response.getEvents(); for (WatchEvent watchEvent : events) { log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue()); } } @Override public void onError(Throwable throwable) { log.error("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("complete"); } }); Thread.sleep(1000 * 60 * 5); } @Test public void lease() throws Exception { Lease lease = client.getLeaseClient(); //创建租约 LeaseGrantResponse leaseGrantResponse = lease.grant(10).get(); long leaseId = leaseGrantResponse.getID(); //租约与键值数据绑定 ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8); ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8); PutOption putOption = PutOption.builder().withLeaseId(leaseId).build(); client.getKVClient().put(key, value, putOption).get(); Thread.sleep(1000); //查看租约剩余时间 LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build(); LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get(); log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse); //使租约一直有效 lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() { @Override public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) { log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL()); } @Override public void onError(Throwable throwable) { log.info("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("Complete"); } }); Thread.sleep(1000 * 30); //撤销租约 LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get(); log.info("leaseRevokeResponse={}", leaseRevokeResponse); } @Test public void lock() throws Exception { ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8); for (int i = 1; i <= 3; i++) { new Thread(() -> { try { LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get(); long leaseId = leaseGrantResponse.getID(); Lock lock = client.getLockClient(); //阻塞获取锁 LockResponse lockResponse = lock.lock(lockName, leaseId).get(); ByteSequence lockKey = lockResponse.getKey(); log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey()); Thread.sleep(3000); //释放锁,租约撤销或到期也会释放锁 lock.unlock(lockKey).get(); } catch (Exception e) { log.error("", e); } }).start(); } Thread.sleep(1000 * 20); } @Test public void election() throws Exception { ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8); ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8); ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8); ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8); ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3}; for (ByteSequence proposal : proposals) { new Thread(() -> { try { Election election = client.getElectionClient(); //监听选举事件(可选) election.observe(electionName, new Election.Listener() { @Override public void onNext(LeaderResponse leaderResponse) { log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue()); } @Override public void onError(Throwable throwable) { log.error("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("complete"); } }); LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get(); long leaseId = leaseGrantResponse.getID(); client.getLeaseClient().keepAlive(leaseId, null); //获得领导权限或租约到期退出等待 CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get(); LeaderKey leaderKey = campaignResponse.getLeader(); log.info("{},获得领导权,{}", proposal, leaderKey.getKey()); //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException LeaderResponse leaderResponse = election.leader(electionName).get(); log.info("领导者:{}", leaderResponse.getKv().getValue()); //TODO:业务处理 Thread.sleep(1000 * 6); //释放领导权 election.resign(leaderKey).get(); client.getLeaseClient().revoke(leaseId); } catch (Exception e) { log.error("", e); } }).start(); } Thread.sleep(1000 * 30); } }EtcdCase.java
参考:
https://github.com/etcd-io/jetcd。
用户点评