7.13 基于 ansible-api 二次开发

image0

长久以来,IT 运维在企业内部一直是个耗人耗力的事情。随着虚拟化的大量应用、私有云、容器的不断普及,数据中心内部的压力愈发增加。传统的自动化工具,往往是面向于数据中心特定的一类对象,例如操作系统、虚拟化、网络设备的自动化运维工具往往是不同的。那么,有没有一种数据中心级别的统一的自动化运维工具呢?

答案就是Ansible。和传统的自动化工具 (如 Puppet)相比,Ansible 尤其明显的优势:

  • 简单,是一种高级的脚本类语言,而非标准语言。

  • 不需要安装 agent, 分为管理节点和远程被管节点,通过 SSH 认证。

  • 纳管范围广泛,不仅仅是操作系统,还包括各种虚拟化、公有云,甚至网络设备。

用过 ansible 的人都知道,ansible 一般都是通过命令行执行的:ansible、ansible-playbook、ansible-doc、ansible-galaxy、ansible-console 等。用执行命令的方式,整个过程的所有信息都会实时的打印在终端屏幕。这样的好处是,反馈实时,可以观察整个部署过程。缺点就是信息过多,难以获取有效的信息。

这时候,我们自然想到可以直接调用 ansible 提供的接口进行开发。

通过搜索引擎查询有关 ansible 二次开发的相关资料,发现这方面的文章非常少,而且由于 ansible 一直在进行代码的重构优化,有很多网友提供的 demo 已不适用我当前使用的版本,一直跑不通。

本想从官方文档中查得一些资料,结果发现官网中对于这块也没有很详细的介绍。

后来想想,ansible、ansible-playbook、ansible-doc 这些命令不也是基于 ansible 的 api 开发出来的吗?

我不是可以模仿他们,来调用 ansible api 接口。

以下通过阅读 ansible cli 的源码,总结出来的。

如何调用 Playbook

我习惯性地将调用 playbook 的代码封装为一个函数。

def playbook_task():
    try:
        # 执行 playbook 的各项参数
        cli = PlaybookCLI(args=["", "you_task.yml"])
        cli.parse()
        super(PlaybookCLI, cli).run()

        loader, inventory, variable_manager = cli._play_prereqs(cli.options)
        CLI.get_host_list(inventory, cli.options.subset)
        pbex = PlaybookExecutor(playbooks=cli.args, inventory=inventory,
                                variable_manager=variable_manager,
                                loader=loader,
                                options=cli.options,
                                passwords=None)
        # 【重点】指定回调实例
        pbex._tqm._stdout_callback = ResultCallback()
        exit_code = pbex.run()

    except Exception as e:
        LOG.exception('Failed to run playbook:{}'.format(e))

以上的代码相对固定,你可以直接拷贝直接用。除了两个位置你要注意一下

  1. 第一个注释处:按需要对应添加 playbook 的参数

  2. 第二个注释处:指定你自定义的回调实例

定义回调类

from ansible.plugins.callback.default import CallbackModule

class ResultCallback(CallbackModule):
    def __init__(self):
        self.display_failed_stderr = False
        self.display_ok_hosts = True
        super(ResultCallback, self).__init__()

    def v2_runner_on_failed(self, result, ignore_errors=False, **kwargs):
        super(ResultCallback, self).v2_runner_on_failed(result, ignore_errors, **kwargs)
        host = result._host.get_name()
        self.runner_on_failed(host, result._result, ignore_errors)
        if not ignore_errors:
            if "ws" not in host:
                host = "ws_" + host.replace("_host", "")

            if host in task_result:
                task_result[host]["detailStatus"] = "FAIL"
                task_result[host]["detailMsg"] = result.task_name + ": " + result._result.get("msg", "").encode("utf-8")
            else:
                for host, _ in task_result.items():
                    task_result[host]["detailStatus"] = "FAIL"
                    task_result[host]["detailMsg"] = result.task_name + ": " + result._result.get("msg", "").encode("utf-8")

    def v2_runner_on_unreachable(self, result):
        super(ResultCallback, self).v2_runner_on_unreachable(result)
        host = result._host.get_name()
        self.runner_on_unreachable(host, result._result)
        if "ws" not in host:
            host = "ws_" + host.replace("_host", "")
        task_result[host]["detailStatus"] = "FAIL"
        task_result[host]["detailMsg"] = result.task_name + ": " + result._result.get("msg", "").encode("utf-8")

    def v2_runner_on_skipped(self, result):
        super(ResultCallback, self).v2_runner_on_skipped(result)
        if C.DISPLAY_SKIPPED_HOSTS:
            host = result._host.get_name()
            self.runner_on_skipped(host, self._get_item(getattr(result._result, 'results', {})))

并实现其中几个关键的方法

  • v2_runner_on_ok:单个部署任务成功时,会调用

  • v2_runner_on_failed:节点的部署任务失败时,会调用

  • v2_runner_on_unreachable:节点不可达时,会调用

  • v2_runner_on_skipped:部署任务跳过时,会调用

  • v2_playbook_on_stats:所有的部署任务完成时,调用

image1