使用python3调用 salt-api

发布时间:2019-09-27 07:09:51编辑:auto阅读(1927)

    使用python3调用 salt-api

    现在所有的操作还是基于命令行模式,在项目中我们不能使用命令行的模式去调用salt-api,所以我们可以写一个基于salt-api的类,方便项目代码的调用。在这里特别附上两种方式实现的python3版本的salt-api class。

    方式一

    #python3.x
    import pycurl
    from io import BytesIO
    import json
    
    class PyCurl(object):
        def __init__(self, url, **kwargs):
            # 传入url地址
            self.url = url
            # 取出header相关信息
            self.header = kwargs.get("header", None)
            # 创建一个curl对象
            self.curl = pycurl.Curl()
            # setopt 来设置一些请求选项
            # 指定请求的URL
            self.curl.setopt(self.curl.URL, self.url)
            # 设置代理浏览器
            self.curl.setopt(self.curl.HEADER, False)
            # 设置请求方式
            self.curl.setopt(self.curl.POST, True)
            # 设置https方式
            self.curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            self.curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            # 判断header是否存在
            if self.header:
                # 设置模拟浏览器
                self.curl.setopt(self.curl.HTTPHEADER, self.header)
    
        def request(self, data=None, timeout=None):
            # 判断对象类型 是否为 str
            if isinstance(data, str):
                #将数据提交
                self.curl.setopt(pycurl.POSTFIELDS, data)
            header_buf = BytesIO()
            body_buf = BytesIO()
            # 强制获取新的连接,即替代缓存中的连接
            self.curl.setopt(self.curl.FRESH_CONNECT, True)
            # 完成交互后强制断开连接,不重用
            self.curl.setopt(self.curl.FORBID_REUSE, True)
            if str(timeout).isdigit() and timeout > 0:
                # 设置timeout超时时间
                self.curl.setopt(self.curl.TIMEOUT, timeout)
            # 将返回的HTTP HEADER定向到回调函数header_buf
            self.curl.setopt(self.curl.HEADERFUNCTION, header_buf.write)
            # 将返回的内容定向到回调函数body_buf
            self.curl.setopt(self.curl.WRITEFUNCTION, body_buf.write)
            try:
                # 服务器返回信息
                self.curl.perform()
            except pycurl.error:
                return False
            # 状态码
            http_code = self.curl.getinfo(self.curl.HTTP_CODE)
            # 关闭连接
            self.curl.close()
            # 返回状态码 header body
            return {"http_code": http_code, "header": header_buf.getvalue(), "body": body_buf.getvalue(), "url": self.url}
    
    class SaltApi(object):
    
        def __init__(self,**kwargs):
    
            # 设置超时时间
            self.timeout = kwargs.get("timeout", 300)
            # 设置头信息
            self.header = kwargs.get("header", ["Content-Type:application/json"])
            # 获取url
            self.__url = "https://192.168.104.76:8000"
    
            # 获取
            self.__username = "salt-api"
            self.__password = "salt-api"
    
        # token id 获取
        def token_id(self):
            obj = {'eauth': 'pam', 'username': self.__username, 'password': self.__password}
            result = self.post(prefix="/login",**obj)
            if result:
                try:
                    self.__token_id = result['return'][0]['token']
                except KeyError:
                    raise KeyError
            print(self.__token_id)
            return self.__token_id
    
        def post(self, prefix="/",token=None,**data):
    
            # url拼接
            url = self.__url + prefix
            print (data)
            # 实例化
            self.header.append(str(token))
            curl = PyCurl(url, header=self.header)
            # 发起请求
            result = curl.request(data=json.dumps(data), timeout=self.timeout)
            # 判断值
            if not result:
                return result
            #判断状态码是否等于200
            if result["http_code"] != 200:
                self.response = "response code %s".format(result["info"]["http_code"])
                return self.response
            result = json.loads(result["body"].decode())
            # 判断是否有error
            if "error" in result and result["error"]:
                self.response = "%s(%s)" % (result["error"]["data"], result["error"]["code"])
                return self.response
            #返回正确的数据
            return result
    
        def all_key(self):
            '''
            获取所有的minion_key
            '''
            token = 'X-Auth-Token:%s'%self.token_id()
            obj = {'client': 'wheel', 'fun': 'key.list_all'}
            content = self.post(token=token,**obj)
            # 取出认证已经通过的
            minions = content['return'][0]['data']['return']['minions']
            #print('已认证',minions)
            # 取出未通过认证的
            minions_pre = content['return'][0]['data']['return']['minions_pre']
            # print('未认证',minions_pre)
            return minions,minions_pre
    
        def accept_key(self,node_name):
            '''
            如果你想认证某个主机 那么调用此方法
            '''
            token = 'X-Auth-Token:%s' % self.token_id()
            obj = {'client': 'wheel', 'fun': 'key.accept','match':node_name}
            content = self.post(token=token,**obj)
            print (content)
            ret = content['return'][0]['data']['success']
            return ret
    
        # 删除认证方法
        def delete_key(self, node_name):
            obj = {'client': 'wheel', 'fun': 'key.delete', 'match': node_name}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
    
            ret = content['return'][0]['data']['success']
            return ret
    
        # 针对主机远程执行模块
        def host_remote_func(self, tgt, fun):
            ''' tgt是主机 fun是模块
                写上模块名 返回 可以用来调用基本的资产
                例如 curl -k https://ip地址:8080/ \
            >      -H "Accept: application/x-yaml" \
            >      -H "X-Auth-Token:b50e90485615309de0d83132cece2906f6193e43" \
            >      -d client='local' \
            >      -d tgt='*' \
            >      -d fun='test.ping'  要执行的模块
            return:
            - iZ28r91y66hZ: true
              node2.minion: true
            '''
            obj = {'client': 'local', 'tgt': tgt, 'fun': fun}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            ret = content['return'][0]
            return ret
    
        def group_remote_func(self,tgt,fun):
            obj = {'client': 'local', 'tgt': tgt, 'fun': fun,'expr_form': 'nodegroup'}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            print (content)
            ret = content['return'][0]
            return ret
    
        def host_remote_execution_module(self,tgt,fun,arg):
            '执行fun 传入传入参数arg '
            obj = {'client': 'local', 'tgt': tgt, 'fun': fun,'arg': arg}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            ret = content['return'][0]
            return ret
            #print(salt_aa.host_remote_execution_module('*', 'cmd.run', 'ifconfig'))
    
        # 基于分组来执行
        def group_remote_execution_module(self, tgt, fun, arg):
            '''
            根据分组来执行
            tgt =
            '''
    
            obj = {'client': 'local', 'tgt': tgt, 'fun': fun, 'arg': arg, 'expr_form': 'nodegroup'}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            jid = content['return'][0]
            return jid
    
        def host_sls(self, tgt, arg):
            '''主机进行sls'''
            obj = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            return content
    
        def group_sls(self, tgt, arg):
            ''' 分组进行sls '''
            obj = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': 'nodegroup'}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            jid = content['return'][0]['jid']
            return jid
    
        def host_sls_async(self, tgt, arg):
            '''主机异步sls '''
            obj = {'client': 'local_async', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            jid = content['return'][0]['jid']
            return jid
    
        def group_sls_async(self, tgt, arg):
            '''分组异步sls '''
            obj = {'client': 'local_async', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': 'nodegroup'}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            jid = content['return'][0]['jid']
            return jid
    
        def server_group_pillar(self, tgt, arg, **kwargs):
            '''分组进行sls and pillar'''
            obj = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': 'nodegroup',
                   'kwarg': kwargs}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            jid = content['return'][0]
            print (jid)
    
        def server_hosts_pillar(self, tgt, arg,**kwargs):
            '''针对主机执行sls and pillar '''
            obj = {"client": "local", "tgt": tgt, "fun": "state.sls", "arg": arg,"kwarg":kwargs}
            token = 'X-Auth-Token:%s' % self.token_id()
            content = self.post(token=token, **obj)
            jid = content['return'][0]
            return jid
    
        def jobs_all_list(self):
            '''打印所有jid缓存'''
            token = 'X-Auth-Token:%s' % self.token_id()
            obj = {"client": "runner", "fun": "jobs.list_jobs"}
            content = self.post(token=token, **obj)
            print (content)
    
        def jobs_jid_status(self, jid):
            '''查看jid运行状态'''
            token = 'X-Auth-Token:%s' % self.token_id()
            obj = {"client": "runner", "fun": "jobs.lookup_jid", "jid": jid}
            content = self.post(token=token, **obj)
            print (content)
            return content
    if __name__ == '__main__':
    
        sa = saltapi.SaltApi()
        print (sa.host_remote_execution_module('node76','cmd.run','ifconfig'))
        print (sa.accept_key("node76"))
    

    方式二

    #python3x
    import urllib,json
    import urllib.request
    import urllib.parse
    import ssl
    
    ssl._create_default_https_context = ssl._create_unverified_context
    
    class SaltAPI(object):
        __token_id = ''
    
        def __init__(self,url,user,password):
            self.__url = url
            self.__user = user
            self.__password = password
    
        def token_id(self):
            """
                用户登陆和获取token
            :return:
            """
            params = {'eauth': 'pam', 'username': self.__user, 'password': self.__password}
            encode = urllib.parse.urlencode(params)
            obj = urllib.parse.unquote(encode).encode('utf-8')
            content = self.postRequest(obj, prefix='/login')
            try:
                self.__token_id = content['return'][0]['token']
            except KeyError:
                raise KeyError
    
        def postRequest(self,obj,prefix='/'):
            url = self.__url + prefix
            headers = {'X-Auth-Token': self.__token_id}
            req = urllib.request.Request(url, obj, headers)
            opener = urllib.request.urlopen(req)
            content = json.loads(opener.read().decode('utf-8'))
            return content
    
        def list_all_key(self):
            """
                获取包括认证、未认证salt主机
            """
    
            params = {'client': 'wheel', 'fun': 'key.list_all'}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            minions = content['return'][0]['data']['return']['minions']
            minions_pre = content['return'][0]['data']['return']['minions_pre']
            return minions, minions_pre
    
        def delete_key(self, node_name):
            '''
                拒绝salt主机
            '''
    
            params = {'client': 'wheel', 'fun': 'key.delete', 'match': node_name}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            ret = content['return'][0]['data']['success']
            return ret
    
        def accept_key(self,node_name):
            '''
                接受salt主机
            '''
    
            params = {'client': 'wheel', 'fun': 'key.accept', 'match': node_name}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            ret = content['return'][0]['data']['success']
            return ret
    
        def salt_get_jid_ret(self,jid):
            """
                通过jid获取执行结果
            :param jid: jobid
            :return: 结果
            """
            params = {'client':'runner', 'fun':'jobs.lookup_jid', 'jid': jid}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            ret = content['return'][0]
            return ret
    
        def salt_running_jobs(self):
            """
                获取运行中的任务
            :return: 任务结果
            """
            params = {'client':'runner', 'fun': 'jobs.active'}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            ret = content['return'][0]
            return ret
    
        def remote_noarg_execution_sigle(self, tgt, fun):
            """
                单台minin执行命令没有参数
            :param tgt: 目标主机
            :param fun:  执行模块
            :return: 执行结果
            """
            params = {'client': 'local', 'tgt': tgt, 'fun': fun}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            # print(content)
            # {'return': [{'salt-master': True}]}
            ret = content['return'][0]
            return ret
    
        def remote_execution_single(self, tgt, fun, arg):
            """
                单台minion远程执行,有参数
            :param tgt: minion
            :param fun: 模块
            :param arg: 参数
            :return: 执行结果
            """
            params = {'client': 'local', 'tgt': tgt, 'fun': fun, 'arg': arg}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            # print(content)
            # {'return': [{'salt-master': 'root'}]}
            ret = content['return']
            return ret
    
        def remote_async_execution_module(self, tgt, fun, arg):
            """
                远程异步执行模块,有参数
            :param tgt: minion list
            :param fun: 模块
            :param arg: 参数
            :return: jobid
            """
            params = {'client': 'local_async', 'tgt': tgt, 'fun': fun, 'arg': arg, 'expr_form': 'list'}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            # print(content)
            # {'return': [{'jid': '20180131173846594347', 'minions': ['salt-master', 'salt-minion']}]}
            jid = content['return'][0]['jid']
            return jid
    
        def remote_execution_module(self, tgt, fun, arg):
            """
                远程执行模块,有参数
            :param tgt: minion list
            :param fun: 模块
            :param arg: 参数
            :return: dict, {'minion1': 'ret', 'minion2': 'ret'}
            """
            params = {'client': 'local', 'tgt': tgt, 'fun': fun, 'arg': arg, 'expr_form': 'list'}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            # print(content)
            # {'return': [{'salt-master': 'root', 'salt-minion': 'root'}]}
            ret = content['return'][0]
            return ret
    
        def salt_state(self, tgt, arg, expr_form):
            '''
            sls文件
            '''
            params = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': expr_form}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            ret = content['return'][0]
            return ret
    
        def salt_alive(self, tgt):
            '''
            salt主机存活检测
            '''
            params = {'client': 'local', 'tgt': tgt, 'fun': 'test.ping'}
            obj = urllib.parse.urlencode(params).encode('utf-8')
            self.token_id()
            content = self.postRequest(obj)
            ret = content['return'][0]
            return ret
    
    if __name__ == '__main__':
            salt = SaltAPI(url="https://192.168.104.76:8000",user="salt-api",password="salt-api")
            minions, minions_pre = salt.list_all_key()
            # 说明如果'expr_form': 'list',表示minion是以主机列表形式执行时,需要把list拼接成字符串,如下所示
            minions = ['node76', 'node76']
            hosts = map(str, minions)
            hosts = ",".join(hosts)
            ret = salt.remote_noarg_execution_sigle('node76', 'test.ping')
            print(ret)
            ret = salt.remote_noarg_execution_sigle('node76', 'test.ping')
            print(ret)
            # print(type(ret))

关键字