zookeeper 客户端命令客户端有什么用

zookeeper原理(转) - 翻过这座山,就到菩提洞了 - ITeye技术网站
博客分类:
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,由于工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在某些应用中使用,因此需要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。本文简单分析zookeeper的工作原理,对于如何使用zookeeper不是本文讨论的重点。
1 Zookeeper的基本概念
Zookeeper中的角色主要有以下三类,如下表所示:
系统模型如图所示:
1.2 设计目的
1.最终一致性:client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的性能。
2 .可靠性:具有简单、健壮、良好的性能,如果消息m被到一台服务器接受,那么它将被所有的服务器接受。
3 .实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口。
4 .等待无关(wait-free):慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待。
5.原子性:更新只能成功或者失败,没有中间状态。
6 .顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
2 ZooKeeper的工作原理
Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。
为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。
每个Server在工作过程中有三种状态:
LOOKING:当前Server不知道leader是谁,正在搜寻
LEADING:当前Server即为选举出来的leader
FOLLOWING:leader已经选举出来,当前Server与之同步
2.1 选主流程
当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的Server都恢复到一个正确的状态。Zk的选举算法有两种:一种是基于basic paxos实现的,另外一种是基于fast paxos算法实现的。系统默认的选举算法为fast paxos。先介绍basic paxos流程:
1 .选举线程由当前Server发起选举的线程担任,其主要功能是对投票结果进行统计,并选出推荐的Server;
2 .选举线程首先向所有Server发起一次询问(包括自己);
3 .选举线程收到回复后,验证是否是自己发起的询问(验证zxid是否一致),然后获取对方的id(myid),并存储到当前询问对象列表中,最后获取对方提议的leader相关信息(id,zxid),并将这些信息存储到当次选举的投票记录表中;
收到所有Server回复以后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server;
线程将当前zxid最大的Server设置为当前Server要推荐的Leader,如果此时获胜的Server获得n/2 + 1的Server票数, 设置当前推荐的leader为获胜的Server,将根据获胜的Server相关信息设置自己的状态,否则,继续这个过程,直到leader被选举出来。
通过流程分析我们可以得出:要使Leader获得多数Server的支持,则Server总数必须是奇数2n+1,且存活的Server的数目不得少于n+1.
每个Server启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的server还会从磁盘快照中恢复数据和会话信息,zk会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。选主的具体流程图如下所示:
fast paxos流程是在选举过程中,某Server首先向所有Server提议自己要成为leader,当其它Server收到提议以后,解决epoch和zxid的冲突,并接受对方的提议,然后向对方发送接受提议完成的消息,重复这个流程,最后一定能选举出Leader。其流程图如下所示:
2.2 同步流程
选完leader以后,zk就进入状态同步过程。
1. leader等待server连接;
2 .Follower连接leader,将最大的zxid发送给leader;
3 .Leader根据follower的zxid确定同步点;
4 .完成同步后通知follower 已经成为uptodate状态;
5 .Follower收到uptodate消息后,又可以重新接受client的请求进行服务了。
流程图如下所示:
2.3 工作流程
2.3.1 Leader工作流程
Leader主要有三个功能:
1 .恢复数据;
2 .维持与Learner的心跳,接收Learner请求并判断Learner的请求消息类型;
3 .Learner的消息类型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根据不同的消息类型,进行不同的处理。
PING消息是指Learner的心跳信息;REQUEST消息是Follower发送的提议信息,包括写请求及同步请求;ACK消息是Follower的对提议的回复,超过半数的Follower通过,则commit该提议;REVALIDATE消息是用来延长SESSION有效时间。Leader的工作流程简图如下所示,在实际实现中,流程要比下图复杂得多,启动了三个线程来实现功能。
2.3.2 Follower工作流程
Follower主要有四个功能:
1. 向Leader发送请求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);
2 .接收Leader消息并进行处理;
3 .接收Client的请求,如果为写请求,发送给Leader进行投票;
4 .返回Client结果。
Follower的消息循环处理如下几种来自Leader的消息:
1 .PING消息: 心跳消息;
2 .PROPOSAL消息:Leader发起的提案,要求Follower投票;
3 .COMMIT消息:服务器端最新一次提案的信息;
4 .UPTODATE消息:表明同步完成;
5 .REVALIDATE消息:根据Leader的REVALIDATE结果,关闭待revalidate的session还是允许其接受消息;
6 .SYNC消息:返回SYNC结果到客户端,这个消息最初由客户端发起,用来强制得到最新的更新。
Follower的工作流程简图如下所示,在实际实现中,Follower是通过5个线程来实现功能的。
对于observer的流程不再叙述,observer流程和Follower的唯一不同的地方就是observer不会参加leader发起的投票。
主流应用场景:
Zookeeper的主流应用场景实现思路(除去官方示例) (1)配置管理集中式的配置管理在应用集群中是非常常见的,一般商业公司内部都会实现一套集中的配置管理中心,应对不同的应用集群对于共享各自配置的需求,并且在配置变更时能够通知到集群中的每一个机器。Zookeeper很容易实现这种集中式的配置管理,比如将APP1的所有配置配置到/APP1 znode下,APP1所有机器一启动就对/APP1这个节点进行监控(zk.exist("/APP1",true)),并且实现回调方法Watcher,那么在zookeeper上/APP1 znode节点下数据发生变化的时候,每个机器都会收到通知,Watcher方法将会被执行,那么应用再取下数据即可(zk.getData("/APP1",false,null));以上这个例子只是简单的粗颗粒度配置监控,细颗粒度的数据可以进行分层级监控,这一切都是可以设计和控制的。
(2)集群管理 应用集群中,我们常常需要让每一个机器知道集群中(或依赖的其他某一个集群)哪些机器是活着的,并且在集群机器因为宕机,网络断链等原因能够不在人工介入的情况下迅速通知到每一个机器。Zookeeper同样很容易实现这个功能,比如我在zookeeper服务器端有一个znode叫/APP1SERVERS,那么集群中每一个机器启动的时候都去这个节点下创建一个EPHEMERAL类型的节点,比如server1创建/APP1SERVERS/SERVER1(可以使用ip,保证不重复),server2创建/APP1SERVERS/SERVER2,然后SERVER1和SERVER2都watch /APP1SERVERS这个父节点,那么也就是这个父节点下数据或者子节点变化都会通知对该节点进行watch的客户端。因为EPHEMERAL类型节点有一个很重要的特性,就是客户端和服务器端连接断掉或者session过期就会使节点消失,那么在某一个机器挂掉或者断链的时候,其对应的节点就会消失,然后集群中所有对/APP1SERVERS进行watch的客户端都会收到通知,然后取得最新列表即可。另外有一个应用场景就是集群选master,一旦master挂掉能够马上能从slave中选出一个master,实现步骤和前者一样,只是机器在启动的时候在APP1SERVERS创建的节点类型变为EPHEMERAL_SEQUENTIAL类型,这样每个节点会自动被编号我们默认规定编号最小的为master,所以当我们对/APP1SERVERS节点做监控的时候,得到服务器列表,只要所有集群机器逻辑认为最小编号节点为master,那么master就被选出,而这个master宕机的时候,相应的znode会消失,然后新的服务器列表就被推送到客户端,然后每个节点逻辑认为最小编号节点为master,这样就做到动态master选举。
Zookeeper 监视(Watches) 简介
Zookeeper C API 的声明和描述在 include/zookeeper.h 中可以找到,另外大部分的 Zookeeper C API 常量、结构体声明也在 zookeeper.h 中,如果如果你在使用 C API 是遇到不明白的地方,最好看看 zookeeper.h,或者自己使用 doxygen 生成 Zookeeper C API 的帮助文档。
Zookeeper 中最有特色且最不容易理解的是监视(Watches)。Zookeeper 所有的读操作——getData(), getChildren(), 和 exists() 都 可以设置监视(watch),监视事件可以理解为一次性的触发器, 官方定义如下: a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes。对此需要作出如下理解:
(一次性触发)One-time trigger
当设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,如果客户端调用了 getData("/znode1", true) 并且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而如果 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,否则客户端不会收到事件通知。
(发送至客户端)Sent to the client
Zookeeper 客户端和服务端是通过 socket 进行通信的,由于网络存在故障,所以监视事件很有可能不会成功地到达客户端,监视事件是异步发送至监视者的,Zookeeper 本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event). 网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。
(被设置 watch 的数据)The data for which the watch was set
这意味着 znode 节点本身具有不同的改变方式。你也可以想象 Zookeeper 维护了两条监视链表:数据监视和子节点监视(data watches and child watches) getData() and exists() 设置数据监视,getChildren() 设置子节点监视。 或者,你也可以想象 Zookeeper 设置的不同监视返回不同的数据,getData() 和 exists() 返回 znode 节点的相关信息,而 getChildren() 返回子节点列表。因此, setData() 会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create() 操作则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete() 操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。
Zookeeper 中的监视是轻量级的,因此容易设置、维护和分发。当客户端与 Zookeeper 服务器端失去联系时,客户端并不会收到监视事件的通知,只有当客户端重新连接后,若在必要的情况下,以前注册的监视会重新被注册并触发,对于开发人员来说 这通常是透明的。只有一种情况会导致监视事件的丢失,即:通过 exists() 设置了某个 znode 节点的监视,但是如果某个客户端在此 znode 节点被创建和删除的时间间隔内与 zookeeper 服务器失去了联系,该客户端即使稍后重新连接 zookeeper服务器后也得不到事件通知。
Zookeeper C API 常量与部分结构(struct)介绍
与 ACL 相关的结构与常量:
struct Id 结构为:
struct Id {
struct ACL 结构为:
struct ACL {
struct I };
struct ACL_vector 结构为:
struct ACL_vector {
struct ACL * };
与 znode 访问权限有关的常量
const int ZOO_PERM_READ; //允许客户端读取 znode 节点的值以及子节点列表。
const int ZOO_PERM_WRITE;// 允许客户端设置 znode 节点的值。
const int ZOO_PERM_CREATE; //允许客户端在该 znode 节点下创建子节点。
const int ZOO_PERM_DELETE;//允许客户端删除子节点。
const int ZOO_PERM_ADMIN; //允许客户端执行 set_acl()。
const int ZOO_PERM_ALL;//允许客户端执行所有操作,等价与上述所有标志的或(OR) 。
与 ACL IDs 相关的常量
struct Id ZOO_ANYONE_ID_UNSAFE; //(‘world’,’anyone’)
struct Id ZOO_AUTH_IDS;// (‘auth’,’’)
三种标准的 ACL
struct ACL_vector ZOO_OPEN_ACL_UNSAFE; //(ZOO_PERM_ALL,ZOO_ANYONE_ID_UNSAFE)
struct ACL_vector ZOO_READ_ACL_UNSAFE;// (ZOO_PERM_READ, ZOO_ANYONE_ID_UNSAFE)
struct ACL_vector ZOO_CREATOR_ALL_ACL; //(ZOO_PERM_ALL,ZOO_AUTH_IDS)
与 Interest 相关的常量:ZOOKEEPER_WRITE, ZOOKEEPER_READ
这 两个常量用于标识感兴趣的事件并通知 zookeeper 发生了哪些事件。Interest 常量可以进行组合或(OR)来标识多种兴趣(multiple interests: write, read),这两个常量一般用于 zookeeper_interest() 和 zookeeper_process()两个函数中。
与节点创建相关的常量:ZOO_EPHEMERAL, ZOO_SEQUENCE
zoo_create 函数标志,ZOO_EPHEMERAL 用来标识创建临时节点,ZOO_SEQUENCE 用来标识节点命名具有递增的后缀序号(一般是节点名称后填充 10 位字符的序号,如 /xyz, /xyz, /xyz, ...),同样地,ZOO_EPHEMERAL, ZOO_SEQUENCE 可以组合。
与连接状态 Stat 相关的常量
以下常量均与 Zookeeper 连接状态有关,他们通常用作监视器回调函数的参数。
ZOOAPI const int
ZOO_EXPIRED_SESSION_STATE
ZOOAPI const int
ZOO_AUTH_FAILED_STATE
ZOOAPI const int
ZOO_CONNECTING_STATE
ZOOAPI const int
ZOO_ASSOCIATING_STATE
ZOOAPI const int
ZOO_CONNECTED_STATE
与监视类型(Watch Types)相关的常量
以下常量标识监视事件的类型,他们通常用作监视器回调函数的第一个参数。
Zookeeper C API 错误码介绍
ZSYSTEMERROR
系统或服务器端错误(System and server-side errors),服务器不会抛出该错误,该错误也只是用来标识错误范围的,即大于该错误值,且小于 ZAPIERROR 都是系统错误。
ZRUNTIMEINCONSISTENCY
运行时非一致性错误。
ZDATAINCONSISTENCY
数据非一致性错误。
ZCONNECTIONLOSS
Zookeeper 客户端与服务器端失去连接
ZMARSHALLINGERROR
在 marshalling 和 unmarshalling 数据时出现错误(Error while marshalling or unmarshalling data)
ZUNIMPLEMENTED
该操作未实现(Operation is unimplemented)
ZOPERATIONTIMEOUT
该操作超时(Operation timeout)
ZBADARGUMENTS
非法参数错误(Invalid arguments)
ZINVALIDSTATE
非法句柄状态(Invliad zhandle state)
API 错误(API errors),服务器不会抛出该错误,该错误也只是用来标识错误范围的,错误值大于该值的标识 API 错误,而小于该值的标识 ZSYSTEMERROR。
节点不存在(Node does not exist)
没有经过授权(Not authenticated)
ZBADVERSION
版本冲突(Version conflict)
ZNOCHILDRENFOREPHEMERALS
临时节点不能拥有子节点(Ephemeral nodes may not have children)
ZNODEEXISTS
节点已经存在(The node already exists)
该节点具有自身的子节点(The node has children)
ZSESSIONEXPIRED
会话过期(The session has been expired by the server)
ZINVALIDCALLBACK
非法的回调函数(Invalid callback specified)
ZINVALIDACL
非法的ACL(Invalid ACL specified)
ZAUTHFAILED
客户端授权失败(Client authentication failed)
Zookeeper 连接关闭(ZooKeeper is closing)
并非错误,客户端不需要处理服务器的响应(not error, no server responses to process)
ZSESSIONMOVED
会话转移至其他服务器,所以操作被忽略(session moved to another server, so operation is ignored)
Watch事件类型:
ZOO_CREATED_EVENT:节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过zoo_exists()设置ZOO_DELETED_EVENT:节点删除事件,此watch通过zoo_exists()或zoo_get()设置ZOO_CHANGED_EVENT:节点数据改变事件,此watch通过zoo_exists()或zoo_get()设置ZOO_CHILD_EVENT:子节点列表改变事件,此watch通过zoo_get_children()或zoo_get_children2()设置ZOO_SESSION_EVENT:会话失效事件,客户端与服务端断开或重连时触发ZOO_NOTWATCHING_EVENT:watch移除事件,服务端出于某些原因不再为客户端watch节点时触发
浏览 323828
楼主只讲明了zookeeper的架构和大体的实现逻辑,但是缺少对paxos的说明,这个算法是zookeeper的核心,如果这个算法过程不说明,其实zookeeper就讲不清楚。兄台貌似更加理解zookeeper,能否出篇文章分享。
选主过程还是没有看懂看看paxos算法先,不然看这个zookeeper会非常难懂。其实原理简单来说,就是要选举leader,会生成一个zxid,然后分发给所有的server(所以这里一台server可以接受多台server给他发送要选举leader的请求),然后各个server根据发送给自己的zxid,选择一个值最大的,然后将这个选择返回给发送这个zxid的server,只要这个server收到的答复大于等于2/n+1个(也就是超过半数的同意票),则表明自己当选为leader,然后会向所有server广播自己已经成为leader。
对方提议的leader相关信息(id,zxid)请问是怎么提议的,收到那么多zxid,随机给一个么,那么选举线程为何不是随机自己选一个呢这个机制是通过fast paxos算法实现,具体如何提议,如何选择,建议先看看paxos这个算法https://zh.wikipedia.org/zh-cn/Paxos%E7%AE%97%E6%B3%95,维基百科讲的非常清楚。
[/img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][img][/url][/url][/url][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img][/img]
javafan_303
浏览: 530096 次
来自: 北京
&div class=&quote_title ...
楼主只讲明了zookeeper的架构和大体的实现逻辑,但是缺少 ...
&div class=&quote_title ...zookeeper c 客户端源码分析以及使用注意点
zookeeper的库用起来很麻烦,建议使用我封装的zookeeper c库,能考虑更少的zookeeper本身的逻辑
在上一篇文章查CLOSE_WAIT泄漏问题的时候稍微看了下zookeeper c源码,大致对他的流程来做一个分析。
zookeeper的C客户端分为mt库和st库(多线程和单线程),一般操作都是以多线程库为主。
多线程库分为三个线程,主线程,io线程和completion线程
主线程就是调用API的线程,io线程负责网络通信,对异步请求和watch响应等,IO线程会发给completion线程,由completion线程异步完成
首先看下zookeeper_init是如何创建io线程和completion线程的
zookeeper_init--&adaptor_init--&start_threads
void start_threads(zhandle_t* zh)
api_prolog(zh);
rc=pthread_create(&adaptor-&io, 0, do_io, zh);
rc=pthread_create(&adaptor-&completion, 0, do_completion, zh);
wait_for_others(zh);
api_epilog(zh, 0);
do_io就是io进程的主要逻辑了
void *do_io(void *v)
zhandle_t *zh = (zhandle_t*)v;
struct pollfd fds[2];
struct adaptor_threads *adaptor_threads = zh-&adaptor_priv;
api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG((&started IO thread&));
fds[0].fd=adaptor_threads-&self_pipe[0];
fds[0].events=POLLIN;
while(!zh-&close_requested) {
struct timeval tv;
int interest;
int timeout;
int maxfd=1;
zookeeper_interest(zh, &fd, &interest, &tv);
//将zookeeper_intereset里建立的新描述符加入监听事件里,睡眠一定的超时时间直到被唤醒
if (fd != -1) {
fds[1].fd=fd;
fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);
poll(fds,maxfd,timeout);
if (fd != -1) {
interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
if(fds[0].revents&POLLIN){
// flush the pipe
char b[128];
while(read(adaptor_threads-&self_pipe[0],b,sizeof(b))==sizeof(b)){}
// dispatch zookeeper events
rc = zookeeper_process(zh, interest);
// check the current state of the zhandle and terminate
// if it is_unrecoverable()
if(is_unrecoverable(zh))
api_epilog(zh, 0);
LOG_DEBUG((&IO thread terminated&));
可以看到,doio线程是一个使用poll多路复用的循环,主要是监视zh-&fd和adaptorthreads-&self_pipe[0]。
zh-&fd是和服务端通信的描述符,selfpipe是用来唤醒这个线程的(wakeupio_thread)
循环开始使用zookeeper_interest对服务端的网络进行检查,并且设置了poll中的等待时间
而后poll会进行睡眠等待被服务端或者主进程唤醒,唤醒后执行zookeeper_process来做具体的操作。
那么下面先看下zookeeper_interest的具体操作
do_io-&zookeeper_interest
int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
struct timeval *tv)
struct timeval now;
if(zh==0 || fd==0 ||interest==0 || tv==0)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
gettimeofday(&now, 0);
//如果存在deadline时间,那么打印超过了deadline时间多久
if(zh-&next_deadline.tv_sec!=0 || zh-&next_deadline.tv_usec!=0){
int time_left = calculate_interval(&zh-&next_deadline, &now);
if (time_left & 10)
LOG_WARN((&Exceeded deadline by %dms&, time_left));
//增加zh的引用计数,而后对变量进行初始化
api_prolog(zh);
*fd = zh-&fd;
*interest = 0;
tv-&tv_sec = 0;
tv-&tv_usec = 0;
//如果连接不存在
if (*fd == -1) {
if (zh-&connect_index == zh-&addrs_count) {
/* Wait a bit before trying again so that we don't spin */
zh-&connect_index = 0;
//重新建立连接,并且关nagle算法
int enable_tcp_nodelay = 1;
int ssoresult;
zh-&fd = socket(zh-&addrs[zh-&connect_index].ss_family, SOCK_STREAM, 0);
if (zh-&fd & 0) {
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZSYSTEMERROR, &socket() call failed&));
ssoresult = setsockopt(zh-&fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
if (ssoresult != 0) {
LOG_WARN((&Unable to set TCP_NODELAY, operation latency may be effected&));
fcntl(zh-&fd, F_SETFL, O_NONBLOCK|fcntl(zh-&fd, F_GETFL, 0));
rc = connect(zh-&fd, (struct sockaddr*) &zh-&addrs[zh-&connect_index], sizeof(struct sockaddr_in));
//因为是非阻塞socket,所以没连上的时候此时把状态设置为connecting
if (rc == -1) {
/* we are handling the non-blocking connect according to
* the description in section 16.3 &Non-blocking connect&
* in UNIX Network Programming vol 1, 3rd edition */
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
zh-&state = ZOO_CONNECTING_STATE;
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZCONNECTIONLOSS,&connect() call failed&));
//否则调用prime_connection,对zk服务端做一些握手验证等等,这个函数后面再看
if((rc=prime_connection(zh))!=0)
return api_epilog(zh,rc);
LOG_INFO((&Initiated connection to server [%s]&,
format_endpoint_info(&zh-&addrs[zh-&connect_index])));
//设置初始的poll等待时间为传入的recv_timeout的三分之一
*fd = zh-&fd;
*tv = get_timeval(zh-&recv_timeout/3);
zh-&last_recv = now;
zh-&last_send = now;
zh-&last_ping = now;
//如果连接建立
if (zh-&fd != -1) {
//计算有多久没有recv和send数据了
int idle_recv = calculate_interval(&zh-&last_recv, &now);
int idle_send = calculate_interval(&zh-&last_send, &now);
int recv_to = zh-&recv_timeout*2/3 - idle_recv;
int send_to = zh-&recv_timeout/3;
// have we exceeded the receive timeout threshold?
//如果空闲recv时间达到2/3*timeout,那么超时,关闭连接
if (recv_to &= 0) {
// We gotta cut our losses and connect to someone else
errno = ETIMEDOUT;
*interest=0;
*tv = get_timeval(0);
return api_epilog(zh,handle_socket_error_msg(zh,
__LINE__,ZOPERATIONTIMEOUT,
&connection to %s timed out (exceeded timeout by %dms)&,
format_endpoint_info(&zh-&addrs[zh-&connect_index]),
-recv_to));
//如果空闲send时间达到1/3*timeout
//那么发送ping(send_ping-&adaptor_send_queue-&flush_send_queue-&send_buffer-&zookeeper_send-&send)
// We only allow 1/3 of our timeout time to expire before sending
if (zh-&state==ZOO_CONNECTED_STATE) {
send_to = zh-&recv_timeout/3 - idle_send;
if (send_to &= 0) {
if (zh-&sent_requests.head==0) {
int rc=send_ping(zh);
if (rc & 0){
LOG_ERROR((&failed to send PING request (zk retcode=%d)&,rc));
return api_epilog(zh,rc);
send_to = zh-&recv_timeout/3;
// choose the lesser value as the timeout
//计算poll的等待时间next_deadline
*tv = get_timeval(recv_to & send_to? recv_to:send_to);
zh-&next_deadline.tv_sec = now.tv_sec + tv-&tv_sec;
zh-&next_deadline.tv_usec = now.tv_usec + tv-&tv_usec;
if (zh-&next_deadline.tv_usec & 1000000) {
zh-&next_deadline.tv_sec += zh-&next_deadline.tv_usec / 1000000;
zh-&next_deadline.tv_usec = zh-&next_deadline.tv_usec % 1000000;
//标识目前客户端对read事件(ping)感兴趣
*interest = ZOOKEEPER_READ;
/* we are interested in a write if we are connected and have something
* to send, or we are waiting for a connect to finish. */
//如果zh-&to_send有内容那么也关注write事件
if ((zh-&to_send.head && (zh-&state == ZOO_CONNECTED_STATE))
|| zh-&state == ZOO_CONNECTING_STATE) {
*interest |= ZOOKEEPER_WRITE;
return api_epilog(zh,ZOK);
可以看到zookeeper_interest决定了poll会睡眠多久,并且决定了zookeeper目前是对read还是write感兴趣
下面来看看zookeeper_process
zookeeperprocess中有个很重要的函数checkevents,先看看他
do_io-&zookeeper_process-&check_events
static int check_events(zhandle_t *zh, int events)
if (zh-&fd == -1)
return ZINVALIDSTATE;
//如果描述符触发写事件,并且状态为ZOO_CONNECTING_STATE状态,说明刚刚建立连接
if ((events&ZOOKEEPER_WRITE)&&(zh-&state == ZOO_CONNECTING_STATE)) {
int rc, error;
socklen_t len = sizeof(error);
rc = getsockopt(zh-&fd, SOL_SOCKET, SO_ERROR, &error, &len);
/* the description in section 16.4 &Non-blocking connect&
* in UNIX Network Programming vol 1, 3rd edition, points out
* that sometimes the error is in errno and sometimes in error */
if (rc & 0 || error) {
if (rc == 0)
errno = error;
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
&server refused to accept the client&);
//进入prime_connecttion
if((rc=prime_connection(zh))!=0)
return rc;
LOG_INFO((&initiated connection to server [%s]&,
format_endpoint_info(&zh-&addrs[zh-&connect_index])));
return ZOK;
连接建立后会进入prime_connection,看看这个函数
do_io-&zookeeper_process-&check_events-&prime_connection
static int prime_connection(zhandle_t *zh)
/*this is the size of buffer to serialize req into*/
char buffer_req[HANDSHAKE_REQ_SIZE];
int len = sizeof(buffer_req);
int hlen = 0;
struct connect_req req;
//设置协议req的version,sessionId,passwd等等
req.protocolVersion = 0;
req.sessionId = zh-&client_id.client_id;
req.passwd_len = sizeof(req.passwd);
memcpy(req.passwd, zh-&client_id.passwd, sizeof(zh-&client_id.passwd));
req.timeOut = zh-&recv_timeout;
req.lastZxidSeen = zh-&last_zxid;
hlen = htonl(len);
/* We are running fast and loose here, but this string should fit in the initial buffer! */
//发送协议头长度,以及整个包??这里为啥不是异步的,没看懂
rc=zookeeper_send(zh-&fd, &hlen, sizeof(len));
serialize_prime_connect(&req, buffer_req);
rc=rc&0 ? rc : zookeeper_send(zh-&fd, buffer_req, len);
if (rc&0) {
return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
&failed to send a handshake packet: %s&, strerror(errno));
//从CONNECTING状态转换为ASSOCIATING状态
zh-&state = ZOO_ASSOCIATING_STATE;
//将input_buffer标记为primer_buffer
zh-&input_buffer = &zh-&primer_buffer;
/* This seems a bit weird to to set the offset to 4, but we already have a
* length, so we skip reading the length (and allocating the buffer) by
* saying that we are already at offset 4 */
//设置已经当前input_buffer便宜为4
zh-&input_buffer-&curr_offset = 4;
return ZOK;
继续刚才的check_events函数
do_io-&zookeeper_process-&check_events
static int check_events(zhandle_t *zh, int events)
//如果to_send链表有内容,且触发事件为写事件,那么flush_send_queue,此时数据会被发出
if (zh-&to_send.head && (events&ZOOKEEPER_WRITE)) {
/* make the flush call non-blocking by specifying a 0 timeout */
int rc=flush_send_queue(zh,0);
if (rc & 0)
return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
&failed while flushing send queue&);
//如果触发事件为读
if (events&ZOOKEEPER_READ) {
if (zh-&input_buffer == 0) {
zh-&input_buffer = allocate_buffer(0,0);
//读取数据,读取一个文件头大小后申请响应的大小
rc = recv_buffer(zh-&fd, zh-&input_buffer);
if (rc & 0) {
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
&failed while receiving a server response&);
if (rc & 0) {
//如果input_buffer不为primer_buffer,说明不是刚建立的连接,那么把input_buffer的内容放入to_process链表等待被处理
gettimeofday(&zh-&last_recv, 0);
if (zh-&input_buffer != &zh-&primer_buffer) {
queue_buffer(&zh-&to_process, zh-&input_buffer, 0);
//如果是刚建立的连接
int64_t oldid,newid;
//deserialize
deserialize_prime_response(&zh-&primer_storage, zh-&primer_buffer.buffer);
/* We are processing the primer_buffer, so we need to finish
* the connection handshake */
oldid = zh-&client_id.client_id;
newid = zh-&primer_storage.sessionId;
//把客户端的client_id和服务端的sessionid做比较,如果不同说明会话已经过期,此时会触发一个过期事件
if (oldid != 0 && oldid != newid) {
zh-&state = ZOO_EXPIRED_SESSION_STATE;
errno = ESTALE;
return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
&sessionId=%#llx has expired.&,oldid);
//把客户端id设置为服务端的session_id
zh-&recv_timeout = zh-&primer_storage.timeOut;
zh-&client_id.client_id = newid;
memcpy(zh-&client_id.passwd, &zh-&primer_storage.passwd,
sizeof(zh-&client_id.passwd));
//进入ZOO_CONNECTED_STATE状态
zh-&state = ZOO_CONNECTED_STATE;
LOG_INFO((&session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d&,
format_endpoint_info(&zh-&addrs[zh-&connect_index]),
newid, zh-&recv_timeout));
/* we want the auth to be sent for, but since both call push to front
we need to call send_watch_set first */
//发送AUTH和WATCH信息
send_set_watches(zh);
/* send the authentication packet now */
send_auth_info(zh);
LOG_DEBUG((&Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE&));
zh-&input_buffer = 0; // just in case the watcher calls zookeeper_process() again
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
zh-&input_buffer = 0;
// zookeeper_process was called but there was nothing to read
// from the socket
return ZNOTHING;
return ZOK;
这个sendsetwatches很关键,把自己已有的watch信息全部收集起来发给服务端,服务端会把这些信息和自己的信息做比对
如果有不同会发送给客户端事件,这是为了防止心跳超时后,会话没有超时时,因为不是同一个TCP连接导致信息丢失。
这个函数能保证在会话期间的任何节点变化,都能触发watch函数的调用
do_io-&zookeeper_process-&check_events-&send_set_watches
static int send_set_watches(zhandle_t *zh)
struct oarchive *oa;
struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)};
struct SetWatches req;
req.relativeZxid = zh-&last_zxid;
req.dataWatches.data = collect_keys(zh-&active_node_watchers, (int*)&req.dataWatches.count);
req.existWatches.data = collect_keys(zh-&active_exist_watchers, (int*)&req.existWatches.count);
req.childWatches.data = collect_keys(zh-&active_child_watchers, (int*)&req.childWatches.count);
// return if there are no pending watches
if (!req.dataWatches.count && !req.existWatches.count &&
!req.childWatches.count) {
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
return ZOK;
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, &header&, &h);
rc = rc & 0 ? rc : serialize_SetWatches(oa, &req&, &req);
/* add this buffer to the head of the send queue */
rc = rc & 0 ? rc : queue_front_buffer_bytes(&zh-&to_send, get_buffer(oa),
get_buffer_len(oa));
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
LOG_DEBUG((&Sending set watches request to %s&,format_current_endpoint_info(zh)));
return (rc & 0)?ZMARSHALLINGERROR:ZOK;
do_io-&zookeeper_process-&check_events-&queue_session_event
// IO thread queues session events to be processed by the completion thread
static int queue_session_event(zhandle_t *zh, int state)
struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, && };
struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
struct oarchive *oa;
completion_list_t *cptr;
if ((oa=create_buffer_oarchive())==NULL) {
LOG_ERROR((&out of memory&));
goto error;
rc = serialize_ReplyHeader(oa, &hdr&, &hdr);
rc = rc&0?rc: serialize_WatcherEvent(oa, &event&, &evt);
close_buffer_oarchive(&oa, 1);
goto error;
cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
cptr-&buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
cptr-&buffer-&curr_offset = get_buffer_len(oa);
if (!cptr-&buffer) {
free(cptr);
close_buffer_oarchive(&oa, 1);
goto error;
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
cptr-&c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, &&);
//将watcher的内容放入completions_to_process链表内,由completion线程进行调用
queue_completion(&zh-&completions_to_process, cptr, 0);
if (process_async(zh-&outstanding_sync)) {
process_completions(zh);
return ZOK;
errno=ENOMEM;
return ZSYSTEMERROR;
checkevents看完了,回到zookeeperprocess
nt zookeeper_process(zhandle_t *zh, int events)
buffer_list_t *bptr;
if (zh==NULL)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
api_prolog(zh);
IF_DEBUG(checkResponseLatency(zh));
rc = check_events(zh, events);
if (rc!=ZOK)
return api_epilog(zh, rc);
IF_DEBUG(isSocketReadable(zh));
//如果to_process链表有内容,也就是接受到服务器的信息,那么解析出来
while (rc &= 0 && (bptr=dequeue_buffer(&zh-&to_process))) {
struct ReplyHeader hdr;
struct iarchive *ia = create_buffer_iarchive(
bptr-&buffer, bptr-&curr_offset);
deserialize_ReplyHeader(ia, &hdr&, &hdr);
if (hdr.zxid & 0) {
zh-&last_zxid = hdr.zxid;
// fprintf(stderr, &Got %#x for %#x\n&, hdr.zxid, hdr.xid);
//如果类型是WATCHER_EVENT_XID,那么创建一个WATCHER_EVENT_XID的watch放入completions_to_process队列让异步completions队列去处理
if (hdr.xid == WATCHER_EVENT_XID) {
struct WatcherEvent evt;
int type = 0;
char *path = NULL;
completion_list_t *c = NULL;
LOG_DEBUG((&Processing WATCHER_EVENT&));
deserialize_WatcherEvent(ia, &event&, &evt);
type = evt.type;
path = evt.path;
/* We are doing a notification, so there is no pending request */
c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
c-&buffer = bptr;
c-&c.watcher_result = collectWatchers(zh, type, path);
// We cannot free until now, otherwise path will become invalid
deallocate_WatcherEvent(&evt);
queue_completion(&zh-&completions_to_process, c, 0);
} else if (hdr.xid == SET_WATCHES_XID) {
//如果类型是SET_WATCHES_XID,那么不处理
LOG_DEBUG((&Processing SET_WATCHES&));
free_buffer(bptr);
//如果类型是验证,那么直接自己调用auth_completion_func来处理(不从completions线程去处理)
} else if (hdr.xid == AUTH_XID){
LOG_DEBUG((&Processing AUTH_XID&));
/* special handling for the AUTH response as it may come back
* out-of-band */
auth_completion_func(hdr.err,zh);
free_buffer(bptr);
/* authentication completion may change the connection state to
* unrecoverable */
if(is_unrecoverable(zh)){
handle_error(zh, ZAUTHFAILED);
close_buffer_iarchive(&ia);
return api_epilog(zh, ZAUTHFAILED);
int rc = hdr.err;
/* Find the request corresponding to the response */
completion_list_t *cptr = dequeue_completion(&zh-&sent_requests);
/* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
if (zh-&close_requested == 1 && cptr == NULL) {
LOG_DEBUG((&Completion queue has been cleared by zookeeper_close()&));
close_buffer_iarchive(&ia);
return api_epilog(zh,ZINVALIDSTATE);
assert(cptr);
/* The requests are going to come back in order */
if (cptr-&xid != hdr.xid) {
LOG_DEBUG((&Processing unexpected or out-of-order response!&));
// received unexpected (or out-of-order) response
close_buffer_iarchive(&ia);
free_buffer(bptr);
// put the completion back on the queue (so it gets properly
// signaled and deallocated) and disconnect from the server
queue_completion(&zh-&sent_requests,cptr,1);
return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
&unexpected server response: expected %#x, but received %#x&,
hdr.xid,cptr-&xid);
activateWatcher(zh, cptr-&watcher, rc);
//如果是异步请求
if (cptr-&c.void_result != SYNCHRONOUS_MARKER) {
//如果是ping消息,那么直接更新last_ping即可
if(hdr.xid == PING_XID){
int elapsed = 0;
struct timeval now;
gettimeofday(&now, 0);
elapsed = calculate_interval(&zh-&last_ping, &now);
LOG_DEBUG((&Got ping response in %d ms&, elapsed));
// Nothing to do with a ping response
free_buffer(bptr);
destroy_completion_entry(cptr);
LOG_DEBUG((&Queueing asynchronous response&));
//如果其他的,那么发给completion线程去处理
cptr-&buffer = bptr;
queue_completion(&zh-&completions_to_process, cptr, 0);
//同步消息,那么自己来处理
struct sync_completion
*sc = (struct sync_completion*)cptr-&data;
sc-&rc = rc;
process_sync_completion(cptr, sc, ia, zh);
notify_sync_completion(sc);
free_buffer(bptr);
zh-&outstanding_sync--;
destroy_completion_entry(cptr);
close_buffer_iarchive(&ia);
if (process_async(zh-&outstanding_sync)) {
process_completions(zh);
return api_epilog(zh,ZOK);
一共5种状态
const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
对它的状态转换做一个示意图
connect返回-1且errnr == EWOULDBLOCK
notconnected ------------------------------------------ connecting--------------------------|
| prime_connection
different session id
expired_session --------------------------------------
associating ------------------- connected ---------auth_failed
有如下四种情景:
1 使用zookeeperinit刚连上时,会触发SESSIONEVENT且状态为CONNECTED的watcher
2 如果发生了节点变化,会Process相应的WATCHER_EVENT的watcher
3 当client和zookeeper超时时间大于心跳时间且小于会话时间时
原先的连接断线后会触发SESSIONEVENT且状态为CONNECTING的watcher,当重新连上后会触发SESSIONEVENT且状态为CONNECTED的watcher
如果在这段时间内监视的节点发生变化,还会Process相应的WATCHER_EVENT的watcher
4 如果超时时间大于会话时间时
原先的连接断线后会触发SESSIONEVENT且状态为CONNECTING的watch,当重新连上后会触发SESSIONEVENT且状态为EXPIRED_SESSION的watcher
此时需要自己重新使用zookeeperinit连接,并且重新注册自己的watcher,这时重新进入情景1,即为触发SESSIONEVENT且状态为CONNECTED的watcher
应该这样正确使用zookeeper_mt库:
使用zookeeper_init注册一个全局watcher如下:
static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
if(type == ZOO_SESSION_EVENT)
if (state == ZOO_CONNECTED_STATE)
log_info(&connected zookeeper&);
//第一次正常连接和超时重连都会触发该状态,所以要判断是否是超时引起的该状态
//如果是会话超时引起的,需要重新设置观察器
//对之前注册的每个路径,都需要显式的触发一次
//(例如原来是调用的get_children监视,那么现在需要调用get_children获取一次)
//防止因为和zk的会话超时,导致这段时间内的节点变化监视丢失
if(reconnection_flag) {
reconnection_flag = 0;
//重新设置对路径的观察事件
else if(state == ZOO_AUTH_FAILED_STATE)
log_error(&Authentication failure. Shutting down...&);
zookeeper_close(zh);
else if(state == ZOO_EXPIRED_SESSION_STATE)
log_error(&Session expired. Shutting down...&);
//超时会话过期,设置重连标记位
reconnection_flag = 1;
//关闭原来的zookeeper handle,并且用zookeeper_init尝试重连
zookeeper_close(zh);
... = zookeeper_init(...);
//判断事件和路径,分发给不同的调用逻辑
要注意的是,watcher是后台线程,因此对某些和主线程共享的变量,需要添加互斥锁
zookeeper的客户端必须做成事件通知机制,多线程确实是一个比较简单的方案
但是对于开发而言,如果要考虑那么多方方面面,确实会很蛋疼,我认为用本文开始提到的zookeeper c库会更加简单方便
21 Nov 2015}

我要回帖

更多关于 zookeeper 客户端命令 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信