0
点赞
收藏
分享

微信扫一扫

0613-Airflow集成自动生成DAG插件

作者:李继武


1

文档编写目的


Airflow的DAG是通过python脚本来定义的,原生的Airflow无法通过UI界面来编辑DAG文件,这里介绍一个插件,通过该插件可在UI界面上通过拖放的方式设计工作流,最后自动生成DAG定义文件。


  • 内容概述

1. Airflow插件集成

2. 使用介绍

3. 总结


  • 安装环境

1. RedHat7.4

2. Python2.7

3. Airflow1.10.1


2

集成DAG生成插件


1. 在github上下载该插件并上传到服务器上并解压,github地址为:


https://github.com/lattebank/airflow-dag-creation-manager-plugin


0613-Airflow集成自动生成DAG插件_python


2. 在AIRFLOW_HOME目录下创建plugins目录,复制插件文件到该目录下,执行以下命令:


mkdir -p /opt/airflow/plugins
cp -r airflow-dag-creation-manager-plugin-master/plugins/* /opt/airflow/plugins/


3. 修改配置文件airflow.cfg,在最后添加如下配置


[dag_creation_manager]
# DEFAULT: basis
dag_creation_manager_line_interpolate = basis
# Choices for queue and pool
dag_creation_manager_queue_pool = mydefault:mydefault|mydefault
# MR queue for queue pool
dag_creation_manager_queue_pool_mr_queue = mydefault:mydefault
# Category for display
dag_creation_manager_category = custom
# Task category for display
dag_creation_manager_task_category = custom_task:#ffba40
# Your email address to receive email
# DEFAULT:
dag_creation_manager_default_email = lijiwu@macro-china.net
dag_creation_manager_need_approver = False
dag_creation_manager_can_approve_self = True
dag_creation_manager_dag_templates_dir = /opt/airflow/plugins/dcmp/dag_templates


0613-Airflow集成自动生成DAG插件_github_02


4. 关闭Airflow


pkill airflow


5. 因为该插件还集成了安全认证,但使用的flask-login模块与当前的airflow自动下载的模块版本不匹配,先卸载原来的flask-login


pip uninstall flask-login


上传Flask-Login-0.2.11.tar.gz并解压


0613-Airflow集成自动生成DAG插件_python_03


安装


cd Flask-Login-0.2.11
python setup.py install


0613-Airflow集成自动生成DAG插件_github_04


该插件启用之后,许多功能会被屏蔽掉,此处不开启,如果需要开启在Airflow.cfg中的[webserver]配置:


authenticate = True
auth_backend = dcmp.auth.backends.password_auth


6. 执行如下命令更新数据库


python /opt/airflow/plugins/dcmp/tools/upgradedb.py


7. 启动airflow


8. 该插件生成的DAG都需要指定一个POOL来执行任务,根据我们在DAG中配置的POOL来创建POOL:


0613-Airflow集成自动生成DAG插件_python_05


打开UI界面,选择“Admin”下的“Pools”


0613-Airflow集成自动生成DAG插件_python_06


选择“create”进行创建:


0613-Airflow集成自动生成DAG插件_flask_07

0613-Airflow集成自动生成DAG插件_python_08


3

使用介绍


1. 创建DAG,选择“Admin”下的“DAG Creation Manager”


0613-Airflow集成自动生成DAG插件_github_09


2. 点击“Create”


0613-Airflow集成自动生成DAG插件_python_10


3. 出现如下界面


0613-Airflow集成自动生成DAG插件_python_11


4. 下拉到底部,填写DAG相关配置,此处配置每分钟执行一次


0613-Airflow集成自动生成DAG插件_flask_12


5. 在DAG图中,选择添加“ADD TASK”,来添加一个节点


0613-Airflow集成自动生成DAG插件_github_13


6. 在下方填写该TASK的名称及脚本类型与脚本代码等信息,此处脚本内容为向/tmp/airflow.dat文件定时输入“*************************”:


0613-Airflow集成自动生成DAG插件_github_14


7. 再点击“ADD TASK”,将会在上面的“task1”节点后添加一个task,此处的规则是要在哪个task后添加一个任务,先点击该task,再点击“ADD  TASK”:

第二个TASK设为定期向上面的文件/tmp/airflow.dat中输入当前时间:


0613-Airflow集成自动生成DAG插件_flask_15


8. 再添加一个与task1同级的task,向/tmp/airflow.log定期输出当前时间:


0613-Airflow集成自动生成DAG插件_flask_16


9. 修改依赖,将task1和task3都作为task2的依赖:先点击task2,点击Change Upstream,选择task3。


0613-Airflow集成自动生成DAG插件_github_17


10. 点击保存


0613-Airflow集成自动生成DAG插件_github_18


11. 回到主界面之后,该DAG不会马上被识别出来,默认情况下Airflow是5分钟扫描一次dag目录,该配置可在airflow.cfg中修改。

识别出来之后打开主界面,点击“暂停按钮”取消暂停开始执行:


0613-Airflow集成自动生成DAG插件_flask_19


启动之后airflow仍会将之前积压的批次执行,终端上查看这两个文件


0613-Airflow集成自动生成DAG插件_github_20

0613-Airflow集成自动生成DAG插件_flask_21


4

总结


1. 该插件目前只适用于Python2,对于Python3的环境不适合。

举报

相关推荐

0 条评论