Apache Airflow is the de facto standard among freely licensed workflow management platforms and the glue that holds the data transformation and process chains of many companies together. Workflow development in Airflow requires some Python programming skills, but this also makes it quick to learn and lets teams benefit from mature tools and processes that have been practiced in software development for decades. But where can your team start when applying automated software testing and Continuous Integration/Continuous Deployment (CI/CD) principles to a new Airflow system environment? We have collected some practical tips that can be helpful when starting or expanding your Airflow development processes.
More robust workflows through automated testing
Version control, code reviews, short development cycles, automated testing: these standard tools are taught in every programming course and training. They come from the software engineering handbook and are intended to ensure high quality in the developed software product and to contribute to continuous improvement. Early automated testing of the source code in particular contributes a great deal to robust, high-quality results. Since workflows, or DAGs (directed acyclic graphs), are also defined in code in Apache Airflow, they should be tested as extensively as possible too. If Airflow is used specifically for data pipelines, i.e. workflows that control the data flow between different systems, these must work particularly reliably and verifiably correctly.
What options are there for extensively testing DAGs before they are executed in the production environment? Which approaches bring the fastest added value with a low use of resources? We will use the example of GitLab CI/CD as a framework for test automation to highlight some approaches.
Airflow, pytest and GitLab CI/CD
For our example, we assume that Airflow is operated according to DevOps principles and GitLab CI/CD is used as an automation framework for these system operation processes. We have already presented some of our recommendations for this in a previous article on Airflow CI/CD pipelines in GitLab. The basic idea is to manage all parameters for system operation in a version control system (GitLab) and to automatically update the operating environment when relevant changes occur (CI/CD).
Scheme for the development of Airflow DAGs using a CI/CD pipeline from our blog.
Only when all test stages have been successfully completed can a DAG be transferred to the production environment.
Within this process automation, tests are integrated that act as deliberate breaking points and prevent faulty states from being deployed. Such a CI/CD pipeline should safeguard not only the operating parameters of the Airflow platform, but also the business logic embedded in DAGs.
A DAG in Airflow defines a set of initially independent tasks and the dependencies between them, which must not form cycles. Each task can have any number of successors. The Airflow project documentation gives some hints on how DAGs can be tested. In terms of business logic, it boils down to testing each task individually. The pytest library is usually used as a testing framework for Python source code.
pytest allows the definition of software tests that execute a part of the program logic and check the outcome against freely definable conditions (assertions). If a function must always return the same result for known input values, this can be checked using an assertion. If the function is supposed to raise a specific error for invalid input values, this is ensured with another assertion. In this way, a good software test covers all modes of use of a program component that are known at the time and ensures that it always operates within acceptable parameters.
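As a minimal sketch of these two kinds of assertion, the following test module checks a known result and an expected error. The helper `normalize_table_name` is a hypothetical function invented for illustration and is not part of Airflow.

```python
import pytest


def normalize_table_name(name: str) -> str:
    """Hypothetical helper: turn a raw name into a lower-case table name."""
    cleaned = name.strip().lower().replace(" ", "_")
    if not cleaned:
        raise ValueError("table name must not be empty")
    return cleaned


def test_known_input_gives_known_output():
    # The same input must always produce the same result.
    assert normalize_table_name("  Sales Orders ") == "sales_orders"


def test_invalid_input_raises_error():
    # Invalid input must be rejected with a specific error.
    with pytest.raises(ValueError, match="must not be empty"):
        normalize_table_name("   ")
```

Running `pytest` in the directory that contains this module collects and executes both test functions.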
The right test at the right time: unit, integration, system tests
A unit test is a test method that examines a single component of the source code without dependencies on third-party systems. An integration test checks the functionality of a software component in interaction with third-party systems. Finally, a system test examines the functionality in a system environment that is as close as possible to the production environment. Using pytest markers, test routines can be assigned to different groups and thus executed selectively in a suitable CI phase. Integration or system tests only run when the necessary third-party systems are available.
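A short sketch of how such groups could be expressed with pytest markers follows. The marker names `unit`, `integration` and `system` as well as the helper `build_remote_path` are assumptions chosen for this illustration. The two placeholder tests only skip themselves, because a real body would need running third-party systems.

```python
import pytest


def build_remote_path(directory: str, filename: str) -> str:
    """Hypothetical helper under test: join a remote directory and a file name."""
    return f"{directory.rstrip('/')}/{filename}"


@pytest.mark.unit
def test_build_remote_path():
    # No third-party system is needed, so this can run in every CI phase.
    assert build_remote_path("/upload/", "report.csv") == "/upload/report.csv"


@pytest.mark.integration
def test_upload_to_sftp_server():
    # Placeholder: a real test would talk to a running SFTP service.
    pytest.skip("requires an SFTP service")


@pytest.mark.system
def test_full_dag_run():
    # Placeholder: a real test would need a production-like environment.
    pytest.skip("requires a complete test environment")
```

A CI phase can then select one group with `pytest -m unit` or `pytest -m integration`. Custom markers should be registered under the `markers` option of the pytest configuration so that pytest does not warn about unknown marks.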
Code example: Simulate Airflow Connection as a mock.
In pytest tests, interaction with third-party systems can be simulated by using a mock (for example from Python's unittest.mock module or the pytest-mock plugin) that replaces a selected method and returns an arbitrary value.
In this example, we create an Airflow Connection (line 159) without actually communicating with an Airflow system.
The statically defined return value is assigned to a selected method (line 168), which always returns exactly this value in the following program flow.
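The original screenshot is not reproduced here, so the following is only a sketch of the same idea using `unittest.mock` from the standard library. To stay runnable without an Airflow installation, `Connection` and `BaseHook` are simplified stand-ins written for this example. In a real project the test would instead patch `get_connection` on Airflow's own `BaseHook`. The function `build_sftp_url` and all connection values are hypothetical.

```python
from unittest import mock


class Connection:
    """Stand-in for Airflow's Connection model (assumption for this sketch)."""

    def __init__(self, conn_id: str, host: str, login: str):
        self.conn_id = conn_id
        self.host = host
        self.login = login


class BaseHook:
    """Stand-in for Airflow's BaseHook (assumption for this sketch)."""

    @classmethod
    def get_connection(cls, conn_id: str) -> Connection:
        # The real method would look the connection up in an Airflow system.
        raise RuntimeError("no Airflow system available")


def build_sftp_url(conn_id: str) -> str:
    """Hypothetical code under test that needs an Airflow connection."""
    conn = BaseHook.get_connection(conn_id)
    return f"sftp://{conn.login}@{conn.host}"


def test_build_sftp_url_with_mocked_connection():
    # Create the connection object locally, without any Airflow system.
    fake = Connection(conn_id="sftp_test", host="sftp.example.com", login="airflow")
    # Replace the lookup method; it now always returns the static value.
    with mock.patch.object(BaseHook, "get_connection", return_value=fake) as patched:
        assert build_sftp_url("sftp_test") == "sftp://airflow@sftp.example.com"
    patched.assert_called_once_with("sftp_test")
```

The patch is only active inside the `with` block, so other tests still see the original method.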
The challenge with Airflow components such as operators and DAGs is that they very often rely on interaction with third-party systems, or at least with an Airflow system environment. Therefore, unit tests can almost only be meaningfully defined for individual operators, and even here primarily for the validation of input values or their correct combination. pytest supports this with built-in tools for parameterizing tests and creating deterministic preconditions for the functional test at runtime. Interaction with third-party systems can be simulated with so-called "mocks" if required. A mock replaces a selected method at runtime and returns an arbitrarily definable return value. In this way, the response of a REST API can be mocked without any actual communication taking place, for example. Although mocks are a very useful tool, it would be enormously time-consuming to simulate entire integration tests with mocks.
Unit test code example: parameterize tests with pytest.
Using the "parametrize" decorator, a test with a collection of different input parameters can be executed multiple times at runtime.
In this example, the parameters "operation", "source_filepath", etc. (line 214) are defined in different combinations in a variable ("parameter_combinations_failing", line 215; assignment of values not shown in the image).
The actual test (line 217) is then executed separately for each given combination of the parameters. In this example, each of the given combinations must lead to a likewise predefined failure (line 225).
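Since the image is not available in text form, here is a hedged sketch of what such a parametrized test could look like. Only the names "operation", "source_filepath" and "parameter_combinations_failing" come from the description above. The third parameter `target_filepath`, the function `validate_file_operation`, the allowed operations and all values are assumptions made for illustration.

```python
import pytest

ALLOWED_OPERATIONS = {"copy", "move", "delete"}  # assumed set of operations


def validate_file_operation(operation, source_filepath, target_filepath=None):
    """Hypothetical input validation, as an operator might perform it."""
    if operation not in ALLOWED_OPERATIONS:
        raise ValueError(f"unsupported operation: {operation!r}")
    if not source_filepath:
        raise ValueError("source_filepath is required")
    if operation in {"copy", "move"} and not target_filepath:
        raise ValueError(f"{operation} requires a target_filepath")


# Each tuple is one invalid combination that must be rejected.
parameter_combinations_failing = [
    ("shred", "/in/a.csv", "/out/a.csv"),  # unknown operation
    ("copy", "", "/out/a.csv"),            # missing source
    ("move", "/in/a.csv", None),           # missing target
]


@pytest.mark.parametrize(
    "operation, source_filepath, target_filepath",
    parameter_combinations_failing,
)
def test_invalid_combinations_fail(operation, source_filepath, target_filepath):
    # The test runs once per tuple; every run must end in the expected error.
    with pytest.raises(ValueError):
        validate_file_operation(operation, source_filepath, target_filepath)
```

pytest reports each combination as a separate test case, so a single failing combination is easy to identify in the test report.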
Testing integration with pytest and GitLab services
To add significant value to the everyday operation of Apache Airflow, integration tests are required, whether for specially developed plugins or for DAGs. A practical tool for testing against third-party systems during development is the "services" feature of CI/CD frameworks. Services allow systems to be defined based on Docker images; these are started only for the runtime of the CI/CD pipeline and removed again afterwards. A permanent, complete replica of all third-party systems from the production environment is therefore not necessary. Services based on Docker images can be defined with many CI/CD frameworks. We explain how they work using GitLab CI/CD as an example.
A service is defined in the project's pipeline manifest and is created and started as a Docker container in the context of a pipeline before a CI job is executed. The service is configured exclusively by specifying the required Docker image, the start script (entrypoint) and the environment variables. With the necessary expertise in container virtualization, any third-party systems can be simulated in this way, be it a Postgres database, an SMB file server or a specific REST API.
Once the required third-party systems have been made available for the test environment in this way, the associated pytest modules can be configured appropriately via environment variables and started using the integration test markers. Our code example gives an impression of how this can look.
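On the pytest side, such a configuration might look like the sketch below. The environment variable names `TEST_SFTP_HOST` and `TEST_SFTP_PORT`, the `SftpSettings` class and the marker name `integration` are assumptions for this example. The test only checks that the service accepts TCP connections and does not perform a real SFTP login.

```python
import os
import socket
from dataclasses import dataclass

import pytest


@dataclass(frozen=True)
class SftpSettings:
    host: str
    port: int


def load_sftp_settings(environ) -> SftpSettings:
    """Read the service address from environment variables (assumed names)."""
    return SftpSettings(
        host=environ["TEST_SFTP_HOST"],
        port=int(environ.get("TEST_SFTP_PORT", "22")),
    )


@pytest.mark.integration
def test_sftp_service_accepts_connections():
    if "TEST_SFTP_HOST" not in os.environ:
        pytest.skip("TEST_SFTP_HOST is not set, no SFTP service available")
    settings = load_sftp_settings(os.environ)
    # Opening a TCP connection shows that the CI service is reachable.
    with socket.create_connection((settings.host, settings.port), timeout=5):
        pass
```

In the CI job, a call such as `pytest -m integration --junitxml=report.xml` would run only the marked tests and write a JUnit report that the CI platform can pick up.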
Code example: Start a third-party system as CI service and execute integration tests.
First, the environment variables required for the test modules are set (lines 96-101) and an SFTP server is created and started as a service at the runtime of the CI job (lines 102-107). Subsequently, pytest is used to selectively execute those test modules that have been assigned the marker "integration" (line 108).
The results of the test runs are fed back to the GitLab platform as a JUnit report and can thus be conveniently viewed in the user interface (lines 111-114).
Test DAGs and validate data
If not only individual operators or tasks are to be tested, but entire DAGs, there is realistically no way around a full-fledged test environment. In individual cases, services at runtime can also be useful for these purposes, but require a great deal of effort in the configuration and preparation of required system states (e.g., for the import of test data into database systems).
Once the necessary test environment with third-party systems and all required connections in Airflow have been prepared, complete DAGs can be easily tested via the Command Line Interface (CLI) of Airflow. The airflow dags test command only requires the DAG ID and the desired execution date as parameters, as shown in the following code example.
Code example: Execute system test for a DAG.
In this scenario, a DAG is to be tested that depends on a connection to an SAP Business Warehouse test system.
The Airflow instance is started at runtime of the CI job (line 54), so the connection has to be created via CLI command (line 58).
Afterwards, the DAG can be executed specifically as a test run (line 65) and checked for success using simple Linux commands (lines 65-74).
Defining all connections for complete system tests directly in the Airflow CI/CD pipeline and running the test for all DAGs can quickly lead to overly complex manifest files. For more elaborate scenarios, it is therefore a good idea to define wrappers for the system tests with pytest. This not only enables complex selection and control logic for the system tests themselves, but also a more in-depth check of success via pytest assertions. With a data validation framework such as Great Expectations, an additional dimension of quality control can be added to the testing process. From a technical perspective, there are no limits to the level of detail and creativity in the design of software tests for Airflow components.
CI/CD Pipelines - Our Conclusion
Securing Apache Airflow and the components used in it with an automated test pipeline in the development process is a highly recommended approach for long-term stability. The options for implementation are as powerful and flexible as they can be complex, and they come with a steep learning curve for developers and the team.
We have highlighted some of the ways to test Airflow with pytest and GitLab CI/CD in the development process. Do you find your team's needs reflected in this article or would you like to extend your existing testing process? We look forward to helping you optimize your systems and processes around Apache Airflow!