
Data-Aware Scheduling with Apache Airflow Datasets

It is well established by now that Airflow is the state-of-the-art tool for scheduling and monitoring workflows in a unified framework. Airflow enables us to define complex dependencies among the tasks inside a DAG (directed acyclic graph), but until recently DAGs themselves could only be executed in a time-based manner (much like cron). Data-aware scheduling was newly introduced with Airflow 2.4 and enables developers to define a producer-consumer relation between DAGs: a DAG can now be triggered whenever a task that operates on a dataset updates that dataset, independently of any predefined time schedule. Imagine, for example, that one task imports currency exchange rates into a database table and a second DAG that transforms those rates should be triggered immediately afterwards. We will go through this scenario in greater detail and provide some actual code examples.

Benefits of data-aware scheduling

Scheduling a workflow in a time-based manner works well if you know beforehand when each process should run, but it becomes problematic once dependencies between DAGs need to be expressed. Consider the following scenario: data engineering team A runs a preprocessing pipeline that creates or updates a dataset periodically (it could be a plain text file, a database entry, or any other kind of resource). Data analysis team B runs a pipeline that reads the dataset and performs some analysis and calculations. Team B would like to be notified of the availability of the new dataset as soon as possible in order to minimize the downtime between the two pipeline runs. This can be achieved with a data-aware scheduling mechanism: the consumer is notified the moment the producer publishes a change, and its pipeline execution starts immediately. The result is faster and more robust cross-pipeline communication, and stakeholders who need just-in-time data in their reports and dashboards always get the latest data as soon as possible. A minimal sketch of this pattern follows; the full, working example is developed further below.
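As a first orientation, here is a minimal sketch of that producer-consumer pattern (all DAG names and the dataset URI are illustrative placeholders, not part of the example developed later): team A's task declares the shared resource as an outlet, and team B's DAG lists the same Dataset in its schedule.

import pendulum
from airflow import Dataset
from airflow.decorators import dag, task

# Placeholder URI for the resource shared between team A and team B
shared_dataset = Dataset("postgres://example-host/analytics/team_a_results")

@dag(
    schedule_interval="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
)
def team_a_preprocessing():
    @task(outlets=[shared_dataset])
    def create_or_update_dataset():
        ...  # write the preprocessed data to the shared resource

    create_or_update_dataset()

@dag(
    schedule=[shared_dataset],
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
)
def team_b_analysis():
    @task
    def analyze():
        ...  # read the shared resource and run the analysis

    analyze()

producer = team_a_preprocessing()
consumer = team_b_analysis()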

Screenshot: The “update_eur_exchange_rate_database_table” DAG is scheduled based on a dataset update.

What is an Airflow dataset?

An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream “producer” tasks, and dataset updates contribute to scheduling downstream “consumer” DAGs. In other words, a consumer DAG is executed if and only if a dataset it depends on receives an update. Airflow makes no assumptions about the content or location of the dataset, so it can represent a simple text file saved locally, a database table, an Azure storage blob, and so on. The dataset is identified by a URI (Uniform Resource Identifier). Two Dataset objects that share the same URI are considered to refer to the same resource. If a DAG depends on multiple datasets, its execution starts once every dataset has received at least one update. Note that a dataset is considered updated as soon as the task responsible for the update finishes successfully. Airflow has no way of knowing whether the underlying data actually changed, so the developer is responsible for making sure that the update is complete. In other words, no polling takes place to verify that the resource received an update.
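Two of these properties are easy to demonstrate in a short, self-contained sketch: Dataset identity is determined solely by the URI, and a DAG can be scheduled on several datasets at once. The URIs and the DAG name below are made-up placeholders.

import pendulum
from airflow import Dataset
from airflow.decorators import dag

# Two Dataset objects with the same URI refer to the same logical resource
a = Dataset("postgres://example-host/demo/eur_exchange_rates")
b = Dataset("postgres://example-host/demo/eur_exchange_rates")
assert a.uri == b.uri  # identity is defined by the URI alone

# A DAG that depends on several datasets only runs once every one of them
# has received at least one update since the DAG's previous run.
exchange_rates = Dataset("postgres://example-host/demo/eur_exchange_rates")
sales_export = Dataset("s3://example-bucket/exports/sales.csv")

@dag(
    schedule=[exchange_rates, sales_export],
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
)
def combined_reporting():
    ...  # tasks combining both sources would go here

dag = combined_reporting()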

Practical examples

Screenshot: The next run of the “update_eur_exchange_rate_database_table” DAG is determined by updates on a specific resource.

To showcase the new Datasets feature, a simple example is presented. The “import_ecb_eur_exchange_rate_into_postgres” DAG is responsible for fetching the latest exchange rates for the Euro and inserting them into a Postgres database, while the “update_eur_exchange_rate_database_table” DAG is scheduled to run whenever an update is available.

First, the “import_ecb_eur_exchange_rate_into_postgres” or producer DAG is defined:

import io

import pandas as pd
import pendulum
import requests
import sqlalchemy

from airflow import Dataset
from airflow.decorators import task, dag
from airflow.hooks.base import BaseHook

# Reuse the Airflow connection "pg_default" to build a SQLAlchemy engine
db_conn = BaseHook.get_connection("pg_default")
engine = sqlalchemy.create_engine(
    f"postgresql+psycopg2://{db_conn.login}:{db_conn.password}@{db_conn.host}:{db_conn.port}/{db_conn.schema}")


@dag(
    schedule_interval="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
)
def import_ecb_eur_exchange_rate_into_postgres():

    @task
    def get_ecb_exchange_rate_data(**kwargs) -> str:
        # Fetch the daily EUR reference rates for the logical date from the ECB web service as CSV
        base_url = "https://sdw-wsrest.ecb.europa.eu/service/data"
        flow_ref = "EXR"
        key = "D..EUR.SP00.A"
        logical_date = kwargs["ds"]
        parameters = {
            "startPeriod": logical_date,
            "endPeriod": logical_date,
            "format": "csvdata",
        }
        request_url = f"{base_url}/{flow_ref}/{key}"
        response = requests.get(request_url, params=parameters)
        return pd.read_csv(io.StringIO(response.text)).to_json()

    # Declaring the Dataset as an "outlet" marks it as updated whenever this task succeeds
    @task(outlets=[Dataset("postgres://dse.nextlytics.com/demo/eur_exchange_rates")])
    def import_exchange_rate_data_into_postgres(json_data: str, table: str):
        loadtime = pendulum.now().isoformat()
        df_source = pd.read_json(json_data)
        df_db = pd.DataFrame().assign(
            exchange_rate_date=df_source["TIME_PERIOD"],
            currency=df_source["CURRENCY"],
            currency_name=df_source["TITLE"].str.replace("/Euro", ""),
            rate=df_source["OBS_VALUE"],
            _loadtime=loadtime)
        with engine.connect() as con:
            df_db.to_sql(name=table, con=con, if_exists="append", index=False)

    json_data = get_ecb_exchange_rate_data()
    import_exchange_rate_data_into_postgres(json_data, table="eur_exchange_rates")


dag = import_ecb_eur_exchange_rate_into_postgres()

The “import_exchange_rate_data_into_postgres” task defines a list of Dataset objects as “outlets”, i.e. datasets that are produced or updated by this task. In this case, the task inserts new records into a database table.
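The outlets parameter is not limited to TaskFlow-decorated tasks; it is defined on the common operator base class, so classic operators accept it as well. As a rough sketch (the URI, script path and DAG name are placeholders), a BashOperator-based export job could announce a dataset update like this:

import pendulum
from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator

daily_export = Dataset("s3://example-bucket/exports/daily.csv")  # placeholder URI

with DAG(
    dag_id="export_daily_csv_producer",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
):
    BashOperator(
        task_id="export_daily_csv",
        bash_command="python /opt/scripts/export_daily.py",  # placeholder script
        outlets=[daily_export],  # the dataset counts as updated once the command succeeds
    )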



The consumer DAG, “update_eur_exchange_rate_database_table”, has the following definition; its job is to write only the latest exchange rate per currency into a separate table as soon as a new batch of data is available:

import pendulum
import sqlalchemy

from airflow import Dataset
from airflow.decorators import task, dag
from airflow.hooks.base import BaseHook

db_conn = BaseHook.get_connection("pg_default")
engine = sqlalchemy.create_engine(
    f"postgresql+psycopg2://{db_conn.login}:{db_conn.password}@{db_conn.host}:{db_conn.port}/{db_conn.schema}")


@dag(
    schedule=[Dataset("postgres://dse.nextlytics.com/demo/eur_exchange_rates")],
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
)
def update_eur_exchange_rate_database_table():

    @task
    def update_table(**kwargs):
        with engine.begin() as con:
            # Keep the previous report table as a backup while the new one is built
            backup_table = "ALTER TABLE IF EXISTS report_current_eur_exchange_rates RENAME TO report_current_eur_exchange_rates_bak"
            con.execute(backup_table)

            # Build a fresh report table containing only the latest rate per currency
            update_table = """
                select
                    distinct eer.currency,
                    eer.currency_name,
                    (select rate from eur_exchange_rates eer2 where eer2.currency = eer.currency
                        order by eer2.exchange_rate_date desc limit 1) as rate,
                    now() as update_timestamp
                    into report_current_eur_exchange_rates
                    from eur_exchange_rates eer order by currency asc;"""
            con.execute(update_table)

            # Drop the backup once the new table has been created successfully
            remove_backup = "drop table if exists report_current_eur_exchange_rates_bak"
            con.execute(remove_backup)

    update_table()


dag = update_eur_exchange_rate_database_table()

The consumer DAG is scheduled based on a Dataset that, in this case, shares the URI defined by our exemplary producer. Notice that the Dataset objects used to define the dependency are distinct Python objects, but they refer to the same resource. The DAG is triggered the moment the “import_exchange_rate_data_into_postgres” task of the producer succeeds, resulting in zero downtime between the two DAG executions.

Screenshot: Visual representation of the lineage between the DAGs. Notice the dataset resource in the middle.

This new feature allows for more flexibility when scheduling interrelated DAGs, but there are some pitfalls to avoid. It is possible to declare a dataset as the outlet of a task that never actually interacts with the underlying resource. Airflow allows this because it is at the developer's discretion to decide what constitutes an update: a dataset is deemed updated whenever the task assigned to update it finishes successfully, regardless of what the task actually does. The Airflow Dataset API thereby preserves the design choice of separating workflow logic from the processed data, while at the same time enabling tighter coupling between pipelines with less custom code.
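The following minimal sketch (the task name is made up, the URI is reused from the example above) illustrates the pitfall: the task declares the dataset as an outlet but never touches the underlying table, yet any consumer DAGs are triggered all the same once it succeeds.

from airflow import Dataset
from airflow.decorators import task

@task(outlets=[Dataset("postgres://dse.nextlytics.com/demo/eur_exchange_rates")])
def misleading_update():
    # No interaction with the resource behind the URI takes place here.
    # Airflow nevertheless records a dataset update as soon as this task
    # finishes successfully, and all consumer DAGs are triggered.
    print("Done - but nothing was written to the table!")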

Years of discussion and development have finally resulted in the Airflow Dataset concept described above. The current implementation is a very high-level wrapper around the actual data artifacts and provides little functionality beyond cross-DAG scheduling. Notably, there is no REST API endpoint yet to mark a dataset as externally updated, which would open up interesting cross-system integration scenarios. Still, just-in-time scheduling of DAGs is a highly valuable addition to the ever-growing Apache Airflow toolset and gives data engineers yet another way to meet business requirements.

Do you have further questions about Apache Airflow? You are welcome to schedule an initial discussion with our NextLytics Data Science and Engineering team. We’re curious to learn more about your use cases and support you in finding solutions that fit your specific needs and system context.

Learn more about Apache Airflow


Apostolos

Apostolos has been a Data Engineering Consultant for NextLytics AG since 2022. He has experience in research projects on deep learning methodologies and their applications in fintech, as well as a background in backend development. In his spare time he enjoys playing the guitar and staying up to date with the latest news on technology and economics.

