大规模运行 Apache Airflow 的经验和教训

您所在的位置:网站首页 apache干什么的 大规模运行 Apache Airflow 的经验和教训

大规模运行 Apache Airflow 的经验和教训

2023-03-31 13:53| 来源: 网络整理| 查看: 265

作者|Sam Wheating Megan Parker

译者|Sambodhi

策划|罗燕珊

Apache Airflow 是一个能够开发、调度和监控工作流的编排平台。在 Shopify,我们已经在生产中运行了两年多的 Airflow,用于各种工作流,包括数据提取、机器学习模型训练、Apache Iceberg 表维护和 DBT 驱动的数据建模。在撰写本文时,我们正通过 Celery 执行器和 MySQL 8 在 Kubernetes 上来运行 Airflow 2.2。

Shopify 在 Airflow 上的应用规模在过去两年中急剧扩大。在我们最大的应用场景中,我们使用了 10000 多个 DAG,代表了大量不同的工作负载。在这个场景中,平均有 400 多项任务正在进行,并且每天的运行次数超过 14 万次。由于 Shopify 的内部采用率越来越高,我们的 Airflow 部署将会产生更多的负载。因为这样的迅速增长,我们所面临的困难包括:文件存取速度太慢、对 DAG(Directed acyclic graph,有向无环图)能力的控制不足、流量水平的不规则、工作负载之间的资源争用等等。

接下来,我们将与大家分享我们所获得的经验以及我们为实现大规模运行 Airflow 而构建的解决方案。

使用云端存储时,文件存取速度可能会变慢

对于 Airflow 环境的性能和完整性,快速的文件存取速度至关重要。一个清晰的文件存取策略可以保证调度器能够迅速地对 DAG 文件进行处理,并且让你的作业保持更新。

通过重复扫描和重新解析配置的 DAG 目录中的所有文件,可以保持其工作流的内部表示最新。这些文件必须经常扫描,以保持每个工作负载的磁盘数据源和其数据库内部表示之间的一致性。这就意味着 DAG 目录的内容必须在单一环境中的所有调度器和工作器之间保持一致(Airflow 提供了几种方法来实现这一目标)。

在 Shopify 中,我们利用谷歌云存储(Google Cloud Storage,GCS)来存储 DAG。我们最初部署 Airflow 时,利用 GCSFuse 在单一的 Airflow 环境中的所有工作器和调度器来维护一致的文件集。然而,在规模上,这被证明是一个性能瓶颈,因为每个文件的存取都会引起对 GCS 的请求。由于在环境中的每一个 pod 都需要单独挂在桶,所以存取量特别大。

经过几次试验,我们发现,在 Kubernetes 集群上运行一个 NFS(Network file system,网络文件系统)服务器,可以大大改善 Airflow 环境的性能。然后,我们把 NFS 服务器当作一个多读多写的卷转进工作器和调度器的 pod 中。我们编写了一个自定义脚本,使该卷的状态与 GCS 同步,因此,当 DAG 被上传或者管理时,用户可以与 GCS 进行交互。这个脚本在同一个集群内的单独 pod 中运行。这使得我们可以有条件地在给定的桶中仅同步 DAG 的子集,或者根据环境的配置,将多个桶中的 DAG 同步到一个文件系统中(稍后会详细阐述)。

总而言之,这为我们提供了快速的文件存取作为一个稳定的外部数据源,同时保持了我们快速添加或修改 Airflow 中 DAG 文件的能力。另外,我们还可以利用谷歌云平台的 IAM(识别和存取管理)功能来控制哪些用户能够上传文件到特定的环境。例如,我们可以让用户直接将 DAG 直接上传到 staging 环境,但将生产环境的上传限制在我们的持续部署过程中。

在大规模运行 Airflow 时,确保快速文件存取的另一个考虑因素是你的文件处理性能。Airflow 具有高度的可配置性,可以通过多种方法调整后台文件处理(例如排序模式、并行性和超时)。这使得你可以根据需求优化环境,以实现交互式 DAG 开发或调度器性能。

元数据数量的增加,可能会降低 Airflow 运行效率

在一个正常规模的 Airflow 部署中,由于元数据的数量而造成的性能降低并不是问题,至少在最初的几年里是这样。

但是,从规模上看,元数据正在迅速地累积。一段时间之后,就可能开始对数据库产生额外的负载。这一点在 Web 用户界面的加载时间上就可以看得出来,尤其是 Airflow 的更新,在这段时间里,迁移可能要花费数小时。

经过反复试验,我们确定了 28 天的元数据保存策略,并实施了一个简单的 DAG,在 PythonOperator 中利用 ORM(对象关系映射)查询,从任何包含历史数据(DagRuns、TaskInstances、Logs、TaskRetries 等)的表中删除行。我们之所以选择 28 天,是因为它可以让我们有充足的历史记录来管理事件和跟踪历史工作绩效,同时将数据库中的数据量保持在合理的水平。

db_clean.py:

import loggingfrom datetime import datetime, timezone, timedelta from sqlalchemy import deletefrom airflow.models import DAG, Log, DagRun, TaskInstance, TaskReschedule, Variablefrom airflow.jobs.base_job import BaseJobfrom airflow.utils.dates import days_agofrom airflow.operators.python import PythonOperatorfrom airflow.utils.state import Statefrom airflow.utils.session import provide_session EXPIRATION_WEEKS = 4 @provide_sessiondef delete_old_database_entries_by_model(table, date_col, session=None): expiration_date = datetime.now(timezone.utc) - timedelta(weeks=EXPIRATION_WEEKS) query = delete(table).where(date_col < expiration_date) if "state" in dir(table): query = query.where(State.RUNNING != "state") result = session.execute(query) logging.info( "Deleted %s rows from the database for the %s table that are older than %s.", result.rowcount, table, expiration_date, ) def delete_old_database_entries(): if Variable.get("ENABLE_DB_TRUNCATION", "") != "True": logging.warning("This DAG will delete all data older than %s weeks.", EXPIRATION_WEEKS) logging.warning("To enable this, create an Airflow Variable called ENABLE_DB_TRUNCATION set to 'True'") logging.warning("Skipping truncation until explicitly enabled.") return delete_old_database_entries_by_model(TaskInstance, TaskInstance.end_date) delete_old_database_entries_by_model(DagRun, DagRun.end_date) delete_old_database_entries_by_model(BaseJob, BaseJob.end_date) delete_old_database_entries_by_model(Log, Log.dttm) delete_old_database_entries_by_model(TaskReschedule, TaskReschedule.end_date) dag = DAG( "airflow-utils.truncate-database", start_date=days_ago(1), max_active_runs=1, dagrun_timeout=timedelta(minutes=20), schedule_interval="@daily", catchup=False,) PythonOperator( task_id="cleanup-old-database-entries", dag=dag, python_callable=delete_old_database_entries,)

遗憾的是,这就意味着,在我们的环境中,Airflow 中的那些依赖于持久作业历史的特性(例如,长时间的回填)并不被支持。这对我们来说并不是一个问题,但是它有可能会导致问题,这要取决于你的保存期和 Airflow 的使用情况。

作为自定义 DAG 的另一种方法,Airflow 最近增加了对 db clean 命令的支持,可以用来删除旧的元数据。这个命令在 Airflow 2.3 版本中可用。

DAG 可能很难与用户和团队关联

在多租户环境中运行 Airflow 时(尤其是在大型组织中),能够将 DAG 追溯到个人或团队是很重要的。为什么?因为如果一个作业失败了,抛出错误或干扰其他工作负载,我们的管理员可以迅速联系到合适的用户。

如果所有的 DAG 都直接从一个仓库部署,我们可以简单地使用 git blame 来追踪工作的所有者。然而,由于我们允许用户从自己的项目中部署工作负载(甚至在部署时动态生成作业),这就变得更加困难。

为了方便追踪 DAG 的来源,我们引入了一个 Airflow 命名空间的注册表,并将其称为 Airflow 环境的清单文件。

sample_airflow_manifest.yaml:

projects: defaults: &defaults source_repository: 'https://github.com/my_organization/dag_repo' dag_source_bucket: 'my_organization_dags' constraints: &constraints airflow_celery_queues: - 'default' pools: - 'default' data_extracts:


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3