跳转至

airflow Airflow 3.1.1

数据工程的救赎:为什么Airflow正在重新定义工作流自动化

在数据驱动的时代,每个技术团队都曾陷入这样的矛盾:我们渴望用自动化解放生产力,却总被错综复杂的任务依赖、脆弱的调度系统和深夜告警电话所束缚。直到那个深夜——当我们的数据管道再次崩溃,而团队成员在混乱的日志中徒劳搜寻时——我们遇见了改变游戏规则的解决方案。

为什么要使用Airflow

想象你是一位指挥家,正试图让一支由数百种数据任务组成的交响乐团和谐演奏。传统脚本就像乐手各自为政:一个任务延迟会导致整个演出崩溃,错误排查如同在黑暗房间寻找针尖。这就是数据工程师的日常噩梦——直到Airflow带来曙光。

它解决了现代数据生态中最尖锐的矛盾:我们既需要灵活定制工作流,又渴望获得企业级可靠性;既希望快速迭代,又要求可视化监控和智能重试机制。当你的数据管道从「凌晨三点呼叫救火」变成「优雅的自我修复系统」,你会理解为什么包括 Airbnb 和 Netflix 在内的顶尖团队都选择用它来驾驭数据洪流。

Airflow是什么

Apache Airflow 是一个用代码定义、调度和监控工作流的开源平台。想象它如同智能交通管制系统:通过有向无环图(DAG)精准协调每个任务的执行顺序,内置调度器自动触发流程,Web界面实时展示运行状态——让复杂的数据管道变得像搭积木般直观可控。

入门示例

让我们见证一个真实场景:电商公司的每日销售报表管道。每天凌晨,它需要同步订单数据、计算销售额、生成可视化报表并发送邮件通知。

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_orders():
    # 从数据库提取最新订单
    return ["order1", "order2"]

def calculate_revenue(orders):
    # 计算每日销售额
    return len(orders) * 100

def send_report(revenue):
    print(f"今日销售额:{revenue}元")

with DAG('daily_sales', start_date=datetime(2024,1,1), schedule='@daily') as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_orders)
    transform = PythonOperator(task_id='calculate', python_callable=calculate_revenue, op_args=[extract.output])
    load = PythonOperator(task_id='report', python_callable=send_report, op_args=[transform.output])

    extract >> transform >> load  # 定义执行顺序

这个生动示例展示了如何用寥寥数行代码构建健壮的数据管道——任务自动按序执行,失败时智能重试,并通过清晰的可视化界面监控进度。

Airflow 3.1.1版本更新了什么

本次更新集中修复了多个核心组件稳定性问题:解决从旧版本升级时的配置迁移错误,修补远程日志连接内存泄漏,优化调度器在邮件通知场景的崩溃缺陷。同时增强网格视图查询性能,改进任务重试机制可靠性,并新增希腊语/泰语等多语言界面支持,显著提升大规模部署的运维体验。

更新日志

📦 PyPI: https://pypi.org/project/apache-airflow/3.1.1/
📚 文档: https://airflow.apache.org/docs/apache-airflow/3.1.1/
🛠️ release_notes.html
🪶 源代码: https://airflow.apache.org/docs/apache-airflow/3.1.1/installation/installing-from-sources.html
🐳 Docker镜像: "docker pull apache/airflow:3.1.1"
🚏 约束文件: https://github.com/apache/airflow/tree/constraints-3.1.1

Bug修复

  • 修复早期版本升级过程中因空值dag_run.conf导致的执行失败
  • 修复远程日志连接缓存的内存泄漏问题
  • 修复启用预导入模块优化时的DAG处理器崩溃
  • 修复电子邮件通知导致的调度器崩溃
  • 修复3.0到3.1迁移过程中retry_delay为None时的调度器崩溃
  • 修复延迟状态后任务重试执行错误方法的问题
  • 修复外部终止任务的重试回调未执行问题
  • 修复手动触发时自定义时间表generate_run_id未调用问题
  • 修复访问未显式设置retry_delay的MappedOperator时出现KeyError
  • 修复task-sdk连接错误处理与airflow-core行为不匹配
  • 修复网格视图的拓扑排序问题
  • 修复回调请求中get_ti_count和get_task_states的访问
  • 修复Server上下文中Connection或Variable的访问
  • 修复.airflowignore顺序优先级
  • 修复Pydantic 2.12.0兼容性的迁移错误
  • 修复airflow dags backfill CLI中--dag_run_conf的JSON解析
  • 修复备注模态框更改后不更新Markdown文本的UI问题
  • 修复任务被清除后网格视图显示异常
  • 修复浅色模式下的日志文本选择对比度
  • 修复DAG列表视图中高级搜索按钮重叠
  • 修复多DAG标签的显示视图
  • 修复DAG列表视图中资源名称文本溢出
  • 修复仅单个DAG运行时的自动刷新
  • 修复无权限时UI持续请求池API的问题
  • 修复任务日志视图中的多行拖拽选择
  • 修复任务名为'root'时悬停导致蓝屏
  • 修复日-月和周-日冲突时的cron表达式显示
  • 通过SerializedDagModel查询优化修复网格视图性能问题
  • 优雅处理空url_prefix的FastAPI插件
  • 允许映射任务在重运行时接受零长度输入
  • 支持通过指定映射索引清除任务实例的API
  • 向MappedOperator模型添加max_retry_delay
  • 修复使用@asset装饰器获取资产时未使用传递的名称
  • 添加英语作为回退语言环境

功能增强

  • 新增希腊语UI翻译
  • 新增泰语UI翻译
  • 新增波兰语翻译
  • 完善德语翻译实现完整UI覆盖
  • 修复希伯来语翻译拼写错误
  • 改进回调中DAG和任务