欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

etcd 入门实战(3)-java 操作 etcd,1、引入依赖<dep

来源: javaer 分享于  点击 49855 次 点评:156

etcd 入门实战(3)-java 操作 etcd,1、引入依赖

本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。

1、引入依赖

<dependency>
    <groupId>io.etcd</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.7.7</version>
</dependency>

2、jetcd 使用

2.1、初始化客户端

@Before
public void before() {
    client = Client.builder()
            .endpoints("http://10.49.196.33:2379")
            //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
            .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
}

2.2、键值操作

A、新增/修改

@Test
public void kvPut() throws Exception {
    KV kv = client.getKVClient();
    ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
    ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
    CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
    log.info("completableFuture={}", completableFuture.get());
}

B、查询

@Test
public void kvGet() throws Exception {
    KV kv = client.getKVClient();
    ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
    CompletableFuture<GetResponse> completableFuture = kv.get(key);
    GetResponse getResponse = completableFuture.get();
    if (getResponse.getCount() > 0) {
        log.info("value={}", getResponse.getKvs().get(0).getValue());
    }

    key = ByteSequence.from("key", StandardCharsets.UTF_8);
    GetOption getOption = GetOption.builder().isPrefix(true).build();
    completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据
    for (KeyValue keyValue : completableFuture.get().getKvs()) {
        log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
    }
}

C、删除

@Test
public void kvDelete() throws Exception {
    KV kv = client.getKVClient();
    ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
    CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
    log.info("completableFuture={}", completableFuture.get());
}

2.3、监控

@Test
public void watch() throws Exception {
    Watch watch = client.getWatchClient();
    watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
        @Override
        public void onNext(WatchResponse response) {
            List<WatchEvent> events = response.getEvents();
            for (WatchEvent watchEvent : events) {
                log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
            }
        }

        @Override
        public void onError(Throwable throwable) {
            log.error("发生异常:{}", throwable.getMessage());
        }

        @Override
        public void onCompleted() {
            log.info("complete");
        }
    });

    Thread.sleep(1000 * 60 * 5);
}

2.4、租约

@Test
public void lease() throws Exception {
    Lease lease = client.getLeaseClient();

    //创建租约
    LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
    long leaseId = leaseGrantResponse.getID();

    //租约与键值数据绑定
    ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
    ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
    PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
    client.getKVClient().put(key, value, putOption).get();

    Thread.sleep(1000);

    //查看租约剩余时间
    LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
    LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
    log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);

    //使租约一直有效
    lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
        @Override
        public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
            log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("发生异常:{}", throwable.getMessage());
        }

        @Override
        public void onCompleted() {
            log.info("Complete");
        }
    });

    Thread.sleep(1000 * 30);

    //撤销租约
    LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
    log.info("leaseRevokeResponse={}", leaseRevokeResponse);
}

2.5、锁

@Test
public void lock() throws Exception {
    ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
    for (int i = 1; i <= 3; i++) {
        new Thread(() -> {
            try {
                LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
                long leaseId = leaseGrantResponse.getID();

                Lock lock = client.getLockClient();
                //阻塞获取锁
                LockResponse lockResponse = lock.lock(lockName, leaseId).get();
                ByteSequence lockKey = lockResponse.getKey();
                log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey());
                Thread.sleep(3000);
                //释放锁,租约撤销或到期也会释放锁
                lock.unlock(lockKey).get();
            } catch (Exception e) {
                log.error("", e);
            }
        }).start();
    }

    Thread.sleep(1000 * 20);
}

2.6、选举

@Test
public void election() throws Exception {
    ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
    ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
    ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
    ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
    ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};

    for (ByteSequence proposal : proposals) {
        new Thread(() -> {
            try {
                Election election = client.getElectionClient();
                //监听选举事件(可选)
                election.observe(electionName, new Election.Listener() {
                    @Override
                    public void onNext(LeaderResponse leaderResponse) {
                        log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.error("发生异常:{}", throwable.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        log.info("complete");
                    }
                });

                LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
                long leaseId = leaseGrantResponse.getID();
                client.getLeaseClient().keepAlive(leaseId, null);

                //获得领导权限或租约到期退出等待
                CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
                LeaderKey leaderKey = campaignResponse.getLeader();
                log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
                //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException
                LeaderResponse leaderResponse = election.leader(electionName).get();
                log.info("领导者:{}", leaderResponse.getKv().getValue());
                //TODO:业务处理
                Thread.sleep(1000 * 6);
                //释放领导权
                election.resign(leaderKey).get();
                client.getLeaseClient().revoke(leaseId);
            } catch (Exception e) {
                log.error("", e);
            }
        }).start();
    }

    Thread.sleep(1000 * 30);
}

2.7、完整代码

package com.abc.etcd;

import io.etcd.jetcd.*;
import io.etcd.jetcd.election.CampaignResponse;
import io.etcd.jetcd.election.LeaderKey;
import io.etcd.jetcd.election.LeaderResponse;
import io.etcd.jetcd.kv.DeleteResponse;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lease.LeaseRevokeResponse;
import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.LeaseOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class EtcdCase {
    private Client client;

    @Before
    public void before() {
        client = Client.builder()
                .endpoints("http://10.49.196.33:2379")
                //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
                .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void kvPut() throws Exception {
        KV kv = client.getKVClient();
        ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
        CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
        log.info("completableFuture={}", completableFuture.get());
    }

    @Test
    public void kvGet() throws Exception {
        KV kv = client.getKVClient();
        ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
        CompletableFuture<GetResponse> completableFuture = kv.get(key);
        GetResponse getResponse = completableFuture.get();
        if (getResponse.getCount() > 0) {
            log.info("value={}", getResponse.getKvs().get(0).getValue());
        }

        key = ByteSequence.from("key", StandardCharsets.UTF_8);
        GetOption getOption = GetOption.builder().isPrefix(true).build();
        completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据
        for (KeyValue keyValue : completableFuture.get().getKvs()) {
            log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
        }
    }

    @Test
    public void kvDelete() throws Exception {
        KV kv = client.getKVClient();
        ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
        CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
        log.info("completableFuture={}", completableFuture.get());
    }

    @Test
    public void watch() throws Exception {
        Watch watch = client.getWatchClient();
        watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
            @Override
            public void onNext(WatchResponse response) {
                List<WatchEvent> events = response.getEvents();
                for (WatchEvent watchEvent : events) {
                    log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("发生异常:{}", throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                log.info("complete");
            }
        });

        Thread.sleep(1000 * 60 * 5);
    }

    @Test
    public void lease() throws Exception {
        Lease lease = client.getLeaseClient();

        //创建租约
        LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
        long leaseId = leaseGrantResponse.getID();

        //租约与键值数据绑定
        ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
        PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
        client.getKVClient().put(key, value, putOption).get();

        Thread.sleep(1000);

        //查看租约剩余时间
        LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
        LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
        log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);

        //使租约一直有效
        lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
            @Override
            public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
            }

            @Override
            public void onError(Throwable throwable) {
                log.info("发生异常:{}", throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                log.info("Complete");
            }
        });

        Thread.sleep(1000 * 30);

        //撤销租约
        LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
        log.info("leaseRevokeResponse={}", leaseRevokeResponse);
    }

    @Test
    public void lock() throws Exception {
        ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                try {
                    LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
                    long leaseId = leaseGrantResponse.getID();

                    Lock lock = client.getLockClient();
                    //阻塞获取锁
                    LockResponse lockResponse = lock.lock(lockName, leaseId).get();
                    ByteSequence lockKey = lockResponse.getKey();
                    log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey());
                    Thread.sleep(3000);
                    //释放锁,租约撤销或到期也会释放锁
                    lock.unlock(lockKey).get();
                } catch (Exception e) {
                    log.error("", e);
                }
            }).start();
        }

        Thread.sleep(1000 * 20);
    }

    @Test
    public void election() throws Exception {
        ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
        ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
        ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
        ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
        ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};

        for (ByteSequence proposal : proposals) {
            new Thread(() -> {
                try {
                    Election election = client.getElectionClient();
                    //监听选举事件(可选)
                    election.observe(electionName, new Election.Listener() {
                        @Override
                        public void onNext(LeaderResponse leaderResponse) {
                            log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            log.error("发生异常:{}", throwable.getMessage());
                        }

                        @Override
                        public void onCompleted() {
                            log.info("complete");
                        }
                    });

                    LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
                    long leaseId = leaseGrantResponse.getID();
                    client.getLeaseClient().keepAlive(leaseId, null);

                    //获得领导权限或租约到期退出等待
                    CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
                    LeaderKey leaderKey = campaignResponse.getLeader();
                    log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
                    //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException
                    LeaderResponse leaderResponse = election.leader(electionName).get();
                    log.info("领导者:{}", leaderResponse.getKv().getValue());
                    //TODO:业务处理
                    Thread.sleep(1000 * 6);
                    //释放领导权
                    election.resign(leaderKey).get();
                    client.getLeaseClient().revoke(leaseId);
                } catch (Exception e) {
                    log.error("", e);
                }
            }).start();
        }

        Thread.sleep(1000 * 30);
    }
    
}
EtcdCase.java

 

参考:
https://github.com/etcd-io/jetcd。

 

相关栏目:

用户点评