简介

ZooKeeper提供了C和Java两种语言的客户端。zkpython对C客户端的代码进行封装,变为python的一个模块zookeeper,import以后便可以在python中方便地调用C客户端提供的函数。这层封装以后,使用仍然不够方便,因为思想是面向过程的,所以又有人写了一个zkclient.py文件,将代码改造为面向对象的模式。本文首先介绍zkpython的基本使用方法,然后利用zkpython包和zkclient.py文件,利用临时顺序节点功能,实现服务的主备。

zkpython的使用

建立连接

handler = zookeeper.init("localhost:2181")

调用这个就认为与ZooKeeper建立了连接是错误的。该函数返回一个zookeeper客户端与服务器通信的句柄。初学者一般仅仅根据返回句柄的情况来判断zookeeper 客户端与zookeeper服务器是否建立连接。如果句柄为空则认为是失败,非空则成功。这种方法不对,zookeeper_init创建与ZooKeeper服务端通信的句柄以及对应于此句柄的会话,而会话的创建是一个异步的过程,仅当会话建立成功,zookeeper_init才返回一个可用句柄。

因此,只有会话session创建成功,客户端才和ZooKeeper建立了可用的连接。正确的初始化方法介绍如下。 #### 方法1: 利用Watcher异步通知 在zookeeper_init中设置watcher,当zookeeper client与server会话建立后,触发watcher,当 watcher 的state = 3 (ZOO_CONNECTED_STATE), type = -1(ZOO_SESSION_EVENT)时,确认 会话成功建立,此时zookeeper client 初始化成功,可进行后续操作。示例代码:

import zookeeper
import threading

# 全局锁
conn_cv = threading.Condition()

def connection_watcher(self, h, type, state, path):
    if type == zookeeper.SESSION_EVENT and state == zookeeper.CONNECTED_STATE:
        conn_cv.acquire()
        conn_cv.notifyAll()
        conn_cv.release()
        
if __name__ == "__main__":
    # config
    servers = "localhost:2181,localhost:2182,localhost:2183" #多个server用逗号分开,此处为单机伪分布式部署
    timeout = 10000 # 设置超时时间,超过这个时间客户端和服务端没有交互的话,就会触发超时事件;如果连接注册了临时节点,临时节点会被删除,该值的单位为zookeeper server的tick time,tick time为2000,则为5s

    # 同步和异步编程同时存在,通过锁来进行二者的协调
    conn_cv.acquire()
    handle = zookeeper.init(servers, connection_watcher, timeout)
    conn_cv.wait(timeout/1000)
    conn_cv.release()
    if zookeeper.state(handle) != zookeeper.CONNECTED_STATE:
        print “Unable to connect to zookeeper servers”
    else:
        print "Connected to zookeeper servers [%s]" % servers

方法二:主动轮询

while循环判断句柄的state是否为ZOO_CONNECTED_STATE状态,通过zoo_state(zh)判断状态值是否为ZOO_CONNECTED_STATE。

import zookeeper
import time
import sys
servers = "localhost:2181,localhost:2182,localhost:2183"
timeout = 10000

handle = zookeeper.init(servers, connection_watcher, timeout)
start_time = time.time()
while zookeeper.state(handle) != zookeeper.CONNECTED_STATE:
    cur_time = time.time()
    if cur_time - start_time > timeout / 1000:
        print "Unable to connect to zookeeper servers"
        sys.exit(-1)
    time.sleep(1)
print "Connected to zookeeper servers [%s]" % servers

创建节点

zookeeper.create(handler, "/zkpython_test", "mydata", [{"perms":0x1f,"scheme":"world","id":"anyone"}]),0)

第一个参数是连接,第二个参数是znode路径,第三个是znode的数据,第四个是acl,第四个是节点的类型(0是永久,1是永久+序号,2是临时,3是临时加序号)

对于acl,READ权限是1,WRITE权限是2,CREATE权限是4,DELETE权限是8,ADMIN权限是16. 0x1f是16进制,表示16*1+15 = 31。而READ+WRITE+CREATE+DELETE+ADMIN=31。所以0x1f表示所有权限都有。

scheme有几种固定形式,一般用例子中的,其他的参考官方文档。

查看子节点

zookeeper.get_children(handler, "/", None)

获取节点的数据

zookeeper.get(handler, "/zkpython_test")

修改节点的数据

zookeeper.set(handler, "/zkpython_test", "mydata_new")

删除节点

zookeeper.delete(handler, "/zkpython_test")

关闭连接

zookeeper.close(handler)

使用Watch

ZooKeeper的getData()、getChildren()和exists(),都能设置一个Watch。Watch官方定义如下: > a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes.

基本使用方法:首先,定义一个watch方法,之后再调用get方法时,把watch传递进去就可以了。

import zookeeper

def myWatch(handler, type, state, path):
    print "handler:" + str(handler) +",type:" + str(type) + ",state:" + str(state) + ",path:" + path
    zookeeper.get(handler, path, myWatch) # one-time trigger的性质,加入这句后可以保证重复监听

handler = zookeeper.init("localhost:2181") # 初学者常见的错误的初始化方法,需要按上文修改
data = zookeeper.get(handler, "/zkpython_test", myWatch)

Watch的参数含义 - handler:连接索引值,以0开始 - type:事件类型,用常量表示,取值有[zookeeper.NOTWATCHING_EVENT, zookeeper.SESSION_EVENT, zookeeper.CREATED_EVENT, zookeeper.DELETED_EVENT, zookeeper.CHANGED_EVENT, zookeeper.CHILD_EVENT] - status:客户端的状态,用常量表示,取值有[zookeeper.ASSOCIATING_STATE, zookeeper.AUTH_FAILED_STATE, zookeeper.CONNECTED_STATE, zookeeper.CONNECTING_STATE, zookeeper.EXPIRED_SESSION_STATE] - path:znode路径

zkpython HA的实现

封装对象ZKClient

首先用zkclient.py文件将zkpython封装为面向对象的使用模式。

# /usr/bin/env python
# -*- coding: utf-8 -*-
# @author: 陈水平
# @modfied from https://github.com/phunt/zk-smoketest/blob/master/zkclient.py

import zookeeper, time, threading
from collections import namedtuple


DEFAULT_TIMEOUT = 30000
VERBOSE = True

ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}

# Mapping of connection state values to human strings.
STATE_NAME_MAPPING = {
    zookeeper.ASSOCIATING_STATE: "associating",
    zookeeper.AUTH_FAILED_STATE: "auth-failed",
    zookeeper.CONNECTED_STATE: "connected",
    zookeeper.CONNECTING_STATE: "connecting",
    zookeeper.EXPIRED_SESSION_STATE: "expired",
}

# Mapping of event type to human string.
TYPE_NAME_MAPPING = {
    zookeeper.NOTWATCHING_EVENT: "not-watching",
    zookeeper.SESSION_EVENT: "session",
    zookeeper.CREATED_EVENT: "created",
    zookeeper.DELETED_EVENT: "deleted",
    zookeeper.CHANGED_EVENT: "changed",
    zookeeper.CHILD_EVENT: "child", 
}

class ZKClientError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)

class ClientEvent(namedtuple("ClientEvent", 'type, connection_state, path')):
    """
    A client event is returned when a watch deferred fires. It denotes
    some event on the zookeeper client that the watch was requested on.
    """

    @property
    def type_name(self):
        return TYPE_NAME_MAPPING[self.type]

    @property
    def state_name(self):
        return STATE_NAME_MAPPING[self.connection_state]

    def __repr__(self):
        return  "<ClientEvent %s at %r state: %s>" % (
            self.type_name, self.path, self.state_name)


def watchmethod(func):
    def decorated(handle, atype, state, path):
        event = ClientEvent(atype, state, path)
        return func(event)
    return decorated

class ZKClient(object):
    def __init__(self, servers, timeout=DEFAULT_TIMEOUT):
        self.timeout = timeout
        self.connected = False
        self.conn_cv = threading.Condition( )
        self.handle = -1
        
        self.conn_cv.acquire()
        if VERBOSE: print("Connecting to %s" % (servers))
        start = time.time()
        self.handle = zookeeper.init(servers, self.connection_watcher, timeout)
        self.conn_cv.wait(timeout/1000)
        self.conn_cv.release()

        if not self.connected:
            raise ZKClientError("Unable to connect to %s" % (servers))

        if VERBOSE:
            print("Connected in %d ms, handle is %d"
                  % (int((time.time() - start) * 1000), self.handle))

    # modify connection_watcher
    def connection_watcher(self, h, type, state, path):
        if type == zookeeper.SESSION_EVENT and state == zookeeper.CONNECTED_STATE:
            self.handle = h
            self.conn_cv.acquire()
            self.connected = True
            self.conn_cv.notifyAll()
            self.conn_cv.release()

    def close(self):
        return zookeeper.close(self.handle)
    
    def create(self, path, data="", flags=0, acl=[ZOO_OPEN_ACL_UNSAFE]):
        start = time.time()
        result = zookeeper.create(self.handle, path, data, acl, flags)
        if VERBOSE:
            print("Node %s created in %d ms"
                  % (path, int((time.time() - start) * 1000)))
        return result

    def delete(self, path, version=-1):
        start = time.time()
        result = zookeeper.delete(self.handle, path, version)
        if VERBOSE:
            print("Node %s deleted in %d ms"
                  % (path, int((time.time() - start) * 1000)))
        return result

    def get(self, path, watcher=None):
        return zookeeper.get(self.handle, path, watcher)

    def state(self):
        return zookeeper.state(self.handle)

    def exists(self, path, watcher=None):
        return zookeeper.exists(self.handle, path, watcher)

    def set(self, path, data="", version=-1):
        return zookeeper.set(self.handle, path, data, version)

    def set2(self, path, data="", version=-1):
        return zookeeper.set2(self.handle, path, data, version)

    def get_children(self, path, watcher=None):
        return zookeeper.get_children(self.handle, path, watcher)

    def async(self, path = "/"):
        return zookeeper.async(self.handle, path)

    def acreate(self, path, callback, data="", flags=0, acl=[ZOO_OPEN_ACL_UNSAFE]):
        result = zookeeper.acreate(self.handle, path, data, acl, flags, callback)
        return result

    def adelete(self, path, callback, version=-1):
        return zookeeper.adelete(self.handle, path, version, callback)

    def aget(self, path, callback, watcher=None):
        return zookeeper.aget(self.handle, path, watcher, callback)

    def aexists(self, path, callback, watcher=None):
        return zookeeper.aexists(self.handle, path, watcher, callback)

    def aset(self, path, callback, data="", version=-1):
        return zookeeper.aset(self.handle, path, data, version, callback)

watch_count = 0

"""Callable watcher that counts the number of notifications"""
class CountingWatcher(object):
    def __init__(self):
        self.count = 0
        global watch_count
        self.id = watch_count
        watch_count += 1

    def waitForExpected(self, count, maxwait):
        """Wait up to maxwait for the specified count,
        return the count whether or not maxwait reached.

        Arguments:
        - `count`: expected count
        - `maxwait`: max milliseconds to wait
        """
        waited = 0
        while (waited < maxwait):
            if self.count >= count:
                return self.count
            time.sleep(1.0);
            waited += 1000
        return self.count

    def __call__(self, handle, typ, state, path):
        self.count += 1
        if VERBOSE:
            print("handle %d got watch for %s in watcher %d, count %d" %
                  (handle, path, self.id, self.count))

"""Callable watcher that counts the number of notifications
and verifies that the paths are sequential"""
class SequentialCountingWatcher(CountingWatcher):
    def __init__(self, child_path):
        CountingWatcher.__init__(self)
        self.child_path = child_path

    def __call__(self, handle, typ, state, path):
        if not self.child_path(self.count) == path:
            raise ZKClientError("handle %d invalid path order %s" % (handle, path))
        CountingWatcher.__call__(self, handle, typ, state, path)

class Callback(object):
    def __init__(self):
        self.cv = threading.Condition()
        self.callback_flag = False
        self.rc = -1

    def callback(self, handle, rc, handler):
        self.cv.acquire()
        self.callback_flag = True
        self.handle = handle
        self.rc = rc
        handler()
        self.cv.notify()
        self.cv.release()

    def waitForSuccess(self):
        while not self.callback_flag:
            self.cv.wait()
        self.cv.release()

        if not self.callback_flag == True:
            raise ZKClientError("asynchronous operation timed out on handle %d" %
                             (self.handle))
        if not self.rc == zookeeper.OK:
            raise ZKClientError(
                "asynchronous operation failed on handle %d with rc %d" %
                (self.handle, self.rc))


class GetCallback(Callback):
    def __init__(self):
        Callback.__init__(self)

    def __call__(self, handle, rc, value, stat):
        def handler():
            self.value = value
            self.stat = stat
        self.callback(handle, rc, handler)

class SetCallback(Callback):
    def __init__(self):
        Callback.__init__(self)

    def __call__(self, handle, rc, stat):
        def handler():
            self.stat = stat
        self.callback(handle, rc, handler)

class ExistsCallback(SetCallback):
    pass

class CreateCallback(Callback):
    def __init__(self):
        Callback.__init__(self)

    def __call__(self, handle, rc, path):
        def handler():
            self.path = path
        self.callback(handle, rc, handler)

class DeleteCallback(Callback):
    def __init__(self):
        Callback.__init__(self)

    def __call__(self, handle, rc):
        def handler():
            pass
        self.callback(handle, rc, handler)

选主实现

当有两个或多个服务运行时,同一时间只有一个服务接受请求工作,其他服务待命;当接受请求的服务异常挂掉时,从剩下待命的服务中选出一个服务接受请求工作。把类的初始化函数master_num设置为非1的情况,可以允许多个主。

#/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @function: realize HA (high availability) by ZooKeeper
# @author: 陈水平
# @date: 2016-01-04
#

import sys, os
from os.path import basename, join
import time
import subprocess
import signal

# import self-defined module
from zkclient import ZKClient, ZKClientError, watchmethod, zookeeper
import spark_manage_util


import logging
logging.basicConfig(
    level = logging.DEBUG,
    format = "[%(asctime)s] %(levelname)-8s %(message)s"
)
 
log = logging

class HAZookeeper(object):
    def __init__(self, host, worker_path, master_num=1, timeout=10000, verbose=True):
        self.VERBOSE = verbose
        self.masters = []
        self.is_master = None
        self.path = None
        self.MASTERS_NUM = master_num
        self.TIMEOUT = timeout
        self.WORKERS_PATH = worker_path
        self.ZK_HOST = host

        self.login()
        self.__init_zk()
        self.register()
        self.get_master()

    def login(self):
        """
        connect to ZooKeeper, exit if failed
        """
        self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
        self.say("login ok!")
    
    def __init_zk(self):
        """
        create the permanent node if not exist, permanent node represent a distributed lock
        """
        nodes = self.WORKERS_PATH[1:].split("/")
        for i in range(0, len(nodes)):
            node = "/"+"/".join(nodes[:i+1])
            if not self.zk.exists(node):
                try:
                    self.zk.create(node, "")
                except:
                    pass

    def register(self):
        """
        register a ephemeral and sequence node for this worker
        """
        self.path = self.zk.create(self.WORKERS_PATH + "/worker", "1", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        self.path = basename(self.path)
        self.say("register ok! I'm %s" % self.path)

    def get_master(self):
        """
        get children, and check who is the smallest child
        """
        @watchmethod
        def watcher(event):
            if event.type_name == "session" and event.state_name != "connected":
                self.say("session time out, shut down")
                sys.exit(-1)
            if event.type_name == "child":
                self.say("child changed, try to get master again.")
                self.get_master()

        children = self.zk.get_children(self.WORKERS_PATH, watcher)
        children.sort()
        self.say("%s's children: %s" % (self.WORKERS_PATH, children))

        # check if I'm master
        self.masters = children[:self.MASTERS_NUM]
        
        if self.path in self.masters:
            # master->master
            self.is_master = True
            self.say("I've been master!")
        else:
            if self.path in children:
               self.is_master = False
               self.say("%s is masters, I'm slave" % self.masters)
            else:
                self.say("%s is masters, my path %s not in children %s, reconnect" % (self.masters, self.path, children))
                self.path = None
                self.login()
                self.register()

    def say(self, msg):
        """
        print messages to screen
        """
        if self.VERBOSE:
            if self.path:
                log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
            else:
                log.info(msg) 


def test():
    host = "XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063"
    worker_path = "/app/zk_test"
    master_num = 1
    timeout = 10000
    ha_zookeeper = HAZookeeper(host, worker_path, master_num, timeout)
    while True:
        time.sleep(1)

if __name__ == "__main__":
       test()

ZooKeeper连接中断的解决方法

方法1:在Watcher里处理session失效事件

def watcher(event):
    if event.type_name == "session" and event.state_name != "connected":
         self.say("session time out, shut down")
         sys.exit(-1)

方法2:连接句柄的状态

def test():
    host = "XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063"
    worker_path = "/app/zk_test"
    master_num = 1
    timeout = 10000
    ha_zookeeper = HAZookeeper(host, worker_path, master_num, timeout)
    while True:
        if ha_zookeeper.zk.state() != zookeeper.CONNECTED_STATE:
            print "connection with zookeeper server failed. shut down..."
            sys.exit(-1)
        time.sleep(1)

方法3:通过调用句柄的exists等方法

def test():
    host = "XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063,XXX.XXX.XXX.XXX:5063"
    worker_path = "/app/zk_test"
    master_num = 1
    timeout = 10000
    ha_zookeeper = HAZookeeper(host, worker_path, master_num, timeout)
    while True:
        try:
            if not ha_zookeeper.zk.exists(ha_zookeeper.WORKERS_PATH + "/" + ha_zookeeper.path)::
                print "path was unexpectly deleted, recreate."
                ha_zookeeper.path = None
                ha_zookeeper.login()
                ha_zookeeper.register()
        except Exception, e:    
            print "connection with zookeeper server failed. shut down..."
            sys.exit(-1)
        time.sleep(1)