Apache Airflow ist der Standard für Workflow-Management Plattformen unter freier Lizenz und hält die Datentransformations- und Prozessketten vieler Unternehmen im Hintergrund zusammen. Die Entwicklung von Workflows erfordert zwar einerseits Programmierkenntnisse in Python, ist dadurch andererseits schnell zu erlernen und profitiert vom hohen Reifegrad der Werkzeuge und Prozesse, die in der Softwareentwicklung bereits seit Jahrzehnten praktiziert werden. Doch wo kann Ihr Team anfangen, wenn automatisierte Software Tests und Continous Integration/Continuous Deployment Prinzipien auf eine neue Airflow Systemumgebung angewendet werden soll? Wir haben einige Praxistipps gesammelt, die beim Einstieg oder dem Ausbau Ihrer Airflow Entwicklungsprozesse hilfreich sein können.
Versionskontrolle, Code Reviews, kurze Entwicklungszyklen, automatisiertes Testen. Diese Standardwerkzeuge werden in jedem Programmierkurs und in jeder Ausbildung vermittelt. Es sind Instrumente aus dem Software Engineering Handbuch, die eine hohe Qualität im entwickelten Softwareprodukt sicherstellen und zur ständigen Verbesserung beitragen sollen. Insbesondere frühes, automatisiertes Testen möglichst großer Teile des geschriebenen Quellcodes leisten hier einen großen Beitrag, robuste und hochqualitative Ergebnisse zu erzielen. Da Workflows bzw. DAGs (directed acyclic graphs) mit Apache Airflow ebenfalls in Code definiert werden, liegt der Anspruch nahe, auch diese möglichst umfangreich zu testen. Wird Airflow speziell für Data Pipelines genutzt, also Workflows, die den Datenfluss zwischen verschiedenen Systemen steuern, müssen diese besonders zuverlässig und nachweislich korrekt funktionieren.
Welche Möglichkeiten gibt es, um DAGs ausgiebig zu prüfen, bevor diese in der Produktivumgebung ausgeführt werden? Welche Ansätze bringen den schnellsten Mehrwert bei geringem Ressourceneinsatz? Wir beleuchten am Beispiel von GitLab CI/CD als Framework für die Testautomatisierung einige Ansätze.
Für unsere Beispiel nehmen wir an, dass Airflow nach DevOps Prinzipien betrieben wird und GitLab CI/CD als Automatisierungsframework für diese Prozesse des Systembetriebs eingesetzt wird. Einige unserer Empfehlungen hierfür haben wir bereits in einem vorherigen Artikel über Airflow CI/CD Pipelines in GitLab vorgestellt. Grundgedanke ist, sämtliche Parameter für den Systembetrieb in einem Versionskontrollsystem (GitLab) zu verwalten und bei relevanten Veränderungen die Betriebsumgebung automatisch zu aktualisieren (CI/CD).
Schema für die Entwicklung von Airflow DAGs mittels einer CI/CD Pipeline aus unserem Blog. Nur wenn alle Testblöcke erfolgreich durchlaufen werden,
kann ein DAG in die Produktivumgebung übernommen und ausgeführt werden.
Im Rahmen dieser Prozessautomatisierung sollen Tests integriert werden, die als Sollbruchstellen dienen und die Inbetriebnahme von fehlerhaften Zuständen unterbinden. Eine solche CI/CD Pipeline soll nicht ausschließlich die Betriebsparameter der Airflow-Plattform absichern, sondern auch die Geschäftslogik, welche in DAGs programmiert wird.
Ein DAG in Airflow definiert eine Abfolge von zunächst unabhängigen Aufgaben (Tasks), welche keine zyklischen Elemente enthalten kann. Jeder Task hat genau einen oder mehrere Nachfolger. Die Airflow Projektdokumentation gibt einige Hinweise, wie DAGs getestet werden können. Hinsichtlich der Geschäftslogik läuft es auf das Testen jedes einzelnen Tasks hinaus. Als Testframework für Python Quellcode wird üblicherweise die Bibliothek pytest genutzt.
pytest ermöglicht die Definition von Softwaretests, die einen Teil der Programmlogik ausführen und den Erfolg der Ausführung über frei wählbare Zieldimensionen (Assertions) prüfen. Wenn eine Funktion bei bekannten Eingabewerten ein immer gleiches Ergebnis liefern muss, kann dies anhand einer Assertion geprüft werden. Soll die Funktion bei ungültigen Eingabewerten eine bestimmte Fehlermeldung ausgeben, wird dies mit einer weiteren Assertion sichergestellt. Ein guter Softwaretest kombiniert in dieser Weise alle möglichen (zu diesem Zeitpunkt bekannten) Nutzungsweisen einer Programmkomponente und erlaubt damit jederzeit die Prüfung auf die gewünschte Funktionsweise.
Als Unittest wird eine Testmethode bezeichnet, die eine einzelne Komponente des Quellcodes ohne Abhängigkeiten zu Drittsystemen prüft. Ein Integrationstest prüft die Funktionsweise einer Softwarekomponente im Zusammenspiel mit eben diesem Drittsystem. Ein Systemtest ist schließlich die Prüfung der Funktion in einer Systemumgebung, die dem produktiven Betriebskontext annähernd identisch ist. Über pytest Marker können Testroutinen in verschiedene Gruppen eingeordnet und so gezielt im Rahmen einer passenden CI-Phase ausgeführt werden. Integrations- oder Systemtests laufen nur dann ab, wenn die nötigten Drittsysteme zur Verfügung stehen.
Codebeispiel: Airflow Connection als Mock simulieren.
pytest ermöglicht es, Interaktion mit Drittsystemen zu simulieren, indem ein Mock eine ausgewählte Methode ersetzt und einen beliebig definierbaren Rückgabewert liefert.
In diesem Beispiel erzeugen wir eine Airflow Connection (Zeile 159), ohne tatsächlich mit einem Airflow-System zu kommunizieren.
Der statisch definierte Rückgabewert wird einer ausgewählten Methode zugewiesen (Z. 168), welche im folgenden Programmablauf immer genau diesen Wert liefert.
Die Herausforderung bei Airflow Komponenten wie Operatoren und DAGs ist, dass diese sehr häufig auf die Interaktion mit Drittsystemen, mindestens aber mit einer Airflow Systemumgebung angewiesen sind. Unittests lassen sich daher fast nur sinnvoll für einzelne Operatoren definieren und auch hier primär für die Validierung von Eingabewerten bzw. deren korrekter Kombination. pytest unterstützt dabei mit eingebauten Hilfsmitteln, um Tests zu parametrieren sowie flüchtige Vorbedingungen für den Funktionstest zur Laufzeit zu schaffen. Die Interaktion mit Drittsystemen kann bei Bedarf mit sogenannten “Mocks” simuliert werden. Ein Mock ersetzt zur Laufzeit eine ausgewählte Methode und liefert einen beliebig definierbaren Rückgabewert. Auf diese Weise kann die Antwort einer REST API vorgetäuscht werden, ohne dass eine tatsächliche Kommunikation stattfindet. Obwohl Mocks ein sehr nützliches Werkzeug sind, wäre es enorm aufwändig, ganze Integrationstests mit Mocks zu simulieren.
Unittest Codebeispiel: Tests mit pytest parametrieren.
Über den “parametrize” Decorator kann ein Test mit einer Sammlung von verschiedenen Eingabeparametern zur Laufzeit mehrfach ausgeführt werden.
In diesem Beispiel werden die Parameter “operation”, “source_filepath”, usw. (Zeile 214) in verschiedenen Kombinationen in einer Variablen definiert (“parameter_combinations_failing”, Z. 215; Zuweisung der Werte nicht im Bild). Der eigentlich Test (Z. 217) wird dann für jede gegebene Kombination der Parameter einzeln ausgeführt:
In diesem Beispiel muss jede der angegebenen Kombinationen zu einem ebenfalls vordefinierten Fehlverhalten führen (Z. 225).
Um einen deutlichen Mehrwert im Betriebsalltag von Apache Airflow zu erreichen, werden Integrationstests benötigt, sei es für eigens entwickelte Plugins oder DAGs. Ein praktisches Hilfsmittel, um schon im Entwicklungsprozess gegen Drittsysteme testen zu können, sind die sogenannten “services” von CI/CD Frameworks. Diese erlauben die Definition von Systemen auf Basis von Docker Images, welche ausschließlich zur Laufzeit der CI/CD Pipeline gestartet und im Anschluss wieder entfernt werden. Eine vollständige Reproduktion aller Drittsysteme im Kontext der Produktivumgebung ist somit nicht dauerhaft notwendig. Services auf Basis von Docker Images lassen sich mit vielen CI/CD Frameworks definieren - wir erläutern hier die Funktionsweise am Beispiel von GitLab CI/CD.
Ein Service wird im Pipeline Manifest des Projekts definiert und vor der Ausführung eines CI Jobs im Rahmen einer Pipeline als Docker Container erstellt und gestartet. Die Konfiguration des Service erfolgt ausschließlich über die Angabe des benötigten Docker Image, des Startskripts (Entrypoint) sowie der Umgebungsvariablen. Mit der nötigen Expertise aus der Welt der Containervirtualisierung lassen sich auf diese Weise beliebige Drittsysteme simulieren, sei es eine Postgres Datenbank, ein SMB Fileserver oder eine spezifische REST API.
Sind die benötigten Drittsysteme auf diese Weise für die Testumgebung bereitgestellt, können die zugehörigen pytest-Module über Umgebungsvariablen passend konfiguriert und mittels der gesetzten Integrationstest-Marker gestartet werden. Unser Codebeispiel gibt einen Eindruck, wie dies aussehen kann.
Codebeispiel: Drittsystem als CI-Service starten und Integrationstests mit pytest ausführen.
Zunächst werden für die Testmodule notwendigen Umgebungsvariablen gesetzt (Zeilen 96-101) und ein SFTP Server als Service zur Laufzeit des CI-Jobs erzeugt
und gestartet (Zeilen 102-107). Mit pytest werden anschließend gezielt diejenigen Testmodule ausgeführt, welche den Marker “integration” zugewiesen bekommen haben (Z. 108).
Die Ergebnisse der Testläufe werden als JUnit Report an die GitLab Plattform zurückgespielt und können dadurch im User Interface komfortabel eingesehen werden (Zeilen 111-114).
Sollen nicht nur einzelne Operatoren oder Tasks geprüft werden, sondern ganze DAGs, führt realistisch kein Weg an einer vollwertigen Testumgebung vorbei. Im Einzelfall können auch Services zur Laufzeit für diese Zwecke nützlich sein, erfordern dann aber wiederum hohen Aufwand in der Konfiguration und Vorbereitung von benötigten Systemzuständen (z. B. für den Import von Testdaten in Datenbanksysteme).
Ist die nötige Testumgebung mit Drittsystemen und sämtlichen benötigten Connections in Airflow einmal vorbereitet, lassen sich vollständige DAGs über das Command Line Interface (CLI) von Airflow einfach testen. Das Kommando airflow dags test benötigt als Parameter lediglich die DAG-ID sowie das gewünschte Ausführungsdatum, wie im folgenden Codebeispiel dargestellt.
Codebeispiel: Systemtest für einen DAG ausführen.
In diesem Szenario soll ein DAG getestet werden, der auf eine Connection zu einem SAP Business Warehouse Testsystem angewiesen ist.
Die Airflow-Instanz wird zur Laufzeit des CI-Jobs gestartet (Zeile 54), sodass die Connection per CLI Befehl erzeugt werden muss (Z. 58).
Anschließend lässt sich der DAG gezielt als Testlauf ausführen (Z. 65) und mittels einfacher Linux-Kommandos auf Erfolg prüfen (Zeilen 65-74).
Sämtliche Connections für vollständige Systemtests direkt in der Airflow CI/CD Pipeline zu definieren und den Test für alle DAGs aufzurufen, kann schnell zu sehr unübersichtlichen Manifest-Dateien führen. Für komplexere Szenarien bietet es sich daher an, Wrapper für die Systemtests mit pytest zu definieren und beispielsweise das Airflow CLI aus Python heraus anzusteuern. Dies ermöglicht nicht nur komplexe Auswahl- und Steuerungslogiken für die Systemtests selbst, sondern auch eine tiefergehende Prüfung des Erfolgs über pytest Assertions. Mit einem Framework für Datenvalidierung wie Great Expectations kann dem Testprozess eine zusätzliche Dimension der Qualitätskontrolle hinzugefügt werden. Dem Detailgrad und der Kreativität beim Design von Teststrecken sind zumindest technisch keine Grenzen gesetzt.
Apache Airflow und die darin genutzten Komponenten mit einer automatisierten Testpipeline im Entwicklungsprozess abzusichern, ist ein hochgradig sinnvoller Ansatz für langfristige Stabilität. Die Möglichkeiten zur Umsetzung sind dabei gleichermaßen mächtig und flexibel, aber auch von hoher Komplexität und mit einer steilen Lernkurve für Entwickler und das Team verbunden.
Wir haben einige Schlaglichter auf die Möglichkeiten geworfen, Airflow mit pytest und GitLab CI/CD im Entwicklungsprozess zu testen. Finden Sie die Anforderungen Ihres Teams in diesem Artikel wieder oder würden Sie Ihre bestehenden Testprozess gerne erweitern? Wir freuen uns, Sie bei der Optimierung Ihrer Systeme und Prozesse rund um Apache Airflow zu unterstützen!