跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow Operator模板
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow Operator模板 = == 介绍 == '''Airflow Operator模板'''是Apache Airflow中用于简化Operator创建和重用的核心机制。它通过预定义参数化模板字段(称为'''模板化字段''')允许动态注入上下文变量(如执行日期、任务实例等),避免硬编码并增强工作流的灵活性。Operator模板基于Jinja2模板引擎实现,支持在任务运行时解析变量。 == 核心原理 == Airflow的模板系统在Operator基类中定义,通过`template_fields`属性指定哪些参数可被模板化。当DAG解析时,Airflow会将模板字段的值替换为实际上下文变量。 === 模板化字段示例 === 以下代码展示了一个自定义Operator的模板字段定义: <syntaxhighlight lang="python"> from airflow.models import BaseOperator class MyCustomOperator(BaseOperator): template_fields = ('query', 'output_path') # 声明可模板化的参数 def __init__(self, query, output_path, **kwargs): super().__init__(**kwargs) self.query = query self.output_path = output_path def execute(self, context): rendered_query = self.query # 运行时自动渲染 print(f"Executing: {rendered_query}") </syntaxhighlight> == 内置模板变量 == Airflow提供以下常用上下文变量(通过`context`字典访问): * <code>ds</code>: 执行日期(YYYY-MM-DD) * <code>execution_date</code>: 逻辑日期时间戳 * <code>task_instance</code>: 当前任务实例对象 * <code>macros</code>: 包含宏如<code>datetime</code> == 代码示例 == === BashOperator模板 === <syntaxhighlight lang="python"> from airflow.operators.bash import BashOperator bash_task = BashOperator( task_id="process_data", bash_command="echo 'Processing data for {{ ds }}' > /tmp/output_{{ ds_nodash }}.txt", dag=dag ) </syntaxhighlight> '''输出效果''': 若执行日期为2023-01-01,生成文件`/tmp/output_20230101.txt`,内容为"Processing data for 2023-01-01"。 === PythonOperator模板 === <syntaxhighlight lang="python"> from airflow.operators.python import PythonOperator def process_data(**context): date = context['ds'] print(f"Data processed on: {date}") python_task = PythonOperator( task_id="python_process", python_callable=process_data, provide_context=True, dag=dag ) </syntaxhighlight> == 高级用法 == === 自定义宏 === 通过`user_defined_macros`在DAG中注入自定义变量: <syntaxhighlight lang="python"> dag = DAG( 'custom_macro_dag', user_defined_macros={'env': 'prod'} ) bash_task = BashOperator( task_id="env_check", bash_command="echo 'Running in {{ env }} environment'", dag=dag ) </syntaxhighlight> === 模板扩展 === 使用Jinja2控制结构实现条件逻辑: <syntaxhighlight lang="python"> BashOperator( task_id="conditional_task", bash_command=""" {% if macros.datetime.now().hour < 12 %} echo "Morning run" {% else %} echo "Afternoon run" {% endif %} """, dag=dag ) </syntaxhighlight> == 实际案例 == === 动态文件处理 === 处理按日期分区的数据文件: <syntaxhighlight lang="python"> BashOperator( task_id="load_daily_data", bash_command=""" aws s3 cp s3://bucket/raw/{{ ds }}/data.csv /tmp/ && python transform.py --input=/tmp/data.csv --output=/tmp/processed_{{ ds_nodash }}.parquet """, dag=dag ) </syntaxhighlight> === 跨任务参数传递 === 利用XCom传递模板变量: <syntaxhighlight lang="python"> def generate_filename(**context): context['ti'].xcom_push(key='filename', value=f"report_{context['ds']}.pdf") PythonOperator( task_id="generate_filename", python_callable=generate_filename, dag=dag ) BashOperator( task_id="render_report", bash_command="wkhtmltopdf http://reports.example.com?date={{ ds }} {{ ti.xcom_pull(task_ids='generate_filename', key='filename') }}", dag=dag ) </syntaxhighlight> == 模板调试 == 使用`airflow tasks render`命令预览渲染结果: <pre> airflow tasks render [dag_id] [task_id] [execution_date] </pre> == 最佳实践 == * 将动态路径、日期等参数声明为`template_fields` * 避免在模板中编写复杂业务逻辑 * 对敏感数据使用`template_ext`或Airflow加密变量 * 通过单元测试验证模板渲染 == 限制与注意事项 == 1. 仅`template_fields`列出的参数支持模板化 2. 模板渲染发生在Worker节点而非调度器 3. 错误语法会导致任务失败(如未闭合的Jinja2标签) == 参见 == * [[Apache Airflow官方文档 | Airflow Templating]] * [[Jinja2模板引擎]] * [[Airflow宏参考]] [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Operators详解]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)