跳转到内容

Airflow EmailOperator 详解

来自代码酷

Airflow EmailOperator 详解[编辑 | 编辑源代码]

简介[编辑 | 编辑源代码]

EmailOperator 是 Apache Airflow 的核心操作器之一,用于在任务执行过程中发送电子邮件。该操作器通过 SMTP 协议与邮件服务器通信,支持发送纯文本或 HTML 格式的邮件,并允许动态生成邮件内容(如任务执行结果通知、警报等)。它是工作流自动化中通知机制的关键组件。

核心参数[编辑 | 编辑源代码]

以下是 EmailOperator 的主要配置参数:

  • to(必填):收件人邮箱地址列表,如 ['user1@example.com', 'user2@example.com']
  • subject(必填):邮件主题,支持 Jinja2 模板
  • html_content:HTML 格式的邮件正文(与 content 二选一)
  • content:纯文本格式的邮件正文
  • cc:抄送列表
  • bcc:密送列表
  • conn_id:SMTP 服务器连接配置的 ID(默认为 smtp_default

基础示例[编辑 | 编辑源代码]

以下示例展示如何通过 EmailOperator 发送纯文本邮件:

from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime

with DAG(
    dag_id="email_notification_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
) as dag:
    send_email = EmailOperator(
        task_id="send_email",
        to="recipient@example.com",
        subject="Airflow 任务通知 - {{ ds }}",
        html_content="<h3>任务执行成功</h3><p>DAG {{ dag.dag_id }} 于 {{ ts }} 完成。</p>",
    )

输出说明[编辑 | 编辑源代码]

当任务执行时: 1. 主题中的 模板:Ds 会被替换为当前执行日期(如 2023-01-01) 2. 正文中的 模板:Ts 会被替换为时间戳(如 2023-01-01T00:00:00+00:00

高级用法[编辑 | 编辑源代码]

动态内容生成[编辑 | 编辑源代码]

通过 Python 函数生成邮件内容,结合 PythonOperator

def generate_email_content(**context):
    task_status = context['ti'].state
    return f"""
    任务状态: {task_status}
    执行时间: {context['ts']}
    DAG 信息: {context['dag'].dag_id}
    """

with DAG(...) as dag:
    email_content = PythonOperator(
        task_id="generate_content",
        python_callable=generate_email_content,
        provide_context=True,
    )
    
    send_email = EmailOperator(
        task_id="send_email",
        to="admin@example.com",
        subject="DAG 执行报告 - {{ ds }}",
        html_content="{{ task_instance.xcom_pull(task_ids='generate_content') }}",
    )
    
    email_content >> send_email

附件支持[编辑 | 编辑源代码]

通过继承 EmailOperator 实现附件发送(需自定义 operator):

from airflow.operators.email import EmailOperator
from email.mime.application import MIMEApplication

class CustomEmailOperator(EmailOperator):
    def __init__(self, attachments=None, **kwargs):
        super().__init__(**kwargs)
        self.attachments = attachments or []

    def execute(self, context):
        # 基础邮件构建逻辑
        mail = super().execute(context)
        
        # 添加附件
        for filepath in self.attachments:
            with open(filepath, "rb") as f:
                part = MIMEApplication(f.read())
                part.add_header('Content-Disposition', 'attachment', filename=filepath)
                mail.attach(part)
        
        return mail

实际案例[编辑 | 编辑源代码]

场景:ETL 任务失败通知[编辑 | 编辑源代码]

当数据管道任务失败时,自动发送包含错误日志的邮件:

graph LR A[Extract Data] --> B[Transform Data] B --> C[Load Data] C --> D{Success?} D -->|No| E[Send Alert Email] D -->|Yes| F[End]

对应 DAG 实现:

from airflow.operators.python import PythonOperator

def load_data(**context):
    try:
        # 数据加载逻辑
        pass
    except Exception as e:
        error_msg = str(e)
        context['ti'].xcom_push(key='error', value=error_msg)
        raise

with DAG(...) as dag:
    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data,
        provide_context=True,
    )
    
    alert_email = EmailOperator(
        task_id="send_alert",
        to="data_team@example.com",
        subject="ETL 失败警报 - {{ ds }}",
        html_content="""
        <p>加载任务失败!</p>
        <p>错误信息: {{ task_instance.xcom_pull(task_ids='load_data', key='error') }}</p>
        """,
        trigger_rule="one_failed",
    )
    
    load_task >> alert_email

配置 SMTP 连接[编辑 | 编辑源代码]

在 Airflow 的 airflow.cfg 中配置 SMTP 或通过 Admin -> Connections 添加:

  • Connection ID: smtp_default
  • Connection Type: SMTP
  • Host: smtp.example.com
  • Port: 587 (或 465 for SSL)
  • Login: 邮箱用户名
  • Password: 邮箱密码
  • Extra: {"starttls": true, "ssl": false}

最佳实践[编辑 | 编辑源代码]

1. 敏感信息处理:避免在邮件内容中硬编码密码,使用 Airflow 的 Variables 或 Secrets Backend 2. 模板优化:利用 Jinja2 模板动态生成内容,如 模板:Macros.datetime.now() 3. 错误处理:设置 trigger_rule 控制邮件发送条件(如 one_failed) 4. 性能考虑:大量邮件发送建议使用专用服务(如 SendGrid/SES 的 custom operator)

数学公式示例(可选)[编辑 | 编辑源代码]

当需要计算邮件发送成功率时: Psuccess=NdeliveredNtotal×100%

总结[编辑 | 编辑源代码]

EmailOperator 是 Airflow 通知系统的核心组件,通过灵活的模板和集成能力,可实现:

  • 任务状态通知
  • 警报系统
  • 报告分发
  • 自定义工作流交互

掌握其高级用法(如动态内容、附件、错误处理)可显著提升运维自动化效率。