Airflow EmailOperator 详解
Airflow EmailOperator 详解[编辑 | 编辑源代码]
简介[编辑 | 编辑源代码]
EmailOperator 是 Apache Airflow 的核心操作器之一,用于在任务执行过程中发送电子邮件。该操作器通过 SMTP 协议与邮件服务器通信,支持发送纯文本或 HTML 格式的邮件,并允许动态生成邮件内容(如任务执行结果通知、警报等)。它是工作流自动化中通知机制的关键组件。
核心参数[编辑 | 编辑源代码]
以下是 EmailOperator 的主要配置参数:
- to(必填):收件人邮箱地址列表,如
['user1@example.com', 'user2@example.com']
- subject(必填):邮件主题,支持 Jinja2 模板
- html_content:HTML 格式的邮件正文(与
content
二选一) - content:纯文本格式的邮件正文
- cc:抄送列表
- bcc:密送列表
- conn_id:SMTP 服务器连接配置的 ID(默认为
smtp_default
)
基础示例[编辑 | 编辑源代码]
以下示例展示如何通过 EmailOperator 发送纯文本邮件:
from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime
with DAG(
dag_id="email_notification_example",
start_date=datetime(2023, 1, 1),
schedule_interval=None
) as dag:
send_email = EmailOperator(
task_id="send_email",
to="recipient@example.com",
subject="Airflow 任务通知 - {{ ds }}",
html_content="<h3>任务执行成功</h3><p>DAG {{ dag.dag_id }} 于 {{ ts }} 完成。</p>",
)
输出说明[编辑 | 编辑源代码]
当任务执行时:
1. 主题中的 模板:Ds
会被替换为当前执行日期(如 2023-01-01
)
2. 正文中的 模板:Ts
会被替换为时间戳(如 2023-01-01T00:00:00+00:00
)
高级用法[编辑 | 编辑源代码]
动态内容生成[编辑 | 编辑源代码]
通过 Python 函数生成邮件内容,结合 PythonOperator
:
def generate_email_content(**context):
task_status = context['ti'].state
return f"""
任务状态: {task_status}
执行时间: {context['ts']}
DAG 信息: {context['dag'].dag_id}
"""
with DAG(...) as dag:
email_content = PythonOperator(
task_id="generate_content",
python_callable=generate_email_content,
provide_context=True,
)
send_email = EmailOperator(
task_id="send_email",
to="admin@example.com",
subject="DAG 执行报告 - {{ ds }}",
html_content="{{ task_instance.xcom_pull(task_ids='generate_content') }}",
)
email_content >> send_email
附件支持[编辑 | 编辑源代码]
通过继承 EmailOperator 实现附件发送(需自定义 operator):
from airflow.operators.email import EmailOperator
from email.mime.application import MIMEApplication
class CustomEmailOperator(EmailOperator):
def __init__(self, attachments=None, **kwargs):
super().__init__(**kwargs)
self.attachments = attachments or []
def execute(self, context):
# 基础邮件构建逻辑
mail = super().execute(context)
# 添加附件
for filepath in self.attachments:
with open(filepath, "rb") as f:
part = MIMEApplication(f.read())
part.add_header('Content-Disposition', 'attachment', filename=filepath)
mail.attach(part)
return mail
实际案例[编辑 | 编辑源代码]
场景:ETL 任务失败通知[编辑 | 编辑源代码]
当数据管道任务失败时,自动发送包含错误日志的邮件:
对应 DAG 实现:
from airflow.operators.python import PythonOperator
def load_data(**context):
try:
# 数据加载逻辑
pass
except Exception as e:
error_msg = str(e)
context['ti'].xcom_push(key='error', value=error_msg)
raise
with DAG(...) as dag:
load_task = PythonOperator(
task_id="load_data",
python_callable=load_data,
provide_context=True,
)
alert_email = EmailOperator(
task_id="send_alert",
to="data_team@example.com",
subject="ETL 失败警报 - {{ ds }}",
html_content="""
<p>加载任务失败!</p>
<p>错误信息: {{ task_instance.xcom_pull(task_ids='load_data', key='error') }}</p>
""",
trigger_rule="one_failed",
)
load_task >> alert_email
配置 SMTP 连接[编辑 | 编辑源代码]
在 Airflow 的 airflow.cfg
中配置 SMTP 或通过 Admin -> Connections 添加:
- Connection ID:
smtp_default
- Connection Type: SMTP
- Host:
smtp.example.com
- Port: 587 (或 465 for SSL)
- Login: 邮箱用户名
- Password: 邮箱密码
- Extra:
{"starttls": true, "ssl": false}
最佳实践[编辑 | 编辑源代码]
1. 敏感信息处理:避免在邮件内容中硬编码密码,使用 Airflow 的 Variables 或 Secrets Backend
2. 模板优化:利用 Jinja2 模板动态生成内容,如 模板:Macros.datetime.now()
3. 错误处理:设置 trigger_rule
控制邮件发送条件(如 one_failed
)
4. 性能考虑:大量邮件发送建议使用专用服务(如 SendGrid/SES 的 custom operator)
数学公式示例(可选)[编辑 | 编辑源代码]
当需要计算邮件发送成功率时:
总结[编辑 | 编辑源代码]
EmailOperator 是 Airflow 通知系统的核心组件,通过灵活的模板和集成能力,可实现:
- 任务状态通知
- 警报系统
- 报告分发
- 自定义工作流交互
掌握其高级用法(如动态内容、附件、错误处理)可显著提升运维自动化效率。