Zookeeper详解(十):Pytho

发布时间:2019-09-15 09:54:05编辑:auto阅读(1589)


    Python对Zookeeper的基本操作

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import sys
    from kazoo.client import KazooClient
    
    
    def main():
        try:
    
            nodePath = "/zktest"
            host = "172.16.48.171"
            port = "2181"
            timeout = 100
            zkc = KazooClient(hosts=host + ':' + port, timeout=timeout)
            zkc.start()
    
            # 判断节点是否存在
            if zkc.exists(nodePath+"/test111"):
                print nodePath + "/test111", "存在"
            else:
                # 建立节点,成功后返回新节点路径
                childrenPath = zkc.create(nodePath+"/test111", "test111")
                print "创建节点:", childrenPath, "成功。"
                # 创建临时节点,连接断开则节点自动删除
                zkc.create(nodePath+"/test999", "test999", ephemeral=True)
    
            # 获取节点数据和节点数据,返回2个值,一个是节点数据,一个是节点stat,这是个ZnodeStat对象,它其实是节点属性,一共有12个属性
            dataAndStat = zkc.get(nodePath)
            data = dataAndStat[0]
            print "数据为:", data
            stat = dataAndStat[1]
            print "数据版本号为:", stat.version
            print "数据长度为:", stat.data_length
            print "子节点数量:", stat.numChildren
    
            # 更新节点数据,数据最大为1MB超过则报错,成功后返回 ZnodeStat 对象
            stat = zkc.set(nodePath, value="test222")
            print "数据版本号为:", stat.version
    
            # 删除节点,后面的参数用于控制是否递归删除,默认是False,但是这样就会有一个问题,如果该节点下有子节点则本次删除失败,你需要先删除
            # 它下面的所有子节点才行
            if zkc.exists(nodePath+"/test111"):
                result = zkc.delete(nodePath+"/test111", recursive=False)
                if result:
                    print "删除节点成功。"
    
            print nodePath + " 的子节点为:", zkc.get_children(nodePath)
    
            zkc.close()
            zkc.stop()
        except Exception as err:
            print err.message
    
    
    if __name__ == "__main__":
        try:
            main()
        finally:
            sys.exit()


    Watcher事件操作

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import sys
    import time
    from kazoo.client import KazooClient
    from kazoo.client import ChildrenWatch
    from kazoo.client import DataWatch
    
    """
    Watcher可以通过两种方式设置,一种是在调用ZK客户端方法的时候传递进去,比如 zk.get_children("/node", watch=FUN),但是这种方法是一次性的
    也就是触发一次就没了,如果你还想继续监听一个事件就需要再次注册。
    另外一种方法是通过高级API实现,监控数据或者节点变化,它只需要我们注册一次。一次性事件关注是zookeeper默认的即便在JAVA客户端里也是,这种高级别
    API在JAVA里是zkclient,而在Python里面就是kazoo。高级API其实是对低级API的封装,对用户来讲更加好用。
    """
    
    __metaclass__ = type
    
    
    class zkWatcherTest:
    
        def __init__(self, host, port, timeout=10):
            self._nodename = ''
            self._host = host
            self._port = port
            self._timeout = timeout
            self._zk = KazooClient(hosts=self._host + ':' + self._port, timeout=self._timeout)
            self._zk.start()
            self._lastNodeList = []
    
        def start(self, zkPath):
            self._lastNodeList = self._zk.get_children(zkPath)
            try:
                ChildrenWatch(client=self._zk, path=zkPath, func=self._NodeChange)
    
                DataWatch(client=self._zk, path=zkPath, func=self._DataChange)
                # 这里的死循环就是为了不让程序退出,你可以把时间设置长一点观察,其实即便没有到60秒的睡眠时间,如果
                # 子节点或者节点数量发生变化也会收到通知。这里的wathch底层就是在节点上设置监听器,然后捕捉事件,如果有
                # 事件触发就调用你传递的方法来处理。
                while True:
                    time.sleep(60)
                    print "OK"
            except Exception as err:
                print err.message
    
        def _NodeChange(self, children):
            """
            处理子节点变化
            :param children: 这个参数并不需要你传递进来,因为把这个方法传递进ChiledrenWatcher,会返回一个当前子节点列表
            :return:
            """
            # print children
            # 如果新节点列表长度大于上次获取的节点列表长度,说明有增加
            if len(children) > len(self._lastNodeList):
                for node in children:
                    if node not in self._lastNodeList:
                        print "新增加的节点为:", str(node)
                        self._lastNodeList = children
            else:
                for node in self._lastNodeList:
                    if node not in children:
                        print "删除的节点为:", str(node)
                        self._lastNodeList = children
    
        def _DataChange(self, data, stat):
            """
            处理节点的数据变化
            :param data:
            :param stat:
            :return:
            """
            print "数据发生变化"
            print "数据为:", data
            print "数据长度:", stat.dataLength
            print "数据版本号version:", stat.version
            print "cversion:", stat.cversion
            print "子节点数量:", stat.numChildren
    
    
    def main():
        try:
            zkwt =zkWatcherTest(host="172.16.48.171", port="2181")
            zkwt.start("/zktest")
        except Exception as err:
            print err.message
    
    
    if __name__ == "__main__":
        try:
            main()
        finally:
            sys.exit()

    关于Watcher,网上很多帖子都是通过装饰器的方式实现的,其实我上面的方式和装饰器是一样的,只是形式不同罢了。功能都能实现,只是用装饰器有时候会不方便。

关键字