zookeeper学习笔记04-开源客户端(ZkClient + Curator)_基于backoff的重连策略-程序员宅基地

技术标签: zkclient  curator  # zookeeper  zookeeper  


五 、开源客户端(ZkClient + Curator)

1.ZkClient
ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。

想要使用ZkClient必须相应的jar包,由于我的工程使用maven构建的,故在此首先要引入maven依赖:

 <dependency>
      <groupId>com.github.sgroschupf</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.1</version>
    </dependency>

a.连接

ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer);
//
zkServers:    连接字符串
sessionTimeOut:    会话超时时间
connectionTimeout:    连接超时时间
ZkSerializer:    自定义序列化器,使得zookeeper将设置节点内容为java对象。(原生仅支持byte[])

b.创建节点
ZkClient提供了较多的创建节点的API接口,且很多都跟原生API类似,但是有一个参数createParent比较有趣

void createPersistent(String path, boolean createParents);
//createParents: 是否递归创建父节点

c.删除节点
zookeeper原生API只允许删除叶子节点, ZkClient提供了一个新方法:deleteRecursive(),这个方法帮助我们逐层遍历删除叶子节点。

boolean deleteRecursive(String path)

d.读取数据(getChildren)
ZkClient只对外提供一个getChildren的API,而且该API没有了Watcher的功能。ZkClient提供了subscribeChildChanges方法来监听子节点的变更事件。

List<String> subscribeChildChanges(String path, IZkChildListener listener) ;

IZkChildListener主要可以监听到如下一下四种事件:

* path节点的创建(即可以监听一个不存在的节点,待该节点创建时,触发此事件)
* path节点的删除
* path新增子节点
* path删除子节点

需实现IZkChildListener并重写handleChildChange方法,注意此方法跟原生的Watcher不同,此方法是注册一次一直生效。

void handleChildChange(String parentPath, List<String> currentChilds);

e.读取数据(readData)

T readData(String path);
T readData(String path, boolean returnNullIfPathNotExists); //如果节点不存在返回null
T readData(String path, Stat stat);

ZkClient通过subscribeDataChanges来监听节点内容的变化:

subscribeDataChanges(String path, IZkDataListener listener)

需实现IZkDataListener 并重写handleDataChange和handleDataDelete方法
handleDataChange监听到:

* 该节点初次创建(即可以创建一个不存在的节点)
* 节点数据的变化

handleDataDelete可以监听到:节点的删除。

f.更新数据(writeData)

void writeData(final String path, Object datat, final int expectedVersion)

g.检测节点是否存在(exists)

boolean exists(final String path)

2.Curator
Curator是Netflix开源的一套ZooKeeper客户端框架.它解决了很多ZooKeeper客户端非常底层的操作如:连接重连,反复注册和NodeExistsExceptiion异常等,它目前已经成为了Apache的顶级项目。使用它,需要引入它的maven依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

a.创建会话
通过CuratorFrameworkFactory.newInstance(),创建CuratorFramework,最终调用CuratorFrameworkFactory.start()方法创建连接。

CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

其中,RetryPolicy为重试策略主要包含以下四种重试策略:

* ExponentialBackoffRetry :基于backoff的重连策略 
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)  

baseSleepTimeMs:初始sleep时间,
maxRetries: 最大重试次数
maxSleepMs:最大sleep时间

当前sleep时间 = baseSleepTimeMs * Math.max(1,random.nextInt( 1 << (retryCount + 1)));
所以sleep时间会越来越长,当这个时间大于maxSleepMs时,则取maxSleepMs
* RetryNTimes(int n, int sleepMsBetweenRetries): 重连N次策略
* RetryOneTime(int sleepMsBetweenRetry):重连一次
* RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries):以sleepMsBetweenRetries的间隔重连,直到超过maxElapsedTimeMs的时间设置
* RetryForever:永远重试策略

通过两种方式创建会话:

//1.工厂类创建
CuratorFramework client = CuratorFrameworkFactory.newClient(IZooKeeperConfig.CONNECT_STRING,
        5000,
        5000,
        retryPolicy);
client.start();

//2.Fluent风格
client = CuratorFrameworkFactory.builder()
    .connectString(IZooKeeperConfig.CONNECT_STRING)
    .sessionTimeoutMs(5000)
    .connectionTimeoutMs(5000)
    .retryPolicy(retryPolicy)
    .namespace("base") //创建base节点
.build();
client.start();

同时支持namespace属性来对各个不同的业务进行隔离,若使用namespace属性,则再在该客户端上的操作节点都是基于namespace为相对目录进行的。

b.创建节点
使用create().forPath(path)创建一个空节点,额外属性的使用creatingParentsIfNeeded递归创建父节点,withMode指定模式;

client.create() 
    .creatingParentsIfNeeded() 
    .withMode(CreateMode.PERSISTENT)
    .forPath("/book/ch5","haha".getBytes());

c.删除节点
使用delete().forPath(path)删除一个节点,额外属性的使用deletingChildrenIfNeeded递归删除子节点。

client.delete()
    .deletingChildrenIfNeeded()
    .withVersion(-1)
    .forPath("/book/ch5");

d.读取数据

 client.getData()
        .storingStatIn(new Stat() )
        .forPath("/book/ch5");

e.设置数据

client.setData()
    .withVersion(stat.getVersion())
    .forPath("/book/ch5", "adadfafa".getBytes());

f.异步接口
之前所介绍的api都是同步接口,Curator引入了BackgroundCallback接口,来处理异步调用服务端返回的的结果信息。BackgroundCallback只有一个processResult方法:

void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent);

CuratorEvent的事件类型主要有GET_DATA,SET_DATA,CHILDREN,SYNC,GET_ACL,WATCHED,CLOSING类型。

响应码:

   0:    ok
  -4:    connectionloss
-110:    node not exists
-112:    sessionexpired

Curator提供了如下API来支持异步操作:

public T inBackground(BackgroundCallback callback);
public T inBackground(BackgroundCallback callback, Object context);
public T inBackground(BackgroundCallback callback, Executor executor);
public T inBackground(BackgroundCallback callback, Object context, Executor executor);

重点关注下executor,zookeeper的所有异步操作都是由EventThread这个线程来处理的,当碰上一个复杂的处理单元,就会消耗过长的处理时间, 就会影响到后面的时间的处理。所以在此处允许用户传递一个Executor实例,可以把一些复杂的处理时间专门放到一个线程池中。

client.setData().withVersion(stat.getVersion())
    .inBackground(new BackgroundCallback(){
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
        }
    }, Executors.newFixedThreadPool(2)) //使用异步方式获取数据
    .forPath("/book/ch5", "adadfafa".getBytes());

g.NodeCache和NodeCacheListener
1).使用样例

NodeCache cache = new NodeCache(client, "/chap05", false);
cache.start(true);
cache.getListenable().addListener(()->{
    System.out.println("变化了::"+cache.getCurrentData().getData());
});

2).NodeCache
想要使用必须先调用start方法,不用之后调用close方法.

NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

void start(boolean buildInitial); //若为true ,nodecache第一次启动时就从zookeeper上读取对应节点数据,并保存到cache中;

3).NodeCacheListener

cache.getListenable().addListener(XXNodeCacheListener);//使用此种方式,注册监听

需要实现NodeCacheListener 并重写nodeChanged()方法,

cache.getCurrentData().getData();//获取变更后的数据

可以监听到事件:

* 节点创建(即可以监听一个不存在的节点)
* 节点数据变化    

h.PathChildrenCache和PathChildrenCacheListener
1).使用样例

PathChildrenCache childrenCache = new PathChildrenCache(client, "/chap05", true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener((CuratorFramework cc, PathChildrenCacheEvent event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("新增子节点:"+event.getData().getPath());
                    break;

                case CHILD_REMOVED:
                    System.out.println("删除子节点:"+event.getData().getPath());
                    break;

                case CHILD_UPDATED:
                    System.out.println("更新子节点:"+event.getData().getPath());
                    break;

                default:
                    System.out.println(event.getType().toString()+":::::"+event.getData().getPath());
            }
        });

2).PathChildrenCache
想要使用必须先调用start方法,不用之后调用close方法.
start有两个, 其中一个可以传入StartMode,用来为初始的cache设置暖场方式(warm):

PathChildrenCache(CuratorFramework client, String path, boolean cacheData);

void start(StartMode mode);

NORMAL: 初始时为空。
BUILD_INITIAL_CACHE: 在这个方法返回之前调用rebuild()。
POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个

3).PathChildrenCacheListener

cache.getListenable().addListener(XXPathChildrenCacheListener);//使用此种方式,注册监听

实现PathChildrenCacheListener重写childEvent方法:

void childEvent(CuratorFramework client, PathChildrenCacheEvent event)

可以监听到:

* 子节点新增、删除
* 子节点数据变更(不监听子节点添加or删除孙子节点)

3.Curator的典型应用场景

a.master选举-LeaderSelector

b.分布式锁–InterProcessMutex

void acquire(); //获得锁
void release();//释放锁

c.分布式计数器–DistributedAtomicInteger

d.分布式Barrier —DistributedBarrier

e.ZKPaths 和 EnsurePath

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/it_freshman/article/details/79262217

智能推荐

bzoj3298-程序员宅基地

文章浏览阅读151次。分析:。。美国人民脑洞大。。。注意格式。。具体题解%http://www.bubuko.com/infodetail-740656.html#include#include#include#include#define fo(i,a,b) for(int i=a;i<=b;i++)using namespace s

L Machining Disc Rotors_l. machining disc rotors-程序员宅基地

文章浏览阅读242次。L Machining Disc Rotors题意:圆心为(0,0)半径为R的圆,现在被被n个互不相交的圆切割(圆心和半径会给出),保证这n个彼此之间不会交叉,保证n个圆中不会有某个包含整个大圆的情况。问切割后大圆剩余部分的直径(即两点的最远距离)题解:圆上的最远距离就是半径,如果存在一个点没被切割,然后求其关于圆心作对称点,如果对称点也存在就说明构成了一条没有被切掉的直径。如果直径不存在,答案就是两个交点之间的最大距离(即图中情况)这个题最难的是写代码。。。头大,计算几何一看就头大代码:_l. machining disc rotors

python 插入clickhouse数据报错_cannot parse json string: expected opening quote:-程序员宅基地

文章浏览阅读2.2k次。python 插入clickhouse报错 clickhouse : Cannot parse JSON string: expected opening quote: (while read the value of key data): (at row 1): While executing SourceFromInputStream 最近工作需要使用clickhouse_cannot parse json string: expected opening quote:

开源软件如何赚钱?_b) 公司如何从“免费”的开源软件中赚钱?-程序员宅基地

文章浏览阅读1.4w次,点赞9次,收藏13次。所谓开源就是开放源代码。源代码是软件的本质,所有程序都有源代码,就像人类的语言一样,有词汇和语法。源代码可以说是一个作者的主要命脉了。一般软件作者将软件的源代码开放出来,以保障软件用户自由使用及接触源代码的权利。这同时也保障了用户自行修改、复制以及_b) 公司如何从“免费”的开源软件中赚钱?

使用标准库:文本查询程序_用txt文档制作查询程序-程序员宅基地

文章浏览阅读168次。最近打算学习c++11 中智能指针的使用,写了一个文本查询程序。功能有读入一个“.txt”文件,在其中找单词“london”,具体格式见图://TextQuery.cpp#include "TextQuery.h"#include&lt;iostream&gt;#include&lt;fstream&gt;using namespace std;void runQueries(s..._用txt文档制作查询程序

单行文本省略和多行文本省略_实现单行文本缩略和多行文本省略-程序员宅基地

文章浏览阅读262次。单行文本省略overflow: hidden;text-overflow:ellipsis;white-space: nowrap;多行文本省略display: -webkit-box;-webkit-box-orient: vertical;-webkit-line-clamp: 3;overflow: hidden;_实现单行文本缩略和多行文本省略

随便推点

求某个节点的所有父节点_sqlserver根据子节点获取它对应的所有父节点-程序员宅基地

文章浏览阅读1.1k次。package com.fh.service.xtgl;import com.fh.util.PageData; import org.springframework.stereotype.Service;import javax.annotation.Resource; import java.util.ArrayList; import java.util.List;//根据_sqlserver根据子节点获取它对应的所有父节点

C# 自定义控件制作和使用实例_c# vs2017 自定义控件 开发流程-程序员宅基地

文章浏览阅读3.8w次,点赞16次,收藏75次。C# 自定义用户控件xiongxuanwen 上篇:控件制作 本例是制作一个简单的自定义控件,然后用一个简单的测试程序,对于初学者来说,本例子比较简单,只能起到抛石引玉的效果。我也是在学习当中,今后会将自己所学的逐步写出来和大家交流共享。 第一步:新建一个控件库项目:myControl 第二步:从工具箱里面拖动1个PictureBox、1个Button、6_c# vs2017 自定义控件 开发流程

maven五:查找jar包坐标,选择jar包版本_在一个maven工程中,如何快速判断一个jar的版本是在哪里定义的?-程序员宅基地

文章浏览阅读1.5w次,点赞9次,收藏14次。查找jar包坐标以spring core的jar包为例,访问http://www.mvnrepository.com/ 在最上方中间,输入spring core,点击Search。搜索结果第一个就是,点击spring core有很多版本,这里点击4.3.5.RELEASE点击maven栏里面的内容,允许访问,会复制到剪贴板然后直接粘贴到pom.xml文件的_在一个maven工程中,如何快速判断一个jar的版本是在哪里定义的?

linux卸载图形逻辑卷界面,使用lvremove命令在Linux系统中删除LVM卷(逻辑卷)-程序员宅基地

文章浏览阅读948次。如果Linux系统上的LVM不再需要使用LVM卷(逻辑卷),您可以使用lvremove命令按照以下步骤删除它。但是请确保LVM卷不包含任何数据,如果是,请确保在继续删除LVM之前备份该数据。为了说明这一点,我们将从卷组“vg01”中删除“lv001”,LV安装在挂载点/lvmtest上。参考lvremove命令_Linux lvremove命令使用详解:删除指定LVM逻辑卷。实施的方法使用df命令..._lvremove

最适合物联网LOT的开源数据库_lot 数据库有哪里-程序员宅基地

文章浏览阅读2.4k次。最适合物联网的开源数据库https://blog.csdn.net/shnbiot/article/details/80693520_lot 数据库有哪里

H - Fence-程序员宅基地

文章浏览阅读106次。H - FenceThere is a fence in front of Polycarpus’s home. The fence consists of n planks of the same width which go one after another from left to right. The height of the i-th plank is hi meters, distinct planks can have distinct heights.Fence for n = 7

推荐文章

热门文章

相关标签