Wir werden Ihnen die neuen Funktionen von KubernetesExecutor 2.0 vorstellen. Spoiler Alarm !!! Der Prozess ist schneller, flexibler und verständlicher.
Zusammen mit Airflow 2.0 freuen wir uns, Ihnen einen komplett überarbeiteten KubernetesExecutor vorstellen zu können. Diese neue Architektur ist schneller, flexibler und verständlicher als KubernetesExecutor 1.10. Als ersten Schritt möchten wir Ihnen die neuen Funktionen von KubernetesExecutor 2.0 vorstellen!
Was ist KubernetesExecutor?
Im Jahr 2018 haben wir den KubernetesExecutor eingeführt, der auf den Ideen der automatischen Skalierung und Flexibilität basiert. Airflow hatte noch kein klares Konzept für die automatische Skalierung von Sellerie-Arbeitern (obwohl unsere jüngste Zusammenarbeit mit KEDA in dieser Hinsicht sehr erfolgreich war), daher wollten wir ein System schaffen, das die Bedürfnisse des Benutzers erfüllen kann. Als Ergebnis dieser Forschung wurde ein System erstellt, das die Kubernetes-API verwendet, um eine Pod-Aufgabe pro Luftstrom auszuführen. Ein wertvoller Nebeneffekt dieses Kubernetes API-basierten Systems besteht darin, dass Benutzer die Möglichkeit haben, für jede Aufgabe eindeutige Add-Ons und Einschränkungen hinzuzufügen.
Mithilfe der Kubernetes-API und von KubernetesExecutor können Airflow-Benutzer feststellen, dass bestimmte Aufgaben Zugriff auf bestimmte Geheimnisse haben oder dass eine Aufgabe nur auf einem Knoten ausgeführt werden kann, der in der Europäischen Union vorhanden ist (was für die Datenverwaltung nützlich sein kann). Benutzer können auch angeben, wie viele Ressourcen eine Aufgabe belegt. Dies kann je nach Ausführung der Aufgabe sehr unterschiedlich sein (zum Beispiel ist der Zugriff auf GPUs erforderlich, um ein TensorFlow-Skript auszuführen). Mit dieser API können Dateningenieure mit dem KubernetesExecutor viel genauer steuern, wie Airflow seine Aufgaben ausführt, als wenn sie nur vorhandene Sellerie-Warteschlangen verwenden würden.
, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !
KubernetesExecutor
podtemplate
Airflow 1.10.12 pod_template_file
. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .
pod_template_files
Airflow. pod_template_file
, , , CeleryExecutor .
pod pod_template_files
, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.
Execitor_config
Airflow 2.0 executor_config
, . , Python , API Kubernetes. executor_config
podOverride
. , .
, executeor_config
- Airflow 2.0, . , .
podmutationhook
1.10.12, pod_mutation_hook
Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.
KubernetesExecutor. , pod_template_file
pod, Kubernetes pod_override
pod_mutation_hook
pod. , .
, KubernetesExecutor.

, , , . Pod , . .
.

. pod, . V1pod, .
Airflow DevOps, .
, DAG, , executor_config
podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config
, Kubernetes API podOverride , , , , . . , .
, , , , Python pod, . executeor_config
podOverride , PythonOperator API TaskFlow. DAG :
from airflow.decorators import dag, task from datetime import datetime import os import json import requests from kubernetes.client import models as k8s new_config ={ "pod_override": k8s.V1Pod( metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", env=[ k8s.V1EnvVar(name="STATE", value="wa") ], ) ] ) ) } default_args = { 'start_date': datetime(2021, 1, 1) } @dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False) def taskflow(): @task(executor_config=new_config) def get_testing_increase(): """ Gets totalTestResultsIncrease field from Covid API for given state and returns value """ url = 'https://covidtracking.com/api/v1/states/' res = requests.get(url+'{0}/current.json'.format(os.environ['STATE'])) return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']} get_testing_increase() dag = taskflow()
new_config
, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.
KubernetesExecutor
KubernetesExecutor, . , — .
YAML. DAG, DAG git DAG Kubernetes Volume.
, airflow.cfg YAML . YAML .
Das Beste an diesen drei neuen Funktionen ist, dass sie alle in Airflow 1.10.13 verfügbar sind. Sie können den Migrationsprozess sofort starten und die Vorteile und die Beschleunigung dieses einfacheren Designs nutzen. Wir freuen uns auf Ihr Feedback und können uns bei Fragen, Funktionswünschen oder Dokumentationen gerne kontaktieren!