ZooKeeper提供了C和Java两种语言的客户端。zkpython对C客户端的代码进行封装,变为python的一个模块zookeeper,import以后便可以在python中方便地调用C客户端提供的函数。这层封装以后,使用仍然不够方便,因为思想是面向过程的,所以又有人写了一个zkclient.py文件,将代码改造为面向对象的模式。本文首先介绍zkpython的基本使用方法,然后利用zkpython包和zkclient.py文件,利用临时顺序节点功能,实现服务的主备。
handler = zookeeper.init("localhost:2181")
调用这个就认为与ZooKeeper建立了连接是错误的。该函数返回一个zookeeper客户端与服务器通信的句柄。初学者一般仅仅根据返回句柄的情况来判断zookeeper 客户端与zookeeper服务器是否建立连接。如果句柄为空则认为是失败,非空则成功。这种方法不对,zookeeper_init创建与ZooKeeper服务端通信的句柄以及对应于此句柄的会话,而会话的创建是一个异步的过程,仅当会话建立成功,zookeeper_init才返回一个可用句柄。
因此,只有会话session创建成功,客户端才和ZooKeeper建立了可用的连接。正确的初始化方法介绍如下。 #### 方法1: 利用Watcher异步通知 在zookeeper_init中设置watcher,当zookeeper client与server会话建立后,触发watcher,当 watcher 的state = 3 (ZOO_CONNECTED_STATE), type = -1(ZOO_SESSION_EVENT)时,确认 会话成功建立,此时zookeeper client 初始化成功,可进行后续操作。示例代码:
import zookeeper
import threading
# 全局锁
conn_cv = threading.Condition()
def connection_watcher(self, h, type, state, path):
if type == zookeeper.SESSION_EVENT and state == zookeeper.CONNECTED_STATE:
conn_cv.acquire()
conn_cv.notifyAll()
conn_cv.release()
if __name__ == "__main__":
# config
servers = "localhost:2181,localhost:2182,localhost:2183" #多个server用逗号分开,此处为单机伪分布式部署
timeout = 10000 # 设置超时时间,超过这个时间客户端和服务端没有交互的话,就会触发超时事件;如果连接注册了临时节点,临时节点会被删除,该值的单位为zookeeper server的tick time,tick time为2000,则为5s
# 同步和异步编程同时存在,通过锁来进行二者的协调
conn_cv.acquire()
handle = zookeeper.init(servers, connection_watcher, timeout)
conn_cv.wait(timeout/1000)
conn_cv.release()
if zookeeper.state(handle) != zookeeper.CONNECTED_STATE:
print “Unable to connect to zookeeper servers”
else:
print "Connected to zookeeper servers [%s]" % servers
while循环判断句柄的state是否为ZOO_CONNECTED_STATE状态,通过zoo_state(zh)判断状态值是否为ZOO_CONNECTED_STATE。
import zookeeper
import time
import sys
servers = "localhost:2181,localhost:2182,localhost:2183"
timeout = 10000
handle = zookeeper.init(servers, connection_watcher, timeout)
start_time = time.time()
while zookeeper.state(handle) != zookeeper.CONNECTED_STATE:
cur_time = time.time()
if cur_time - start_time > timeout / 1000:
print "Unable to connect to zookeeper servers"
sys.exit(-1)
time.sleep(1)
print "Connected to zookeeper servers [%s]" % servers
zookeeper.create(handler, "/zkpython_test", "mydata", [{"perms":0x1f,"scheme":"world","id":"anyone"}]),0)
第一个参数是连接,第二个参数是znode路径,第三个是znode的数据,第四个是acl,第四个是节点的类型(0是永久,1是永久+序号,2是临时,3是临时加序号)
对于acl,READ权限是1,WRITE权限是2,CREATE权限是4,DELETE权限是8,ADMIN权限是16. 0x1f是16进制,表示16*1+15 = 31。而READ+WRITE+CREATE+DELETE+ADMIN=31。所以0x1f表示所有权限都有。
scheme有几种固定形式,一般用例子中的,其他的参考官方文档。
zookeeper.get_children(handler, "/", None)
zookeeper.get(handler, "/zkpython_test")
zookeeper.set(handler, "/zkpython_test", "mydata_new")
zookeeper.delete(handler, "/zkpython_test")
zookeeper.close(handler)
ZooKeeper的getData()、getChildren()和exists(),都能设置一个Watch。Watch官方定义如下: > a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes.
基本使用方法:首先,定义一个watch方法,之后再调用get方法时,把watch传递进去就可以了。
import zookeeper
def myWatch(handler, type, state, path):
print "handler:" + str(handler) +",type:" + str(type) + ",state:" + str(state) + ",path:" + path
zookeeper.get(handler, path, myWatch) # one-time trigger的性质,加入这句后可以保证重复监听
handler = zookeeper.init("localhost:2181") # 初学者常见的错误的初始化方法,需要按上文修改
data = zookeeper.get(handler, "/zkpython_test", myWatch)
Watch的参数含义 - handler:连接索引值,以0开始 - type:事件类型,用常量表示,取值有[zookeeper.NOTWATCHING_EVENT, zookeeper.SESSION_EVENT, zookeeper.CREATED_EVENT, zookeeper.DELETED_EVENT, zookeeper.CHANGED_EVENT, zookeeper.CHILD_EVENT] - status:客户端的状态,用常量表示,取值有[zookeeper.ASSOCIATING_STATE, zookeeper.AUTH_FAILED_STATE, zookeeper.CONNECTED_STATE, zookeeper.CONNECTING_STATE, zookeeper.EXPIRED_SESSION_STATE] - path:znode路径
首先用zkclient.py文件将zkpython封装为面向对象的使用模式。
# /usr/bin/env python
# -*- coding: utf-8 -*-
# @author: 陈水平
# @modfied from https://github.com/phunt/zk-smoketest/blob/master/zkclient.py
import zookeeper, time, threading
from collections import namedtuple
DEFAULT_TIMEOUT = 30000
VERBOSE = True
ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}
# Mapping of connection state values to human strings.
STATE_NAME_MAPPING = {
zookeeper.ASSOCIATING_STATE: "associating",
zookeeper.AUTH_FAILED_STATE: "auth-failed",
zookeeper.CONNECTED_STATE: "connected",
zookeeper.CONNECTING_STATE: "connecting",
zookeeper.EXPIRED_SESSION_STATE: "expired",
}
# Mapping of event type to human string.
TYPE_NAME_MAPPING = {
zookeeper.NOTWATCHING_EVENT: "not-watching",
zookeeper.SESSION_EVENT: "session",
zookeeper.CREATED_EVENT: "created",
zookeeper.DELETED_EVENT: "deleted",
zookeeper.CHANGED_EVENT: "changed",
zookeeper.CHILD_EVENT: "child",
}
class ZKClientError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class ClientEvent(namedtuple("ClientEvent", 'type, connection_state, path')):
"""
A client event is returned when a watch deferred fires. It denotes
some event on the zookeeper client that the watch was requested on.
"""
@property
def type_name(self):
return TYPE_NAME_MAPPING[self.type]
@property
def state_name(self):
return STATE_NAME_MAPPING[self.connection_state]
def __repr__(self):
return "<ClientEvent %s at %r state: %s>" % (
self.type_name, self.path, self.state_name)
def watchmethod(func):
def decorated(handle, atype, state, path):
event = ClientEvent(atype, state, path)
return func(event)
return decorated
class ZKClient(object):
def __init__(self, servers, timeout=DEFAULT_TIMEOUT):
self.timeout = timeout
self.connected = False
self.conn_cv = threading.Condition( )
self.handle = -1
self.conn_cv.acquire()
if VERBOSE: print("Connecting to %s" % (servers))
start = time.time()
self.handle = zookeeper.init(servers, self.connection_watcher, timeout)
self.conn_cv.wait(timeout/1000)
self.conn_cv.release()
if not self.connected:
raise ZKClientError("Unable to connect to %s" % (servers))
if VERBOSE:
print("Connected in %d ms, handle is %d"
% (int((time.time() - start) * 1000), self.handle))
# modify connection_watcher
def connection_watcher(self, h, type, state, path):
if type == zookeeper.SESSION_EVENT and state == zookeeper.CONNECTED_STATE:
self.handle = h
self.conn_cv.acquire()
self.connected = True
self.conn_cv.notifyAll()
self.conn_cv.release()
def close(self):
return zookeeper.close(self.handle)
def create(self, path, data="", flags=0, acl=[ZOO_OPEN_ACL_UNSAFE]):
start = time.time()
result = zookeeper.create(self.handle, path, data, acl, flags)
if VERBOSE:
print("Node %s created in %d ms"
% (path, int((time.time() - start) * 1000)))
return result
def delete(self, path, version=-1):
start = time.time()
result = zookeeper.delete(self.handle, path, version)
if VERBOSE:
print("Node %s deleted in %d ms"
% (path, int((time.time() - start) * 1000)))
return result
def get(self, path, watcher=None):
return zookeeper.get(self.handle, path, watcher)
def state(self):
return zookeeper.state(self.handle)
def exists(self, path, watcher=None):
return zookeeper.exists(self.handle, path, watcher)
def set(self, path, data="", version=-1):
return zookeeper.set(self.handle, path, data, version)
def set2(self, path, data="", version=-1):
return zookeeper.set2(self.handle, path, data, version)
def get_children(self, path, watcher=None):
return zookeeper.get_children(self.handle, path, watcher)
def async(self, path = "/"):
return zookeeper.async(self.handle, path)
def acreate(self, path, callback, data="", flags=0, acl=[ZOO_OPEN_ACL_UNSAFE]):
result = zookeeper.acreate(self.handle, path, data, acl, flags, callback)
return result
def adelete(self, path, callback, version=-1):
return zookeeper.adelete(self.handle, path, version, callback)
def aget(self, path, callback, watcher=None):
return zookeeper.aget(self.handle, path, watcher, callback)
def aexists(self, path, callback, watcher=None):
return zookeeper.aexists(self.handle, path, watcher, callback)
def aset(self, path, callback, data="", version=-1):
return zookeeper.aset(self.handle, path, data, version, callback)
watch_count = 0
"""Callable watcher that counts the number of notifications"""
class CountingWatcher(object):
def __init__(self):
self.count = 0
global watch_count
self.id = watch_count
watch_count += 1
def waitForExpected(self, count, maxwait):
"""Wait up to maxwait for the specified count,
return the count whether or not maxwait reached.
Arguments:
- `count`: expected count
- `maxwait`: max milliseconds to wait
"""
waited = 0
while (waited < maxwait):
if self.count >= count:
return self.count
time.sleep(1.0);
waited += 1000
return self.count
def __call__(self, handle, typ, state, path):
self.count += 1
if VERBOSE:
print("handle %d got watch for %s in watcher %d, count %d" %
(handle, path, self.id, self.count))
"""Callable watcher that counts the number of notifications
and verifies that the paths are sequential"""
class SequentialCountingWatcher(CountingWatcher):
def __init__(self, child_path):
CountingWatcher.__init__(self)
self.child_path = child_path
def __call__(self, handle, typ, state, path):
if not self.child_path(self.count) == path:
raise ZKClientError("handle %d invalid path order %s" % (handle, path))
CountingWatcher.__call__(self, handle, typ, state, path)
class Callback(object):
def __init__(self):
self.cv = threading.Condition()
self.callback_flag = False
self.rc = -1
def callback(self, handle, rc, handler):
self.cv.acquire()
self.callback_flag = True
self.handle = handle
self.rc = rc
handler()
self.cv.notify()
self.cv.release()
def waitForSuccess(self):
while not self.callback_flag:
self.cv.wait()
self.cv.release()
if not self.callback_flag == True:
raise ZKClientError("asynchronous operation timed out on handle %d" %
(self.handle))
if not self.rc == zookeeper.OK:
raise ZKClientError(
"asynchronous operation failed on handle %d with rc %d" %
(self.handle, self.rc))
class GetCallback(Callback):
def __init__(self):
Callback.__init__(self)
def __call__(self, handle, rc, value, stat):
def handler():
self.value = value
self.stat = stat
self.callback(handle, rc, handler)
class SetCallback(Callback):
def __init__(self):
Callback.__init__(self)
def __call__(self, handle, rc, stat):
def handler():
self.stat = stat
self.callback(handle, rc, handler)
class ExistsCallback(SetCallback):
pass
class CreateCallback(Callback):
def __init__(self):
Callback.__init__(self)
def __call__(self, handle, rc, path):
def handler():
self.path = path
self.callback(handle, rc, handler)
class DeleteCallback(Callback):
def __init__(self):
Callback.__init__(self)
def __call__(self, handle, rc):
def handler():
pass
self.callback(handle, rc, handler)
当有两个或多个服务运行时,同一时间只有一个服务接受请求工作,其他服务待命;当接受请求的服务异常挂掉时,从剩下待命的服务中选出一个服务接受请求工作。把类的初始化函数master_num设置为非1的情况,可以允许多个主。
#/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @function: realize HA (high availability) by ZooKeeper
# @author: 陈水平
# @date: 2016-01-04
#
import sys, os
from os.path import basename, join
import time
import subprocess
import signal
# import self-defined module
from zkclient import ZKClient, ZKClientError, watchmethod, zookeeper
import spark_manage_util
import logging
logging.basicConfig(
level = logging.DEBUG,
format = "[%(asctime)s] %(levelname)-8s %(message)s"
)
log = logging
class HAZookeeper(object):
def __init__(self, host, worker_path, master_num=1, timeout=10000, verbose=True):
self.VERBOSE = verbose
self.masters = []
self.is_master = None
self.path = None
self.MASTERS_NUM = master_num
self.TIMEOUT = timeout
self.WORKERS_PATH = worker_path
self.ZK_HOST = host
self.login()
self.__init_zk()
self.register()
self.get_master()
def login(self):
"""
connect to ZooKeeper, exit if failed
"""
self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
self.say("login ok!")
def __init_zk(self):
"""
create the permanent node if not exist, permanent node represent a distributed lock
"""
nodes = self.WORKERS_PATH[1:].split("/")
for i in range(0, len(nodes)):
node = "/"+"/".join(nodes[:i+1])
if not self.zk.exists(node):
try:
self.zk.create(node, "")
except:
pass
def register(self):
"""
register a ephemeral and sequence node for this worker
"""
self.path = self.zk.create(self.WORKERS_PATH + "/worker", "1", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
self.path = basename(self.path)
self.say("register ok! I'm %s" % self.path)
def get_master(self):
"""
get children, and check who is the smallest child
"""
@watchmethod
def watcher(event):
if event.type_name == "session" and event.state_name != "connected":
self.say("session time out, shut down")
sys.exit(-1)
if event.type_name == "child":
self.say("child changed, try to get master again.")
self.get_master()
children = self.zk.get_children(self.WORKERS_PATH, watcher)
children.sort()
self.say("%s's children: %s" % (self.WORKERS_PATH, children))
# check if I'm master
self.masters = children[:self.MASTERS_NUM]
if self.path in self.masters:
# master->master
self.is_master = True
self.say("I've been master!")
else:
if self.path in children:
self.is_master = False
self.say("%s is masters, I'm slave" % self.masters)
else:
self.say("%s is masters, my path %s not in children %s, reconnect" % (self.masters, self.path, children))
self.path = None
self.login()
self.register()
def say(self, msg):
"""
print messages to screen
"""
if self.VERBOSE:
if self.path:
log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
else:
log.info(msg)
def test():
host = "XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063"
worker_path = "/app/zk_test"
master_num = 1
timeout = 10000
ha_zookeeper = HAZookeeper(host, worker_path, master_num, timeout)
while True:
time.sleep(1)
if __name__ == "__main__":
test()
def watcher(event):
if event.type_name == "session" and event.state_name != "connected":
self.say("session time out, shut down")
sys.exit(-1)
def test():
host = "XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063"
worker_path = "/app/zk_test"
master_num = 1
timeout = 10000
ha_zookeeper = HAZookeeper(host, worker_path, master_num, timeout)
while True:
if ha_zookeeper.zk.state() != zookeeper.CONNECTED_STATE:
print "connection with zookeeper server failed. shut down..."
sys.exit(-1)
time.sleep(1)
def test():
host = "XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063"
worker_path = "/app/zk_test"
master_num = 1
timeout = 10000
ha_zookeeper = HAZookeeper(host, worker_path, master_num, timeout)
while True:
try:
if not ha_zookeeper.zk.exists(ha_zookeeper.WORKERS_PATH + "/" + ha_zookeeper.path)::
print "path was unexpectly deleted, recreate."
ha_zookeeper.path = None
ha_zookeeper.login()
ha_zookeeper.register()
except Exception, e:
print "connection with zookeeper server failed. shut down..."
sys.exit(-1)
time.sleep(1)