A Detailed Guide to Curator, the Zookeeper Client

[TOC]

Original article (in Chinese): https://www.jianshu.com/p/70151fc0ef5d


Introduction

Curator is a Zookeeper client framework open-sourced by Netflix. It takes care of most of the low-level details of Zookeeper client development, including connection reconnection, repeated Watcher registration, the NodeExistsException quirk, and so on. Patrick Hunt (of the Zookeeper project) paid Curator a high compliment: "Guava is to Java what Curator is to Zookeeper".
An aside, and some trivia:
The origin of the name Zookeeper is rather interesting. The following is excerpted from the book "From Paxos to Zookeeper: Principles and Practice of Distributed Consistency":
Zookeeper originated with a research group at Yahoo's research institute. At the time, the researchers found that many large systems inside Yahoo relied on similar systems for distributed coordination, but those systems often suffered from distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework free of single points of failure. Early in the project, considering that many projects were named after animals (for example the famous Pig project), Yahoo's engineers wanted to give this project an animal name too. Raghu Ramakrishnan, then chief scientist of the research institute, joked: "At this rate, we'll turn into a zoo." Once he said it, everyone agreed the project should be called "zookeeper": with all the animal-named distributed components put together, Yahoo's whole distributed system looked like a big zoo, and Zookeeper is exactly what coordinates the distributed environment. Thus the name Zookeeper was born.

Curator is undoubtedly the Swiss Army knife of Zookeeper clients. The word translates as "curator" or "custodian"; whether or not the development team intended it, the author's guess is that the name is meant to say that Curator is Zookeeper's curator (stretching the metaphor a little: Curator is the zoo's keeper).
Curator consists of several modules:
curator-framework: a wrapper around Zookeeper's low-level API
curator-client: client-side utilities, such as retry policies
curator-recipes: packaged high-level features (recipes), such as Cache event listening, leader election, distributed locks, distributed counters, distributed barriers, and so on
Maven dependencies (the Curator version used here is 2.12.0, which targets Zookeeper 3.4.x; mixing incompatible versions causes compatibility problems and is very likely to make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Background

I happened to be working with zookeeper recently, and I built a visual web console based on SpringBoot, Curator, and Bootstrap:

zookeeper-console

You are welcome to use it and star it.

Curator's Basic APIs


Creating a Session

1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method has four main parameters:

| Parameter | Description |
| --- | --- |
| connectionString | Server list, in the format host1:port1,host2:port2,… |
| retryPolicy | Retry policy; several retry policies are built in, and you can also implement the RetryPolicy interface yourself |
| sessionTimeoutMs | Session timeout in milliseconds, default 60000 ms |
| connectionTimeoutMs | Connection creation timeout in milliseconds, default 15000 ms |

2. Creating a session with the fluent-style API

The core parameters are set fluently (chained). An example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();


3. Creating a session with an isolated namespace

To isolate different Zookeeper businesses from one another, each business should be assigned an independent namespace, i.e. a designated Zookeeper root path (in official terms: Zookeeper's "chroot" feature). For example (as in the snippet below), when a client specifies the independent namespace "/base", all of that client's operations on data nodes are performed relative to that directory. Setting a chroot maps a client application onto one subtree of the Zookeeper server; in scenarios where multiple applications share one Zookeeper cluster, this is very useful for isolating the applications from one another.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();


Starting the Client

Once the session has been created successfully and you have the client instance, simply call its start() method:

client.start();
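
For completeness, here is a minimal sketch of the full client lifecycle (create, start, use, close); the server address "localhost:2181" is only an assumption for the example. CuratorFramework implements Closeable, so the client must always be closed to release the session:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();                   // must be called before any other operation
try {
    client.blockUntilConnected(); // optionally wait until the connection is established
    // ... use the client ...
} finally {
    client.close();               // always close the client to release the session
}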


Data Node Operations


Creating Data Nodes

Zookeeper offers these node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent, with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral, with a sequence number

**Create a node, with empty initial content**

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to persistent, and the content defaults to empty.

Create a node, with initial content

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral), with empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral), with initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral), with initial content, and automatically create parent nodes recursively

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is especially useful, because normally a developer has to check whether the parent node exists before creating a child node; creating the child directly when the parent does not exist throws NoNodeException. With creatingParentContainersIfNeeded(), Curator automatically and recursively creates all required parent nodes.

Deleting Data Nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes; otherwise an exception will be thrown.

Delete a node, recursively deleting all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing deletion of a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node, with guaranteed deletion

client.delete().guaranteed().forPath("path");

The guaranteed() interface is a safeguard: as long as the client session is valid, Curator keeps retrying the delete operation in the background until the node has been deleted.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");


Reading Node Data

Read the data content of a node

client.getData().forPath("path");

Note: the return value of this method is byte[].

Read the data content of a node, obtaining the node's stat at the same time

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");


Updating Node Data

履新三个节点的数据内容

client.setData().forPath("path","data".getBytes());

Note: this interface returns a Stat instance.

Update the data content of a node, forcing an update of a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());
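
In practice the version number usually comes from a previous read rather than a hard-coded value like 10086. A minimal sketch of an optimistic, compare-and-set style update:

Stat stat = new Stat();
byte[] oldData = client.getData().storingStatIn(stat).forPath("path");
// Fails with KeeperException.BadVersionException if another client updated the node in between.
client.setData().withVersion(stat.getVersion()).forPath("path", "newData".getBytes());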


Checking Whether a Node Exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance and is used to check whether the ZNode exists. Additional methods (watch or background processing) can be chained, with forPath() called last to specify the ZNode to operate on.


Getting All Child Node Paths of a Node

client.getChildren().forPath("path");

Note: this method returns a List<String>, the list of the ZNode's child node paths. Additional methods (watch, background processing, or getting the stat) can be chained, with forPath() called last to specify the parent ZNode to operate on.
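
As a sketch of chaining a watcher onto the call (like native Zookeeper watches, the watcher fires only once and must be re-registered to keep watching):

List<String> children = client.getChildren()
        .usingWatcher((CuratorWatcher) event ->
                System.out.println("children changed: " + event))
        .forPath("path");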


Transactions

A CuratorFramework instance provides the inTransaction() interface method, which opens a ZooKeeper transaction. You can compose create, setData, check, and/or delete operations and then call commit() to submit them as one atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();


Asynchronous Interfaces

The create, delete, update, and read methods above are all synchronous. Curator also provides asynchronous interfaces, introducing the BackgroundCallback interface for handling the result information the server returns after an asynchronous call. A key callback value of the BackgroundCallback interface is CuratorEvent, which contains the event type, the response code, and the node's details.

CuratorEventType

| Event type | Corresponding CuratorFramework method |
| --- | --- |
| CREATE | #create() |
| DELETE | #delete() |
| EXISTS | #checkExists() |
| GET_DATA | #getData() |
| SET_DATA | #setData() |
| CHILDREN | #getChildren() |
| SYNC | #sync(String,Object) |
| GET_ACL | #getACL() |
| SET_ACL | #setACL() |
| WATCHED | #usingWatcher(Watcher) |
| CLOSING | #close() |

Response codes (#getResultCode())

| Response code | Meaning |
| --- | --- |
| 0 | OK, the call succeeded |
| -4 | ConnectionLoss, the client lost its connection to the server |
| -110 | NodeExists, the node already exists |
| -112 | SessionExpired, the session has expired |

An example of asynchronously creating a node:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if the #inBackground() method is not given an executor, Curator's EventThread is used by default for the asynchronous processing.


Curator Recipes (Advanced Features)

Note: you must first add the curator-recipes dependency. The sections below only explain and demonstrate some of the recipes' features; they do not attempt a source-level discussion.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important: it is strongly recommended to monitor the connection state with a ConnectionStateListener. When the connection state is LOST, all APIs under curator-recipes become invalid or may time out, even though none of the examples below use a ConnectionStateListener.
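
Since none of the examples below demonstrate it, here is a minimal sketch of registering such a listener (the log messages are only illustrative):

client.getConnectionStateListenable().addListener((c, newState) -> {
    if (newState == ConnectionState.LOST) {
        System.out.println("session lost: any locks or leadership held by this client are gone");
    } else if (newState == ConnectionState.RECONNECTED) {
        System.out.println("reconnected: recipe state may need to be re-checked or re-acquired");
    }
});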


Caches

Zookeeper natively supports event listening by registering Watchers, but developers have to register them over and over (a Watcher registration fires only once). Cache is Curator's wrapper over event listening; it can be seen as a local cached view of the data combined with event listening, and it handles the repeated registration automatically. Curator provides three kinds of Watcher caches for listening to node changes.


Path Cache

A Path Cache is used to watch the children of a ZNode. When a child node is added, updated, or deleted, the Path Cache updates its state to contain the latest set of children, together with their data and state, and state changes are reported through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

A Path Cache is created with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache, you must call its start() method, and call close() when you are done.
You can set a StartMode to control how it starts up.

StartMode has three options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called synchronously as part of start(), so the cache is already populated when start() returns.
  3. POST_INITIALIZED_EVENT: a PathChildrenCacheEvent.Type#INITIALIZED event is posted once the cache has finished initializing its data (a sketch follows below).
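
For example, assuming cache is the PathChildrenCache created with the constructor above, a sketch of starting it so that it posts the INITIALIZED event once the initial data is loaded:

cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);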

public void addListener(PathChildrenCacheListener listener) adds a listener for changes to the cache.

The getCurrentData() method returns a List<ChildData>, which you can iterate to inspect all child nodes.

Creating, updating, and removing nodes is done with the client (CuratorFramework) itself, not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example will return null, and the cache will not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then the number of listener events actually triggered will be incomplete. This is probably related to how PathChildrenCache is implemented: events must not be triggered too frequently!


Node Cache

A Node Cache is similar to a Path Cache, except that it watches one specific node. It involves three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – the node data

Note: to use the cache, you still have to call its start() method, and call close() when you are done.

getCurrentData() returns the node's current state, from which you can obtain the current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep calls in the example can be commented out, but then the number of listener events actually triggered will be incomplete. This is probably related to how NodeCache is implemented: events must not be triggered too frequently!

Note: a NodeCache can only watch the state changes of a single node.


Tree Cache

A Tree Cache can watch all nodes of an entire subtree. It is something like a combination of PathCache and NodeCache, and mainly involves four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the event class that gets triggered
  • ChildData – the node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the event trigger counts are still correct.

Note: during initialization (when start() is called), TreeCache calls the TreeCacheListener back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that point event.getData().getPath() is very likely to throw a NullPointerException, so this case should be handled explicitly and guarded against.
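
A minimal defensive listener along those lines:

TreeCacheListener safeListener = (client1, event) -> {
    // Ignore the INITIALIZED callback (and any other event without data), where getData() is null.
    if (event.getType() == TreeCacheEvent.Type.INITIALIZED || null == event.getData()) {
        return;
    }
    System.out.println(event.getType() + " | path: " + event.getData().getPath());
};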


Leader Election

In distributed computing, leader election is a very important capability. The election process goes like this: one process is designated as the organizer, which distributes tasks to the nodes. Before the task starts, no node knows who the leader (or coordinator) is. After the election algorithm has run, every node ends up recognizing one unique node as the task leader. Besides that, elections also frequently happen when the leader crashes unexpectedly and a new leader has to be elected.

In a zookeeper cluster, the leader handles write operations and then synchronizes the followers via the Zab protocol; both the leader and the followers can handle read operations.

Curator provides two leader election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients continuously take turns being the Leader, an egalitarian society. With the latter, once a Leader is elected it will not give up leadership unless a client goes down and triggers a new election. Sounds like a certain party?


LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start( );

假如运营,LeaderLatch会和别的使用一样latch
path的别的LeaderLatch商谈,然后里面一个结尾会被推选为leader,能够透过hasLeadership主意查看LeaderLatch实例是不是leader:

leaderLatch.hasLeadership(); // returns true if this instance is currently the leader

就像JDK的CountDownLatch,
LeaderLatch在伸手成为leadership会block(阻塞),一旦不选择LeaderLatch了,必须调用close方法。
借使它是leader,会自由leadership, 其余的参加者将会选出贰个leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

Exception handling:
A LeaderLatch instance can add a ConnectionStateListener to watch for connection problems. When the connection is SUSPENDED or LOST, the leader no longer assumes it is the leader. When a LOST connection is RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. Users of LeaderLatch must account for connection problems that can cause loss of leadership.
Using a ConnectionStateListener is strongly recommended.

An example of using LeaderLatch:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the test module dependency to make testing convenient, without running a real zookeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we create 10 LeaderLatch instances; after they are started, one of them is elected leader.
Because the election takes some time, you cannot obtain the leader immediately after start().
hasLeadership checks whether this instance is the leader, returning true if so.
.getLeader().getId() returns the ID of the current leader.
Leadership can only be released via close().
await is a blocking method: it waits to acquire leadership, but may never succeed.
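
A sketch of the timed await variant, which gives up after a deadline instead of blocking forever:

if (latch.await(10, TimeUnit.SECONDS)) {
    System.out.println("this instance became the leader");
} else {
    System.out.println("timed out without acquiring leadership");
}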


LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector, with these constructors:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Similar to LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called when the instance acquires leadership, and takeLeadership() returns only when leadership is released.
Call close() on the LeaderSelector instance when you no longer use it.

Exception handling:
The LeaderSelectorListener class extends ConnectionStateListener. A LeaderSelector must watch for connection state changes. If an instance becomes leader, it should respond to SUSPENDED or LOST. When SUSPENDED appears, the instance must assume it may no longer be the leader until the connection is successfully re-established. If LOST appears, the instance is no longer the leader and its takeLeadership method returns.

Important: the recommended way to handle this is to throw CancelLeadershipException when SUSPENDED or LOST is received. This interrupts the LeaderSelector instance and cancels the thread executing the takeLeadership method. This is so important that you should consider extending LeaderSelectorListenerAdapter, which provides the recommended handling logic.

The following example is taken from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can distribute tasks and so on inside takeLeadership and never return; if you want this instance to stay the leader, you can put an infinite loop there. Calling leaderSelector.autoRequeue() guarantees that this instance can acquire leadership again after releasing it. Here we use an AtomicInteger to record how many times this client has acquired leadership; the election is "fair": every client has the same chance to become leader.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, a LeaderLatch must call close() to release leadership, whereas a LeaderSelector can control leadership through its LeaderSelectorListener and release it at the appropriate moment, so that every node has a chance to lead. LeaderSelector therefore offers better flexibility and controllability; in scenarios that need leader election, it is recommended to prefer LeaderSelector.


Distributed Locks

Notes:

1. It is recommended to monitor the connection state with a ConnectionStateListener, because once the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronized: at no point in time will two clients hold the same lock.


Shared Reentrant Lock

Shared means the lock is globally visible and can be requested by any client. Reentrant means, as with the JDK's ReentrantLock, that the same client can acquire the lock repeatedly while already holding it, without blocking. It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock via acquire(), which also provides a timed variant:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock via the release() method. An InterProcessMutex instance can be reused.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

If you want to request revocation of the current lock, call the attemptRevoke() method; note that the RevocationListener is called back when the lock should be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
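
A minimal sketch of cooperative revocation; it assumes the static attemptRevoke() shown above lives on the Revoker helper class in the locks recipes package (org.apache.curator.framework.recipes.locks.Revoker), and that the participant node collection is non-empty:

InterProcessMutex revocableLock = new InterProcessMutex(client, "/examples/revocable-lock");
revocableLock.makeRevocable(forLock -> {
    // Called when another process or thread has requested revocation: release cooperatively.
    try {
        forLock.release();
    } catch (Exception e) {
        e.printStackTrace();
    }
});
revocableLock.acquire();

// Elsewhere, another process asks for the lock to be revoked; the path usually comes
// from something like InterProcessMutex.getParticipantNodes().
Revoker.attemptRevoke(client, revocableLock.getParticipantNodes().iterator().next());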

一回提醒:错误管理
照旧刚烈推荐你使用ConnectionStateListener管理连接意况的更换。
当连接LOST时你不再抱有锁。

First let's create a simulated shared resource that is expected to be accessed by only one thread at a time; otherwise there will be concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real environment we would access/maintain a shared resource here.
        // With the lock in place, this example never throws the IllegalStateException below;
        // without the lock it easily does, because use() sleeps for a while.
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then create an InterProcessMutexDemo class, which is responsible for the complete access cycle of requesting the lock, using the resource, and releasing the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is quite simple: it spawns 5 clients, and each client repeatedly runs the acquire-lock / use-resource / release-lock cycle (REPETITIONS = 50 times). Each client runs in its own thread. From the output you can see that the lock is used exclusively, by one instance at a time, in random order.

Since the lock is reentrant, you can call acquire() repeatedly within one thread, and it always returns true while the thread holds the lock.

You should not use the same InterProcessMutex across multiple threads. Instead, create a new InterProcessMutex instance in each thread; as long as they all use the same path, they share the same lock.
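 
A minimal sketch of that per-thread pattern (reusing the "/examples/locks" path from the demo above):

Runnable worker = () -> {
    // Each thread builds its own InterProcessMutex; sharing the same path shares the lock.
    InterProcessMutex threadLock = new InterProcessMutex(client, "/examples/locks");
    try {
        threadLock.acquire();
        try {
            // critical section: access the shared resource exclusively
        } finally {
            threadLock.release();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
};
new Thread(worker).start();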


Shared Lock (Non-Reentrant)

Compared with the InterProcessMutex above, this lock simply lacks the reentrant capability, which means it cannot be re-entered within the same thread. The class is InterProcessSemaphoreMutex, and its usage is similar to InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex a second time");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Running this, you will find that one and only one client manages to acquire the first lock (its first acquire() returns true), and it then blocks itself on the second acquire(), which times out; all the other clients block on their first acquire() until it times out and throws an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

可重入分享锁—Shared Reentrant Lock

Shared意味着锁是全局可知的, 客户端都能够央浼锁。
Reentrant和JDK的ReentrantLock类似,就能够重入,
意味着同多少个客户端在颇具锁的同期,能够每每取得,不会被打断。
它是由类InterProcessMutex来兑现。 它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()得到锁,并提供超时机制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

通过release()艺术释放锁。 InterProcessMutex 实例能够选取。

Revoking ZooKeeper recipes wiki定义了可研讨的吊销机制。
为了撤废mutex, 调用下边包车型地铁主意:

public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener

假诺你诉求裁撤当前的锁,
调用attemptRevoke()主意,注意锁释放时RevocationListener将会回调。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

贰回提示:错误管理
还是生硬推荐你选取ConnectionStateListener拍卖连接景况的变动。
当连接LOST时您不再抱有锁。

第一让我们成立三个仿照的分享财富,
那些财富期望只好单线程的拜访,不然会有出现难题。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真实环境中我们会在这里访问/维护一个共享的资源
        //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
        //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

下一场创立四个InterProcessMutexDemo类, 它担负需要锁,
使用财富,释放锁那样四个完整的拜望进度。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is simple: it creates QTY (5) clients, and each client repeats the request-lock / use-resource / release-lock cycle REPETITIONS (50) times, each client running in its own thread.
The output shows the lock being held exclusively by one instance at a time, in arbitrary order.

Since the lock is reentrant, you can call acquire() repeatedly in the same thread; as long as the thread holds the lock, it keeps returning true.

You should not use the same InterProcessMutex from multiple threads.
Instead, create a new InterProcessMutex instance in each thread, all with the same path; they will then share the same lock.
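
A minimal sketch of the reentrant behavior (the path is illustrative); note that every acquire() must be matched by a release():

InterProcessMutex mutex = new InterProcessMutex(client, "/examples/locks");
mutex.acquire();      // first acquisition
mutex.acquire();      // reentrant acquisition in the same thread succeeds immediately
try {
    // ... critical section ...
} finally {
    mutex.release();  // one release per acquire
    mutex.release();
}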

Reentrant read-write lock — Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks: one for read operations, the other for writes. The read lock can be held by multiple processes at the same time as long as the write lock is free, while a held write lock blocks all readers.

This lock is reentrant. A thread holding the write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock,
for example: acquire the write lock → acquire the read lock → release the write lock → (when done reading) release the read lock.
Upgrading from the read lock to the write lock is not allowed.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. First create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock from it as needed; both locks are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: acquire the write lock first, then the read lock - never the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " has acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " has acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing the read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Non-reentrant lock — Shared Lock

Compared with InterProcessMutex above, this lock simply lacks reentrancy, which means it cannot be acquired again from the same thread. The class is InterProcessSemaphoreMutex, and it is used in the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " has acquired the mutex");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " could not acquire the mutex again");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

After running it you will see that exactly one client acquires the lock (its first acquire() returns true) and then blocks itself on the second acquire(), which times out; all the other clients time out on the first acquire() and throw the exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Semaphore — Shared Semaphore

A counting semaphore similar to the JDK's Semaphore.
Where the JDK Semaphore maintains a set of permits, Curator calls them leases (Lease).
There are two ways to control the semaphore's maximum number of leases. In the first, the user supplies a path and specifies the maximum lease count directly. In the second, the user supplies a path and a SharedCountReader. If you do not use a SharedCountReader,
you must make sure that all instances in all processes use the same maximum lease count; otherwise you could have instances in process A holding a maximum of 10 leases while process B holds 20, at which point the leases lose their meaning.
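
A minimal sketch of the second approach, with illustrative paths: back the semaphore with a SharedCount so that every process reads the maximum lease count from the same place.

// Paths are placeholders for illustration only.
SharedCount maxLeases = new SharedCount(client, "/examples/semaphore-count", 10);
maxLeases.start();
InterProcessSemaphoreV2 semaphore =
        new InterProcessSemaphoreV2(client, "/examples/semaphore", maxLeases);
Lease lease = semaphore.acquire();
try {
    // ... use the resource guarded by the lease ...
} finally {
    semaphore.returnLease(lease);
    maxLeases.close();
}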

Calling acquire() returns a lease object.
The client must close the lease in a finally block, otherwise the lease is lost. However,
if the client session goes away for some reason, such as a crash,
the leases held by that client are closed automatically,
so that other clients can use them. Leases can also be returned with the following methods:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough leases, the requesting thread blocks.
Overloads with a timeout are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used by Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases; at the end of the program we return them to the semaphore.
Then we request one more lease; the semaphore still has 5 leases available, so the request succeeds, returns one lease, and leaves 4.
We then request another 5 leases; only 4 remain, so the request blocks until the timeout and, still unsatisfied, returns null (an insufficient lease count blocks until the timeout and then returns null rather than throwing an exception; without a timeout it would block indefinitely).

All the locks described above are fair locks. From ZooKeeper's point of view,
every client obtains the lock in the order it asked for it; there is no unfair preemption.

Multi shared lock — Multi Shared Lock

Multi Shared Lock is a container of locks. When acquire() is called,
all of the contained locks are acquired; if that fails, every lock acquired so far is released.
Likewise, when release() is called, all of the locks are released (failures are ignored).
In essence, it is the representative of a group of locks: acquire and release operations on it are forwarded to all of the locks it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take a collection of locks, or a list of ZooKeeper paths:

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

It is used in the same way as Shared Lock:

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We create a new InterProcessMultiLock containing a reentrant lock and a non-reentrant lock.
Calling acquire() shows that the thread holds both locks at once;
after release() both locks are released.

One final reminder:
it is strongly recommended to monitor the connection state with a ConnectionStateListener; when the connection state is LOST, the lock is gone.

Distributed counters

As the name suggests, counters count.
ZooKeeper can be used to implement a counter shared across a cluster:
as long as you use the same path, you always get the latest counter value,
which is guaranteed by ZooKeeper's consistency. Curator provides two counters:
one counts with an int (SharedCount), the other with a long (DistributedAtomicLong).

Distributed int counter — SharedCount

This class counts with an int. It mainly involves three classes:

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter.
You can add a SharedCountListener to it; when the counter changes, the listener is notified of the change event, while a SharedCountReader reads the latest value,
both the plain value and a VersionedValue carrying version information.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to listen for the counter value (addListener adds the SharedCountListener).
Any SharedCount using the same path can read this counter value.
We then use 5 threads, each adding a random number below 10 to the counter. Changes made to the counter through a SharedCount on the same path are delivered to baseCount's SharedCountListener as callbacks.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter.
The first parameter supplies the current VersionedValue. If another client has updated the counter in the meantime,
your update may fail,
but your client has then been refreshed with the latest value, so after a failure you can simply try the update again.
setCount, by contrast, updates the counter unconditionally.

Note that the counter must be start()ed before use, and close()d once you are done with it.
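
A minimal sketch of the optimistic retry loop described above, assuming a SharedCount named count as in the demo (the increment of 1 is illustrative):

count.start();
while (true) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break; // our compare-and-set won
    }
    // another client updated the counter first; the value has been refreshed, so retry
}
count.close();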

A ConnectionStateListener is strongly recommended;
in this example, SharedCountListener extends ConnectionStateListener.

Distributed long counter — DistributedAtomicLong

Now let's look at a Long counter. Besides having a larger counting range than SharedCount,
it first tries to set the counter using optimistic locking;
if that does not succeed (for example, the counter was updated by another client in the meantime),
it falls back to updating the value under an InterProcessMutex.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
    MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

    tryOptimistic(result, makeValue);
    if ( !result.succeeded() && (mutex != null) )
    {
        tryWithMutex(result, makeValue);
    }

    return result;
}

This counter supports a series of operations:

  • get(): read the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a specific value
  • subtract(): subtract a specific value
  • trySet(): try to set the value
  • forceSet(): set the value unconditionally

You must check succeeded() on the returned result; it indicates whether the operation succeeded.
If the operation succeeded, preValue() is the value before the operation and
postValue() the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Distributed queues

Curator also simplifies working with ephemeral nodes,
and it provides the ZK Recipe distributed queue implementations. Using ZooKeeper's
PERSISTENT_SEQUENTIAL nodes,
it can guarantee that items put into the queue are ordered.
If a single consumer takes data from the queue, it behaves first-in-first-out, which is what you expect of a queue.
If you strictly require ordering, you must use a single consumer; you can use leader election so that only the Leader acts as the sole consumer.

However, according to the author of Netflix's Curator,
ZooKeeper is really not suited to serve as a queue, or rather ZK cannot implement a good queue; the details are in
Tech Note 4.

There are five reasons:

  1. ZK has a 1MB transfer limit.
    In practice ZNodes must be relatively small, while a queue may hold many messages and grow quite large.
  2. With many nodes, ZK starts up slowly, and using a queue creates many ZNodes;
    you need to significantly increase initLimit and syncLimit.
  3. Large ZNodes are hard to clean up. Netflix had to build a dedicated program just for this.
  4. When a ZNode has a very large number of children, ZK's performance degrades.
  5. ZK's database lives entirely in memory. A large number of queued items means a lot of memory.

Nevertheless, Curator still provides several Queue implementations.
If the number of items is modest and the items are small, they can still be used at your discretion.

Distributed queue — DistributedQueue

DistributedQueue is the most ordinary kind of queue. It involves the following four classes:

  • QueueBuilder – queues are created with QueueBuilder, which is also the factory for the other queue types
  • QueueConsumer – the interface for consumers of queue messages
  • QueueSerializer –
    the serialization interface for queue messages, providing serialization and deserialization of the objects in the queue
  • DistributedQueue – the queue implementation

QueueConsumer is the consumer; it receives data from the queue. The logic for processing the queued data goes into QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer. These are two steps, not one atomic operation. You can call lockPath() on the builder to make consumers take a lock: while a consumer holds the lock for a message, other consumers cannot consume it, and if consumption fails or the process dies, the message can be handed to another process. This costs some performance; if possible, prefer the single-consumer pattern.
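
A minimal sketch of enabling consumer locking (the lock path is illustrative; client, consumer, and serializer are assumed to exist as in the demo below):

QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/example/queue");
DistributedQueue<String> queue = builder
        .lockPath("/example/queue-lock") // consumers hold a lock while consuming
        .buildQueue();
queue.start();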

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; since the PATH is the same, the consumers compete to consume the messages.

Distributed queue with IDs — DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that you can assign an ID to every element in the queue
and remove arbitrary elements from the queue by ID. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

Create it with:

builder.buildIdQueue()

Put an element:

queue.put(aMessage, messageId);

Remove an element:

int numberRemoved = queue.remove(messageId);

In this example,
some elements are removed before they have been consumed, so the consumer never receives the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Priority distributed queue — DistributedPriorityQueue

The priority queue sorts the queued elements by priority. The smaller the priority value,
the closer to the front the element is and the sooner it is consumed.
It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

Create it with builder.buildPriorityQueue(minItemsBeforeRefresh).
When the priority queue is notified of element additions or deletions, it pauses processing of the current element queue and then refreshes the queue. minItemsBeforeRefresh specifies the minimum number of currently active items before a refresh kicks in;
set it to the amount of out-of-order processing your program can tolerate.

Specify the priority when putting an element into the queue:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority setting has no effect. That is because priority only matters for elements already backed up in the queue: if consumption is fast enough, an earlier element may already have been consumed before the next one is enqueued, and in that situation DistributedPriorityQueue degenerates into DistributedQueue.

Distributed delay queue — DistributedDelayQueue

The JDK also has a DelayQueue, which you may be familiar with.
DistributedDelayQueue provides similar functionality: each element has a delay value,
and the consumer only receives an element after its delay has elapsed. It involves the following four classes.

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

Create it with statements like these:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element, you specify delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now,
such as 20 milliseconds, but a future timestamp, e.g. System.currentTimeMillis() + 10000 (10 seconds from now).
If delayUntilEpoch has already passed, the message is received by the consumer immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

The queues above are all fully implemented, but you may have noticed that none of them implements a JDK-like interface.
SimpleDistributedQueue provides an interface essentially consistent with the JDK's (but it does not actually implement the Queue interface).
Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Add an element:

public boolean offer(byte[] data) throws Exception

Remove an element:

public byte[] take() throws Exception

Other methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is a take method.

take blocks until it succeeds;
poll returns null immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("sent a message successfully: " + "zjc-" + i);
                    } else {
                        System.out.println("failed to send a message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, however, 100 messages are sent, but after the first one is consumed, the remaining messages are never consumed; the reason was not found at the time. Checking the demo recommended by the official documentation, it uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

In actual use, however, consumption still appears to block after the first message.
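
For what it's worth, the behavior is most likely explained by the demo itself rather than by SimpleDistributedQueue: Consumer.run() above calls take() exactly once and then returns, so only one message can ever be consumed. A minimal sketch of a consumer that drains the queue (the loop bound of 100 matches the producer and is illustrative):

@Override
public void run() {
    try {
        // take() blocks until an element is available, so call it in a loop to drain the queue
        for (int i = 0; i < 100; i++) {
            byte[] data = queue.take();
            System.out.println("consumed a message: " + new String(data, "UTF-8"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}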

Distributed Barrier (Barrier)

A distributed barrier is a class that blocks the waiting processes on all nodes
until a certain condition is satisfied, after which all of the nodes proceed together.

Think of a horse race: the horses arrive at the starting line one after another,
and at the starting signal they all dash off at once.

DistributedBarrier

The DistributedBarrier class implements the barrier functionality. Its constructor is as follows:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you need to set the barrier; it will block the threads that wait on it:

setBarrier();

Then the threads that need to block call this method to wait for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads will continue executing:

removeBarrier();

Exception handling: DistributedBarrier monitors the connection state, and when the
connection is lost the waitOnBarrier() method throws an exception.

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates controlBarrier to set and remove the barrier.
We create five threads that wait on this barrier.
Only after the barrier is removed do all of the threads continue executing.

If you never set the barrier in the first place, none of the threads will block.

Double Barrier (DistributedDoubleBarrier)

A double barrier lets clients synchronize at both the start and the end of a computation.
When enough processes have entered the double barrier, the computation starts; when it
completes, they leave the barrier. The double-barrier class is DistributedDoubleBarrier, and its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the member blocks until all members have called enter().
When leave() is called, it likewise blocks the calling thread until all members have called leave().
It is like a 100-meter race: the starting gun fires, all of the runners start running,
and the race only ends once every runner has crossed the finish line.

DistributedDoubleBarrier monitors the connection state; when the connection is lost, the enter() and leave() methods throw exceptions.

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}
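
If blocking indefinitely in enter() or leave() is a concern, both methods also exist in
timed overloads (enter(maxWait, unit) and leave(maxWait, unit), which as far as I know
return false when the other members do not show up in time). A minimal sketch under
that assumption:

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;

public class DoubleBarrierTimedSketch {

    static void compute(CuratorFramework client, String path, int memberQty) throws Exception {
        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, path, memberQty);
        // wait at most 30 seconds for all members to enter
        if (!barrier.enter(30, TimeUnit.SECONDS)) {
            System.out.println("not all members entered in time, aborting");
            return;
        }
        try {
            // ... the actual computation would go here ...
        } finally {
            // likewise wait at most 30 seconds for all members to leave
            barrier.leave(30, TimeUnit.SECONDS);
        }
    }
}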

References:
From Paxos to ZooKeeper: Distributed Consistency Principles and Practice (《从PAXOS到ZOOKEEPER分布式一致性原理与实践》)
The blog series Learning ZooKeeper Usage by Example (《跟着实例学习ZooKeeper的用法》)

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually carries a [TOC] navigation directory, which makes it easy to jump to each section; jianshu simply does not support it. The original Markdown lives in the project's /resources/md directory, free for the taking. The article was written with Typora, and Typora is recommended for opening it.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, working hard in Guangzhou: at the office by day, with irregular overtime in the evenings and on weekends, I keep writing this blog in my free hours after midnight.
I hope my articles bring you something worthwhile. Let's keep encouraging each other.
