Airflow w/ kubernetes executor + minikube + helm

Overview

The steps below bootstrap an instance of airflow, configured to use the kubernetes airflow executor, working within a minikube cluster.

This guide works with the airflow 1.10 release; it will likely break, or include unnecessary extra steps, in future releases (based on recent changes to the k8s related files in the airflow source).

Prerequisites

- Docker installed
- Minikube installed and started
- Helm installed and initialized in the minikube instance (see the sketch after this list)
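A minimal sketch of getting the minikube and helm prerequisites in place, assuming Helm 2 (the Tiller-based version implied by this guide's helm init / --name usage); the resource sizes are arbitrary assumptions:

```sh
# Start a local cluster; cpu/memory values are arbitrary, size to taste
minikube start --cpus 2 --memory 4096

# Install Tiller into the cluster (Helm 2 style initialization)
helm init

# Wait until Tiller is ready before installing charts
kubectl rollout status deployment/tiller-deploy -n kube-system
```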

Build k8s enabled airflow docker image

Clone the docker-airflow repo:

git clone git@github.com:puckel/docker-airflow.git

Checkout the 1.10.0-5 release tag:

git checkout 1.10.0-5

Edit the Dockerfile and add a line to the existing RUN command to install the kubernetes python package:

&& pip install 'kubernetes' \

Configure docker to execute within the minikube VM (bash/zsh syntax shown; the fish equivalent is eval (minikube docker-env)):

eval $(minikube docker-env)

Build and tag the image within the minikube VM:

docker build -t airflow-docker-local:1 .
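As a quick sanity check (not part of the original steps), the freshly built image should be visible to minikube's Docker daemon from the same shell where docker-env was evaluated:

```sh
# The image must exist inside the minikube VM, since the chart uses pull policy Never
docker images | grep airflow-docker-local
```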

Install the helm airflow chart

Clone the chart repo:

git clone git@github.com:kppullin/charts.git

Checkout the airflow branch:

git checkout feature/airflow-minikube

Create an airflow.yaml helm config file:

```yaml
airflow:
  image:
    repository: airflow-docker-local
    tag: 1
  executor: Kubernetes
  service:
    type: LoadBalancer
  config:
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: airflow-docker-local
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: 1
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_IMAGE_PULL_POLICY: Never
    AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME: airflow
    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM: airflow
    AIRFLOW__KUBERNETES__NAMESPACE: airflow
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@airflow-postgresql:5432/airflow

persistence:
  enabled: true
  existingClaim: ''

workers:
  enabled: false

postgresql:
  enabled: true

redis:
  enabled: false
```

Fetch the helm dependencies for the airflow chart:

helm dependency build ~/src/charts/incubator/airflow/

Copy the configmap-airflow-worker.yaml file attached below to ./charts/incubator/airflow/templates. This file sets the DB connection string (sql_alchemy_conn = postgresql+psycopg2://postgres:airflow@airflow-postgresql:5432/airflow). This is configurable via an environment variable in airflow's current master branch, but not in the 1.10 release.

Install the helm chart:

helm install --namespace "airflow" --name "airflow" -f airflow.yaml ~/src/charts/incubator/airflow/
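Once the release is installed, one way to double-check that the worker ConfigMap from the earlier step actually carries the connection string is to inspect the rendered ConfigMaps; the exact resource name depends on the chart templates, so this sketch uses a placeholder:

```sh
# See which configmaps the chart rendered
kubectl get configmaps -n airflow

# Inspect the worker configmap (substitute the real name from the listing above)
# and confirm sql_alchemy_conn points at the in-cluster postgres service
kubectl get configmap <worker-configmap-name> -n airflow -o yaml | grep sql_alchemy_conn
```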

Wait for the services to spin up:

kubectl get pods --watch -n airflow

Note: The various airflow containers will take a few minutes before they're fully operable, even once kubectl reports their status as Running. View the logs for the individual pods to know when they're up (kubectl logs -f <pod-name> -n airflow), for example as shown below.
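For instance, to follow the scheduler's log (the app=airflow-scheduler label is the same one used for kubectl cp later in this guide; other pods can be tailed the same way):

```sh
# Resolve the scheduler pod name by label, then stream its log
kubectl logs -f -n airflow \
  $(kubectl get pods -n airflow -l app=airflow-scheduler -o jsonpath="{.items[0].metadata.name}")
```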

Load the sample airflow DAG

Copy the sample airflow DAG from https://www.techatbloomberg.com/blog/airflow-on-kubernetes/ into a file named k8s-sample.py:

```python
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='airflow',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag,
                                in_cluster=True
                                )

failing = KubernetesPodOperator(namespace='airflow',
                                image="ubuntu:16.04",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag,
                                in_cluster=True
                                )

passing.set_upstream(start)
failing.set_upstream(start)
```

Copy the file into the airflow k8s persistent volume:

kubectl get pods -n airflow -o jsonpath="{.items[0].metadata.name}" -l app=airflow-scheduler | xargs -I {} kubectl cp k8s-sample.py {}:/usr/local/airflow/dags -n airflow
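To confirm the copy worked, the same pod-lookup pattern can be reused to list the dags directory inside the scheduler pod (purely a verification step, not in the original guide):

```sh
# Find the scheduler pod and list the dags folder inside it
kubectl get pods -n airflow -o jsonpath="{.items[0].metadata.name}" -l app=airflow-scheduler \
  | xargs -I {} kubectl exec {} -n airflow -- ls /usr/local/airflow/dags
```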

Run the airflow job

Open the airflow web UI:

minikube service airflow-web -n airflow

Enable the DAG by clicking the toggle control to the on state

Click the trigger dag icon to run the job
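If you prefer the command line, the DAG can also be unpaused and triggered with the airflow 1.10 CLI from inside the webserver pod; the app=airflow-web label is an assumption about the chart's labels, so verify it against kubectl get pods -n airflow first:

```sh
# Resolve the webserver pod name (label is an assumption; adjust if needed)
WEB_POD=$(kubectl get pods -n airflow -l app=airflow-web -o jsonpath="{.items[0].metadata.name}")

# Unpause and trigger the sample DAG defined in k8s-sample.py
kubectl exec $WEB_POD -n airflow -- airflow unpause kubernetes_sample
kubectl exec $WEB_POD -n airflow -- airflow trigger_dag kubernetes_sample
```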

Drill into the job and view the progress. It's also fun to see the jobs spin up with the watch command:

kubectl get pods --watch -n airflow


