[TOC]

Original article (in Chinese): https://www.jianshu.com/p/70151fc0ef5d

A Detailed Guide to Curator, the Zookeeper Client

Introduction

Curator is an open-source Zookeeper client framework from Netflix. It takes care of many of the rather low-level details of Zookeeper client development, including connection retries, repeated Watcher registration, NodeExistsException handling, and so on. Patrick Hunt (of the Zookeeper project) gave Curator high praise with the line "Guava is to Java what Curator is to Zookeeper".
An aside on the name:
The origin of the Zookeeper name is rather amusing. The following is excerpted from the book "From Paxos to Zookeeper: Distributed Consistency Principles and Practice":
Zookeeper originated in a research group at Yahoo's research institute. At the time, the researchers found that many large systems inside Yahoo depended on a similar system for distributed coordination, but those systems tended to suffer from distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework with no single point of failure. Early in the project, considering that many projects were named after animals (the famous Pig project, for example), Yahoo's engineers wanted to give this project an animal name too. Raghu Ramakrishnan, then chief scientist of the institute, joked: "At this rate, this place will turn into a zoo." Once he said it, everyone agreed to call it the zookeeper: with all the animal-named distributed components put together, Yahoo's entire distributed infrastructure looked like one giant zoo, and Zookeeper was exactly the thing to coordinate the distributed environment. Thus the name Zookeeper was born.

Curator is without doubt the Swiss army knife among Zookeeper clients. The word translates as "curator" or "custodian"; whether or not the developers intended it, the author suspects the name is meant to say that Curator is the curator of Zookeeper (stretching the metaphor: Curator is the keeper of the zoo).
Curator consists of several packages:
curator-framework: a wrapper over Zookeeper's low-level APIs
curator-client: client-side utilities, such as retry policies
curator-recipes: packaged high-level features such as cache-based event listening, elections, distributed locks, distributed counters, and distributed barriers
Maven dependencies (this article uses Curator 2.12.0, which corresponds to Zookeeper 3.4.x; mixing incompatible versions causes compatibility problems and is very likely to make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Background

I happened to be using zookeeper recently and built a visual web console for it based on Spring Boot, Curator, and Bootstrap:

zookeeper-console

You are welcome to use it and star it.

Basic Curator APIs

简介

Curator是Netflix集团开源的一套zookeeper客户端框架,解决了诸多Zookeeper客户端卓殊底层的底细开发工作,包括连续重连、反复注册沃特cher和NodeExistsException非凡等等。Patrixck
Hunt(Zookeeper)以一句“Guava is to Java that Curator to
Zookeeper”给Curator予低度评价。
引子和趣闻:
Zookeeper名字的案由是相比较好玩的,下边的一些摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与履行》一书:
Zookeeper最早起点于雅虎的商讨院的一个探讨小组。在及时,研讨人口发现,在雅虎内部很多大型的系统需要倚重一个接近的系列开展分布式协调,不过那么些系统往往存在分布式单点问题。所以雅虎的开发人士就准备开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到许多品类都是用动物的名字来定名的(例如有名的Pig项目),雅虎的工程师希望给这多少个项目也取一个动物的名字。时任研商院的上位数学家Raghu
Ramakrishnan开玩笑说:再这么下来,我们这儿就改为动物园了。此话一出,我们纷纷表示就叫动物园管理员吧——因为各样以动物命名的分布式组件放在一起,雅虎的方方面面分布式系统看上去就像一个特大型的动物园了,而Zookeeper正好用来举办分布式环境的和谐——于是,Zookeeper的名字由此诞生了。

Curator无疑是Zookeeper客户端中的瑞士联邦军刀,它译作”馆长”或者”管理者”,不知道是不是付出小组有意而为之,笔者预计有可能这样命名的因由是注解Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了多少个包:
curator-framework:对zookeeper的平底api的片段封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:包裹了部分高等特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式巴里r等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的本子为:3.4.x,假定跨版本会有兼容性问题,很有可能造成节点操作失败):

 <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Creating a Session

1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
        connectionInfo,
        5000,
        3000,
        retryPolicy);

The newClient static factory method takes four main parameters:

Parameter            Description
connectionString     server list, in the form host1:port1,host2:port2,...
retryPolicy          retry policy; several policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs     session timeout in milliseconds, default 60000 ms
connectionTimeoutMs  connection timeout in milliseconds, default 15000 ms

2. Creating a session with the Fluent-style API

The core parameters are set fluently. An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();

3. Creating a session with an isolated namespace

To isolate different Zookeeper-based applications from one another, each application needs its own namespace, that is, a dedicated Zookeeper root path (in official terms: adding the "Chroot" feature for the client). For example (see below), when a client specifies the namespace "/base", every operation that client performs on data nodes is resolved relative to that path. Chroot maps a client application onto one subtree of the Zookeeper server, which is very useful for isolating applications that share a single Zookeeper cluster.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .namespace("base")
        .build();

Starting the Client

Once the session has been created and you hold the client instance, simply call its start() method:

client.start();

Data Node Operations

Creating Data Nodes

Zookeeper node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral with a sequence number

**Create a node, with empty initial content**

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to persistent and the content defaults to empty.

Create a node, with initial content

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral), with empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral), with initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral), with initial content, automatically creating parent nodes recursively

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is extremely useful: normally a developer has to check whether the parent node exists before creating a child node, because creating it directly would throw NoNodeException. With creatingParentContainersIfNeeded(), Curator automatically and recursively creates all required parent nodes.

Deleting Data Nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes; otherwise an exception is thrown.

Delete a node, recursively deleting all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing the delete at a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node, with guaranteed deletion

client.delete().guaranteed().forPath("path");

guaranteed() is a safeguard: as long as the client session is valid, Curator keeps retrying the delete in the background until the node is successfully deleted.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Reading Node Data

Read the data content of a node

client.getData().forPath("path");

Note: the return value of this method is byte[].
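Since the raw bytes usually need decoding right away, here is a minimal sketch (assuming the node stores UTF-8 text; "path" is just a placeholder):

byte[] raw = client.getData().forPath("path");
// decode, assuming the content was written as UTF-8 text
String text = new String(raw, java.nio.charset.StandardCharsets.UTF_8);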

Read a node's data content and at the same time obtain the node's Stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

Updating Node Data

Update the data content of a node

client.setData().forPath("path","data".getBytes());

Note: this call returns a Stat instance.

Update a node's data content, forcing the update at a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());

Checking Whether a Node Exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance (null if the node does not exist); it is the operation for checking whether a ZNode exists. Additional methods (watch or background processing) can be chained, with forPath() called last to specify the target ZNode.
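A sketch of typical usage (the null check is the actual existence test; the watcher variant is optional and assumes you already have a Watcher instance named watcher):

Stat stat = client.checkExists().forPath("path");
boolean exists = (stat != null); // checkExists() returns null when the node does not exist

// optionally register a one-shot watcher at the same time
client.checkExists().usingWatcher(watcher).forPath("path");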

Getting All Child Node Paths of a Node

client.getChildren().forPath("path");

小心:该格局的重临值为List<String>,拿到ZNode的子节点Path列表。
可以调用额外的点子(监控、后台处理或者取得状态watch, background or get
stat) 并在终极调用forPath()指定要操作的父ZNode
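A sketch of both the synchronous and the background form (the output handling is illustrative only):

// synchronous: child names are returned relative to "path"
List<String> children = client.getChildren().forPath("path");

// background: the children arrive in the CuratorEvent
client.getChildren()
      .inBackground((c, event) -> System.out.println("children: " + event.getChildren()))
      .forPath("path");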

Transactions

CuratorFramework instances provide the inTransaction() method, which opens a ZooKeeper transaction. You can compose create, setData, check, and/or delete operations and then call commit() to submit them as a single atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

Asynchronous Interfaces

The create, delete, update, and read methods above are all synchronous. Curator also provides asynchronous interfaces, introducing the BackgroundCallback interface for processing the result the server returns after an asynchronous call. The key callback value in BackgroundCallback is a CuratorEvent, which contains the event type, the response code, and detailed node information.

CuratorEventType

Event type   Corresponding CuratorFramework method
CREATE       #create()
DELETE       #delete()
EXISTS       #checkExists()
GET_DATA     #getData()
SET_DATA     #setData()
CHILDREN     #getChildren()
SYNC         #sync(String,Object)
GET_ACL      #getACL()
SET_ACL      #setACL()
WATCHED      #usingWatcher(Watcher)
CLOSING      #close()

Response codes (#getResultCode())

Code   Meaning
0      OK: the call succeeded
-4     ConnectionLoss: the client lost its connection to the server
-110   NodeExists: the node already exists
-112   SessionExpired: the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() does not specify an executor, Curator's EventThread is used by default for the asynchronous processing.

Curator Recipes (Advanced Features)

Note: you must first add the curator-recipes dependency. The sections below only explain and illustrate the use of some recipe features; they do not attempt a source-level exploration.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important: it is strongly recommended to use a ConnectionStateListener to monitor the connection state. When the connection state is LOST, all of the APIs under curator-recipes stop working or time out, even though none of the examples below uses a ConnectionStateListener.
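A minimal sketch of registering such a listener (what to do per state is application-specific; here we only log, and org.apache.curator.framework.state.ConnectionState is assumed to be imported):

client.getConnectionStateListenable().addListener((c, newState) -> {
    if (newState == ConnectionState.LOST) {
        // the session is gone: locks, latches, and other recipe state held by this client is no longer valid
        System.err.println("Connection LOST - recipe state must be considered invalid");
    } else if (newState == ConnectionState.RECONNECTED) {
        System.out.println("Reconnected - recipes may need to be re-established");
    }
});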

Caches

Zookeeper natively supports event listening through registered Watchers, but a developer has to re-register repeatedly (a Watcher fires only once per registration). Cache is Curator's wrapper around event listening; it can be seen as a local cached view of the events, and it automatically handles repeated registration for the developer. Curator provides three kinds of watcher caches for listening to node changes.

Path Cache

Path Cache watches the children of a ZNode. When a child is added, updated, or deleted, the Path Cache updates its state to contain the latest set of children together with their data and state; state changes are delivered through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

Create a Path Cache with the constructor below:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache, you must call its start() method, and call close() when you are done. You can choose a StartMode to control how it initializes (see the sketch after this list).

StartMode has the following three options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns.
  3. POST_INITIALIZED_EVENT: a PathChildrenCacheEvent.Type#INITIALIZED event is sent once the cache has initialized its data.
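For example, a sketch of starting with an explicit mode (which mode to pick depends on whether you need the initial children before start() returns; the path is a placeholder):

PathChildrenCache cache = new PathChildrenCache(client, "/example/pathCache", true);
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); // the cache is fully populated before start() returns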

public void addListener(PathChildrenCacheListener listener) adds a listener for changes to the cache.

getCurrentData() returns a List&lt;ChildData&gt; that you can iterate to visit all children.

Creating/updating and removing nodes is done through the client (CuratorFramework), not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null: the cache does not cache node data.

Note: the Thread.sleep(10) calls in the demo can be commented out, but then the listener fires fewer times than expected. This is probably related to how PathChildrenCache is implemented internally: events must not be triggered too frequently!

Node Cache

Node Cache is similar to Path Cache, except that it watches one specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – node data

Note: to use the cache, you still have to call its start() method, and call close() when you are done.

getCurrentData() returns the node's current state, from which the current value can be obtained.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep calls in the demo can be commented out, but then the listener fires fewer times than expected. This is probably related to how NodeCache is implemented internally: events must not be triggered too frequently!

Note: NodeCache can only watch the state changes of a single node.

Tree Cache

Tree Cache can watch all nodes of an entire subtree. It works like a combination of Path Cache and Node Cache, and mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the triggered event class
  • ChildData – node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the events still fire the expected number of times.

Note: when TreeCache is initialized (when start() is called), it calls the TreeCacheListener back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that point event.getData().getPath() is very likely to cause a NullPointerException, so this case should be handled proactively.

Leader Election

In distributed computing, leader election is an important capability. The election process goes like this: a process is designated as the organizer and distributes tasks to the nodes. Before the task starts, no node knows who the leader (or coordinator) is. Once the election algorithm has run, every node ends up recognizing one unique node as the task leader. Besides that, an election also frequently happens when the leader crashes unexpectedly and a new leader has to be elected.

In a Zookeeper cluster, the leader handles write operations and synchronizes the followers through the Zab protocol; both the leader and the followers can serve read operations.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients keep taking turns as the leader (an egalitarian society). With the latter, once a leader is elected it never gives up leadership unless a client goes down and triggers a re-election (a certain party?).

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, the LeaderLatch negotiates with all the other LeaderLatch instances that use the same latch path, and eventually one of them is elected leader. Use the hasLeadership method to check whether a LeaderLatch instance is the leader:

leaderLatch.hasLeadership(); // returns true if this instance is the leader

Like the JDK's CountDownLatch, LeaderLatch blocks while waiting to obtain leadership (in await()). Once a LeaderLatch is no longer used, you must call its close() method. If it is the leader, close() releases leadership and the other participants elect a new leader.

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException
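For example, a sketch of waiting for leadership with a timeout (the values are arbitrary):

if (leaderLatch.await(10, TimeUnit.SECONDS)) {
    // this instance acquired leadership within the timeout
} else {
    // timed out without becoming leader
}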

Error handling:
A LeaderLatch instance can add a ConnectionStateListener to watch for connection problems. On SUSPENDED or LOST, the leader should no longer assume it is still the leader until it is RECONNECTED. When a LOST connection is RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. LeaderLatch users must take connection problems that cause loss of leadership into account.
It is strongly recommended to use a ConnectionStateListener.

A LeaderLatch usage example:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

可以添加test module的借助方便开展测试,不需要启动真实的zookeeper服务端:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

率先我们创制了10个LeaderLatch,启动后它们中的一个会被推举为leader。
因为选举会花费一些日子,start后并不可能登时就得到leader。
通过hasLeadership翻开自己是否是leader, 假假若的话重返true。
能够通过.getLeader().getId()可以得到当前的leader的ID。
只可以透过close出狱当前的政权。
await是一个绿灯方法, 尝试获取leader地位,可是未必能上位。

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Like LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called when this instance obtains leadership, and takeLeadership() returns only when leadership is released.
When you no longer use the LeaderSelector instance, call its close() method.

Error handling:
The LeaderSelectorListener class extends ConnectionStateListener. A LeaderSelector must watch for connection state changes. If an instance becomes the leader, it must respond to SUSPENDED and LOST. When SUSPENDED occurs, the instance must assume it may no longer be the leader until it has reconnected successfully. When LOST occurs, the instance is no longer the leader and its takeLeadership method returns.

Important: the recommended way to handle this is to throw a CancelLeadershipException when SUSPENDED or LOST is received. That causes the LeaderSelector instance to interrupt and cancel the executing takeLeadership method. This is important enough that you should consider extending LeaderSelectorListenerAdapter, which provides the recommended handling logic.

The example below is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can perform task distribution and so on inside takeLeadership, and not return from it; if you want this instance to remain the leader forever, add an infinite loop. Calling leaderSelector.autoRequeue() ensures that this instance may obtain leadership again after it releases it. Here we use an AtomicInteger to count how many times this client has obtained leadership; the election is "fair": every client has an equal chance of obtaining leadership.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, a LeaderLatch releases leadership only when close() is called, whereas LeaderSelector lets you control leadership through the LeaderSelectorListener and release it at an appropriate moment, so that every node gets a chance to lead. LeaderSelector is therefore more flexible and controllable; when you have a leader-election use case, prefer LeaderSelector.

Distributed Locks

Reminders:

1. It is recommended to use a ConnectionStateListener to monitor the connection state, because when the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronized: at any point in time, no two clients hold the same lock.

Shared Reentrant Lock

Shared means the lock is globally visible and every client can request it. Reentrant means it works like the JDK's ReentrantLock: the same client can acquire the lock repeatedly while already holding it, without blocking. It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock with acquire(), which also comes with a timeout variant:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock with the release() method. An InterProcessMutex instance is reusable.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call the method below:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

To request that the currently held lock be revoked, call the attemptRevoke() method; note that the RevocationListener is called back when the lock should be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
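Putting the two together, a minimal sketch (the lock path is a placeholder, error handling is elided, java.util.concurrent.atomic.AtomicBoolean is assumed to be imported, and getParticipantNodes() assumes the lock currently has a holder):

InterProcessMutex lock = new InterProcessMutex(client, "/examples/locks");
AtomicBoolean revokeRequested = new AtomicBoolean(false);
// the listener runs on a Curator thread, so only signal here;
// the thread that owns the lock should check the flag and release cooperatively
lock.makeRevocable(forLock -> revokeRequested.set(true));

// elsewhere, another client asks the current holder to release:
Revoker.attemptRevoke(client, lock.getParticipantNodes().iterator().next());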

A second reminder on error handling:
It is again strongly recommended to handle connection state changes with a ConnectionStateListener. When the connection is LOST, you no longer hold the lock.

First let's create a simulated shared resource. This resource expects to be accessed by only one thread at a time; otherwise there will be concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real environment we would access/maintain a shared resource here.
        // With the lock held, this example never hits the IllegalStateException below;
        // without the lock, the sleep makes the illegal-concurrency exception easy to trigger.
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then we create an InterProcessMutexDemo class that performs a complete access cycle: request the lock, use the resource, and release the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is simple: it spawns 5 clients, and each client repeats the acquire-lock / access-resource / release-lock cycle 50 times, each client in its own thread. The output shows the lock being used exclusively by one instance at a time, in no particular order.

Since the lock is reentrant, you can call acquire() repeatedly within one thread; it always returns true while the thread holds the lock.

You should not use the same InterProcessMutex from multiple threads. Instead, create a new InterProcessMutex instance in each thread, all with the same path; they will then share one lock.

Shared Lock (Non-reentrant)

Compared with the InterProcessMutex above, this lock simply lacks reentrancy, which means it cannot be re-entered from the same thread. The class is InterProcessSemaphoreMutex, and it is used much like InterProcessMutex:

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Running this shows that one and only one client manages to acquire the first lock (its first acquire() returns true) and then blocks on its second acquire(), which times out; all the other clients block on their first acquire(), time out, and throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks: one for read operations, the other for writes. The read lock may be held by multiple processes simultaneously as long as the write lock is not held, while the write lock is exclusive and blocks readers.

This lock is reentrant. A thread holding the write lock may acquire the read lock, but not the other way around. This also means a write lock can be downgraded to a read lock: acquire write lock -> acquire read lock -> release read lock -> release write lock. Upgrading from a read lock to a write lock is not possible.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed; both locks are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: acquire the write lock first, then the read lock; never the reverse!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " has acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " has acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Non-Reentrant Shared Lock—Shared Lock

Compared with the InterProcessMutex above, this lock simply lacks the reentrant capability, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " has acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex again");
        }
        System.out.println(clientName + " has acquired the mutex a second time");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

When you run this, you will find that exactly one client succeeds in acquiring the first lock (its first acquire() returns true) and then blocks on its second acquire(), timing out on the second lock; all the other clients block on the first acquire(), time out, and throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Semaphore—Shared Semaphore

A counting semaphore, similar to the JDK's Semaphore. Where the JDK's Semaphore maintains a set of permits, Curator maintains a set of leases (Lease). There are two ways to control the semaphore's maximum number of leases. The first: the user provides a path and specifies the maximum lease count. The second: the user provides a path and a SharedCountReader. If you do not use a SharedCountReader, you must ensure that every instance across all processes uses the same maximum lease count; otherwise you could end up with instances in process A believing the maximum is 10 while those in process B believe it is 20, and the leases would lose their meaning. A sketch of the SharedCountReader variant follows.
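
A minimal sketch of the SharedCountReader variant, assuming a started CuratorFramework named client; the paths and the initial maximum of 10 are illustrative. Because SharedCount implements SharedCountReader, every process reads the lease limit from the same znode:

// a cluster-wide counter holding the maximum lease count
SharedCount maxLeases = new SharedCount(client, "/examples/semaphore-max", 10);
maxLeases.start();
// each process builds its semaphore from the same reader, so the limits agree
InterProcessSemaphoreV2 semaphore =
        new InterProcessSemaphoreV2(client, "/examples/semaphore", maxLeases);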

Calling acquire() returns a lease object. The client must close the lease in a finally block, or the lease is leaked. However, if the client session is lost for some reason, such as a crash, the leases held by that client are closed automatically, so other clients can use them. Leases can also be returned with the methods below:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough, the requesting thread blocks. Overloads with a timeout are also provided.

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used by Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases. Then we request one more lease; since the semaphore still has 5 of its 10 available, the request succeeds, returning one lease and leaving 4. Next we request 5 more leases; only 4 remain, so the request blocks until the timeout and then returns null (when leases are insufficient, the call blocks until the timeout and returns null rather than throwing; without a timeout it blocks indefinitely). Finally we return all the leases to the semaphore.

All of the locks described above are fair. From ZooKeeper's point of view, each client obtains the lock in the order it was requested; there is no unfair preemption.

Multi Shared Lock

Multi Shared Lock is a container of locks. When acquire() is called, all the contained locks are acquired; if that fails, every lock acquired so far is released. Likewise, calling release() releases all the contained locks (failures are ignored). Essentially it represents a group of locks: acquire and release operations on it are propagated to all the locks it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructor takes either the collection of locks to contain, or a list of ZooKeeper paths.

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We create a new InterProcessMultiLock containing a reentrant lock and a non-reentrant lock. After calling acquire() you can see that the thread holds both locks; after release() both locks are released.

One final reminder: it is strongly recommended to monitor the connection state with a ConnectionStateListener; when the state becomes LOST, you no longer hold the lock. A sketch of registering one follows.
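
A minimal sketch of registering such a listener, assuming a started CuratorFramework named client; what you do on LOST is application-specific:

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.LOST) {
            // The session is gone: any lock this client held has already been
            // released on the server side. Stop or roll back work that assumed
            // the lock was still held.
        }
    }
});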

Distributed Counters

As the name suggests, a counter counts things, and with ZooKeeper you can implement a counter shared by a whole cluster. As long as you use the same path you get the latest counter value, which is guaranteed by ZooKeeper's consistency. Curator provides two counters: one counts with an int (SharedCount), the other with a long (DistributedAtomicLong).

Distributed int Counter—SharedCount

This class counts with an int. It mainly involves three classes.

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter. You can add a SharedCountListener to it: when the counter changes, the listener receives the change event, while a SharedCountReader reads the latest value, both the literal value and the versioned value, VersionedValue.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example, baseCount monitors the counter value (the addListener method adds the SharedCountListener). Any SharedCount using the same path can obtain this counter value. We then use 5 threads, each adding a random number below 10 to the counter. Changes made to the counter by a SharedCount on the same path are delivered to baseCount's SharedCountListener as callbacks.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter. The first parameter supplies the current VersionedValue; if another client has updated the count in the meantime, your update may fail, but your client has by then fetched the latest value, so you can simply try again, as in the sketch below. setCount updates the counter unconditionally.
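
A minimal sketch of such a retry loop, where count is a started SharedCount as in the demo and the +1 delta is illustrative:

while (true) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break; // our compare-and-set won
    }
    // another client updated the counter first; re-read and try again
}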

Note that the counter must be started with start(), and must be closed with close() when you are done.

Using a ConnectionStateListener is strongly recommended here as well; in this example SharedCountListener itself extends ConnectionStateListener.

Distributed long Counter—DistributedAtomicLong

Now a counter of type Long. Apart from a larger counting range than SharedCount, it first attempts to set the counter using optimistic locking; if that fails (for instance, the counter was updated by another client in the meantime), it updates the value under an InterProcessMutex.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
    MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

    tryOptimistic(result, makeValue);
    if ( !result.succeeded() && (mutex != null) )
    {
        tryWithMutex(result, makeValue);
    }

    return result;
}

This counter provides a series of operations:

  • get(): get the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a specific value
  • subtract(): subtract a specific value
  • trySet(): attempt to set the value
  • forceSet(): forcibly set the value

You must check succeeded() on the returned result; it indicates whether the operation succeeded. If the operation succeeded, preValue() is the value before the operation and postValue() is the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
    }
}

Distributed Queues

Curator can also simplify operations on ephemeral nodes, and it provides the ZK recipe distributed queue implementations. Using ZooKeeper's PERSISTENT_SEQUENTIAL nodes guarantees that the items put into the queue are ordered. If a single consumer takes data from the queue, it behaves first-in-first-out, as a queue should. If you strictly require ordering, you must use a single consumer; leader election can be used so that only the leader acts as the sole consumer.

However, according to the Netflix author of Curator, ZooKeeper really is not suited to being a queue; or rather, ZK does not make a good queue. See Tech Note 4 for details.

There are five reasons:

  1. ZK has a 1MB transport limit. In practice ZNodes must be relatively small, while a queue holds many messages, some of them large.
  2. If there are many nodes, ZK starts very slowly, and using a queue produces many ZNodes; you have to significantly increase initLimit and syncLimit.
  3. Large ZNodes are hard to clean up. Netflix had to build a dedicated program to do it.
  4. ZK's performance degrades badly when a ZNode has a very large number of children.
  5. ZK's database lives entirely in memory, so many queued items means a lot of memory.

Even so, Curator has built the various Queue implementations. If the queue's item count and data sizes are modest, they can still be used, with discretion.

Distributed Queue—DistributedQueue

DistributedQueue is the most basic of the queues. It involves the following four classes:

  • QueueBuilder – queues are created via QueueBuilder, which is also the factory for the other queue types
  • QueueConsumer – the interface for consumers of queue messages
  • QueueSerializer – the serialization/deserialization interface for queue messages, providing serialization of the objects in the queue
  • DistributedQueue – the queue implementation

QueueConsumer is the consumer; it receives the queue's data. The logic for processing queue data goes in QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer. These are two steps, not one atomic step. You can call lockPath() on the builder to make consumers take a lock: while a consumer is processing a message it holds the lock, so no other consumer can consume that message, and if consumption fails or the process dies, the message can be handed to another process. This costs some performance. It is still best to use the queue in single-consumer mode. A sketch of lockPath() follows.
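
A minimal sketch of enabling that lock, assuming client, consumer, and serializer set up as in the demo below; the lock path is illustrative:

QueueBuilder<String> builder =
        QueueBuilder.builder(client, consumer, serializer, "/example/queue");
DistributedQueue<String> queue = builder
        .lockPath("/example/queue-lock")  // hold a lock while consuming each message
        .buildQueue();
queue.start();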

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; because they share the same PATH, the consumers compete to consume the messages.

Distributed Queue with Ids—DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that you can set an ID for every element in the queue and remove arbitrary elements by ID. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

It is created with:

builder.buildIdQueue()

Put an element:

queue.put(aMessage, messageId);

Remove an element:

int numberRemoved = queue.remove(messageId);

In this example, some elements are removed before any consumer has consumed them, so the consumers never receive the removed messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Priority Distributed Queue—DistributedPriorityQueue

A priority queue sorts the elements in the queue by priority: the smaller the priority value, the nearer the front of the queue and the sooner the element is consumed. It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created with builder.buildPriorityQueue(minItemsBeforeRefresh). When the priority queue learns that elements have been added or removed, it pauses processing of the current element queue and then refreshes it. minItemsBeforeRefresh specifies the minimum number of currently active items to process before such a refresh; set it to the amount of out-of-order processing your program can tolerate.

A priority must be specified when putting into the queue:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that setting priorities has no effect. That is because priority only matters for elements backlogged in the queue: if consumption is fast enough, the previous element may already be consumed before the next one is enqueued, and in that situation the DistributedPriorityQueue degenerates into a DistributedQueue.

Distributed Delay Queue—DistributedDelayQueue

The JDK also has a DelayQueue; you may be familiar with it. DistributedDelayQueue provides similar functionality: each element carries a delay value, and a consumer only receives an element after its delay has elapsed. It involves the four classes below.

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

A delayUntilEpoch can be specified when putting an element:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now (say, 20 milliseconds) but a future timestamp, such as System.currentTimeMillis() + 10000. If delayUntilEpoch is already in the past, the message is delivered to a consumer immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

前边即使实现了各类队列,但是你放在心上到没有,那些队列并不曾兑现类似JDK一样的接口。
SimpleDistributedQueue提供了和JDK基本一致的接口(不过从未兑现Queue接口)。
创建很简短:

public SimpleDistributedQueue(CuratorFramework client,String path)

Add an element:

public boolean offer(byte[] data) throws Exception

Take an element:

public byte[] take() throws Exception

Other methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is a take method.

take() blocks until it succeeds; poll() returns null immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("message sent: " + "zjc-" + i);
                    } else {
                        System.out.println("failed to send message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // take() blocks until a message is available; loop so the consumer
                // keeps draining the queue instead of stopping after one message
                while (true) {
                    byte[] data = queue.take();
                    System.out.println("consumed a message: " + new String(data, "UTF-8"));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

As originally written, the Consumer called take() exactly once, so although 100 messages were sent only the first was consumed and the rest appeared stuck: run() simply returned after a single take(). Looping over take(), as in the Consumer above, lets the consumer drain the queue. The demo recommended by the official documentation uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

If consumption still appears to stall in practice, remember that take() blocks until an element is available; the consumer only keeps consuming for as long as it keeps calling take() in a loop.

分布式延迟队列—DistributedDelayQueue

JDK中也有DelayQueue,不知道你是不是熟练。
DistributedDelayQueue也提供了类似的效应, 元素有个delay值,
消费者隔一段时间才能收到元素。 涉及到下边两个类。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

透过上面的口舌成立:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时得以指定delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是离现在的一个时刻间隔,
比如20飞秒,而是以后的一个时日戳,如 System.current提姆(Tim)eMillis() + 10秒。
假使delayUntilEpoch的刻钟已经过去,音讯会及时被消费者接受。

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}
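
Because every item in this demo is put with the same delayUntilEpoch (now plus ten seconds), all ten messages should arrive in a burst roughly ten seconds after the puts, comfortably within the 20-second sleep.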

Distributed barrier — Barrier

A distributed Barrier is a class that blocks the waiting processes on all nodes
until a certain condition is satisfied, after which all of the nodes continue.

Think of a horse race: the horses arrive at the starting line one after another,
and at the starting signal they all dash out together.

DistributedBarrier

The DistributedBarrier class implements the barrier functionality. Its constructor is as follows:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you set the barrier; it will block the threads that wait on it:

setBarrier();

Threads that need to block then call this method and wait for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads will continue:

removeBarrier();

Error handling: DistributedBarrier monitors the connection state;
when the connection drops, waitOnBarrier() throws an exception.
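
Putting the three calls together, a minimal lifecycle sketch (assuming client is an already started CuratorFramework):

DistributedBarrier barrier = new DistributedBarrier(client, "/examples/barrier");
barrier.setBarrier();                    // 1. raise the barrier
new Thread(() -> {
    try {
        barrier.waitOnBarrier();         // 2. workers block here
        System.out.println("released");
    } catch (Exception e) {
        e.printStackTrace();
    }
}).start();
// ... later, once the release condition holds:
barrier.removeBarrier();                 // 3. every waiter continues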

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier(); // raise the barrier before any worker starts waiting

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier(); // blocks until controlBarrier.removeBarrier() is called
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

本条例子创制了controlBarrier来设置栅栏和移除栅栏。
大家创立了5个线程,在此巴里r上等待。
最终移除栅栏后具备的线程才继续执行。

只要您从头不设置栅栏,所有的线程就不会阻塞住。
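
Note that every DistributedBarrier instance built on the same path refers to the same znode, which is why removing the barrier through controlBarrier releases threads that are waiting on barrier instances created elsewhere.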

Double barrier — DistributedDoubleBarrier

A double barrier lets clients synchronize at both the start and the end of a computation:
when enough processes have entered the double barrier, the computation begins, and when it completes they leave the barrier.
The double barrier class is DistributedDoubleBarrier, and its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the caller blocks until all members have called enter().
Likewise, when leave() is called, the calling thread blocks until all members have called leave().
It is like a 100-meter race: the starting gun fires and every runner takes off,
but the race only ends once all runners have crossed the finish line.

DistributedDoubleBarrier monitors the connection state; when the connection drops, enter() and leave() throw an exception.
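
The protocol in compact form (assuming client is a started CuratorFramework and doComputation() is a hypothetical placeholder for the synchronized work):

DistributedDoubleBarrier barrier =
        new DistributedDoubleBarrier(client, "/examples/barrier", 5);
barrier.enter();   // blocks until 5 members have entered
doComputation();   // hypothetical work performed while all members are inside
barrier.leave();   // blocks until all 5 members have left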

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter(); // blocks until all QTY members have entered
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave(); // blocks until all QTY members have left
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE); // keeps the demo JVM alive; kill the process to exit
        }
    }

}
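
With this demo you should see all five "enters" lines first, then the five "begins" lines only after every member has entered, and finally the five "left" lines once every member has called leave().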

References:
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 (From Paxos to ZooKeeper: Distributed Consistency Principles and Practice)
The blog series 《跟着实例学习ZooKeeper的用法》 (Learning ZooKeeper by Example)

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually includes a [toc] navigation directory, which makes it easy to jump to each section, but Jianshu does not support it. The Markdown original lives in the project's /resources/md directory; help yourself. The article was written with Typora, and Typora is recommended for opening it.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, working hard in Guangzhou: at the office by day, with irregular overtime on evenings and weekends, writing blog posts in my spare time at night.
I hope my articles bring you something useful; let's keep improving together.

Author: zhrowable
Link: https://www.jianshu.com/p/70151fc0ef5d
Source: Jianshu
The copyright belongs to the author. For any form of reproduction, please contact the author for authorization and indicate the source.
