ansible (2.4.2.0) A

发布时间:2019-09-15 09:59:30编辑:auto阅读(1750)

    转自 jumpserver 5.0 测试版本 下载地址

    https://github.com/hequan2017/zabbix-models/tree/master/ansible_run

    只是简单改了一下 能够单独使用。

    目录结构

    ansible_run/
    ├── callback.py
    ├── exceptions.py
    ├── __init__.py
    ├── inventory.py
    ├── runner.py
    ├── test_inventory.py
    └── test_runner.py

    下面两个是演示文件

    先 pip3 install ansible==2.4.2.0 安装

    例子

    # -*- coding: utf-8 -*-
    
    from  runner import AdHocRunner, CommandRunner
    from  inventory import BaseInventory
    
    def  TestAdHocRunner():
            """
             以yml的形式 执行多个命令
            :return:
            """
    
            host_data = [
                {
                    "hostname": "testserver",
                    "ip": "192.168.10.93",
                    "port": 22,
                    "username": "root",
                    "password": "password",
                },
            ]
            inventory = BaseInventory(host_data)
            runner = AdHocRunner(inventory)
    
            tasks = [
                {"action": {"module": "cron","args": "name=\"sync time\" minute=\"*/3\" job=\"/usr/sbin/ntpdate time.nist.gov &> /dev/null\"" }, "name": "run_cmd"},
                {"action": {"module": "shell", "args": "whoami"}, "name": "run_whoami"},
            ]
            ret = runner.run(tasks, "all")
            print(ret.results_summary)
            print(ret.results_raw)
    
    def TestCommandRunner():
            """
            执行单个命令,返回结果
            :return:
            """
    
            host_data = [
                {
                    "hostname": "testserver",
                    "ip": "192.168.10.93",
                    "port": 22,
                    "username": "root",
                    "password": "password",
                },
            ]
            inventory = BaseInventory(host_data)
            runner = CommandRunner(inventory)
    
            res = runner.execute('pwd', 'all')
            print(res.results_command)
            print(res.results_raw)
            print(res.results_command['testserver']['stdout'])
    
    if __name__ == "__main__":
        TestAdHocRunner()
        TestCommandRunner()
    from inventory import BaseInventory
    
    def  Test():
            """
            返回主机信息,组信息,组内主机信息
            :return:
            """
            host_list = [{
                "hostname": "testserver1",
                "ip": "102.1.1.1",
                "port": 22,
                "username": "root",
                "password": "password",
                "private_key": "/tmp/private_key",
                "become": {
                    "method": "sudo",
                    "user": "root",
                    "pass": None,
                },
                "groups": ["group1", "group2"],
                "vars": {"sexy": "yes"},
            }, {
                "hostname": "testserver2",
                "ip": "8.8.8.8",
                "port": 2222,
                "username": "root",
                "password": "password",
                "private_key": "/tmp/private_key",
                "become": {
                    "method": "su",
                    "user": "root",
                    "pass": "123",
                },
                "groups": ["group3", "group4"],
                "vars": {"love": "yes"},
            }]
    
            inventory = BaseInventory(host_list=host_list)
    
            print("#"*10 + "Hosts" + "#"*10)
            for host in inventory.hosts:
                print(host)
    
            print("#" * 10 + "Groups" + "#" * 10)
            for group in inventory.groups:
                print(group)
    
            print("#" * 10 + "all group hosts" + "#" * 10)
            group = inventory.get_group('all')
            print(group.hosts)
    
    if __name__ == '__main__':
         Test()

关键字