Advanced Apache Airflow: Mastering Data Sources, Scheduling & Workflow Triggers (Part 2)

July 17, 2023

Zeeshan Ahmad

Technical Architect

Advanced Apache Airflow - Mastering Data Sources, Scheduling & Workflow Triggers

 

Part 2 is about more advanced topics related to Apache Airflow, the powerful tool that is empowering Data Engineers and Scientists for Efficient Workflow Management. If you’re just joining us and haven’t read Part 1 yet, we recommend checking it out [ Apache Airflow: Efficient Workflow Management for Data Engineers & Scientists (PART 1) ] for a comprehensive understanding. 

In the previous part, we started our exploration into the world of Airflow, navigating its initial setup, basic concepts, and the process of defining and running simple workflows. And in Part 2, you will get details related to more advanced topics, including-  

  • How to manage data sources and destinations in Airflow 
  • Unraveling the secrets of scheduling and triggering workflows, and much more. 

So, let’s get started and here explore various tips and best practices for operating Airflow to its fullest potential aimed to help you become a proficient user of this powerful data orchestration tool.  

Managing Data Sources And Destinations

Prepare to embark on a thrilling expedition through the labyrinth of data sources and destinations in the realm of Apache Airflow. In this extraordinary adventure, we shall unravel the secrets of working with connections and hooks, seamlessly integrate with diverse data sources, and conquer the art of writing data to a myriad of destinations. Brace yourselves, for a world of data awaits!

Working With Connections And Hooks In Airflow

In the mystical land of Airflow, connections and hooks form the gateway to a realm of boundless possibilities. Connections allow us to establish communication channels with external systems, while hooks act as bridges, facilitating interaction with those systems. 

Windows wizards, behold these commands in your command prompt as they guide you through the journey:

# Activate the virtual environment (if applicable) 

.\airflow-env\Scripts\activate 

# Navigate to the directory where your DAGs reside 

cd airflow\dags

And for the Linux sorcerers, embrace the power of these commands in your terminal: 

# Activate the virtual environment (if applicable) 

source airflow-env/bin/activate 

# Navigate to the directory where your DAGs reside 

cd airflow/dags 

With the Airflow CLI as your wand, let us explore an example that showcases the mystical powers of connections and hooks: 

from airflow import DAG 

from airflow.providers.postgres.hooks.postgres import PostgresHook 

from airflow.providers.postgres.operators.postgres import PostgresOperator 

from datetime import datetime 

 

# Create a new DAG 

dag = DAG( 

    "data_migration", 

    description="A daring data migration adventure", 

    start_date=datetime(2023, 1, 1), 

    schedule_interval=None, 

) 

# Establish a connection to the source database 

source_hook = PostgresHook(postgres_conn_id="source_db") 

 

# Establish a connection to the destination database 

destination_hook = PostgresHook(postgres_conn_id="destination_db") 

# Define the tasks for data migration 

extract_data = PostgresOperator( 

    task_id="extract_data", 

    sql="SELECT * FROM source_table", 

    postgres_conn_id="source_db", 

    dag=dag, 

) 

transform_data = PythonOperator( 

    task_id="transform_data", 

    python_callable=transform_function, 

    dag=dag, 

) 

load_data = PostgresOperator( 

    task_id="load_data", 

    sql="INSERT INTO destination_table SELECT * FROM source_table", 

    postgres_conn_id="destination_db", 

    dag=dag, 

) 

# Set the task dependencies 

extract_data >> transform_data >> load_data 

Integrating with different data sources: Databases, APIs, Cloud Services, etc.

Within the realm of data integration, Airflow empowers us to seamlessly connect with a multitude of data sources. Whether it be traditional databases, APIs, or cloud services, the power of integration lies at our fingertips. 

Let us embark on a grand quest where we integrate diverse data sources to weave a tapestry of knowledge. Behold the code snippets that shall guide us: 

1. Connecting to a PostgreSQL database: 

# Windows 

airflow connections add --conn-type postgres --conn-login myuser --conn-password mypassword --conn-host localhost --conn-port 5432 --conn-schema public source_db 

# Linux 

airflow connections add --conn-type postgres --conn-login myuser --conn-password mypassword --conn-host localhost --conn-port 5432 --conn-schema public source_db 

2. Connecting to an API: 

# Windows 

airflow connections add --conn-type http --conn-host api.twitter.com --conn-login my_api_key --conn-password my_api_secret --conn-extra '{"headers": {"Content-Type": "application/json"}}' my_api_connection 

# Linux 

airflow connections add --conn-type http --conn-host api.twitter.com --conn-login my_api_key --conn-password my_api_secret --conn-extra '{"headers": {"Content-Type": "application/json"}}' my_api_connection

Writing Data To Various Destinations: Databases, Data Lakes, Cloud Storage, etc.

In the magical realm of Airflow, we possess the power to write data to a multitude of destinations, ranging from traditional databases to modern data lakes and cloud storage services. This enables us to unlock the true potential of our workflows and unleash the data-driven wizardry within. 

Prepare yourselves, for the commands and code snippets below, shall guide you on your path: 

1. Writing data to a PostgreSQL database: 

from airflow import DAG 

from airflow.operators.postgres_operator import PostgresOperator 

from datetime import datetime 

 

dag = DAG( 

    "write_to_postgres", 

    description="An awe-inspiring journey to write data to a PostgreSQL database", 

    start_date=datetime(2023, 1, 1), 

    schedule_interval=None, 

) 

write_to_postgres = PostgresOperator( 

    task_id="write_to_postgres", 

    sql="INSERT INTO destination_table SELECT * FROM source_table", 

    postgres_conn_id="my_destination_db", 

    dag=dag, 

) 

2. Writing data to a data lake (e.g., Amazon S3): 

from airflow import DAG 

from airflow.contrib.operators.aws_s3_operator import S3CopyObjectOperator 

from datetime import datetime 

 

dag = DAG( 

    "write_to_data_lake", 

    description="A captivating voyage to write data to a data lake", 

    start_date=datetime(2023, 1, 1), 

    schedule_interval=None, 

) 

write_to_data_lake = S3CopyObjectOperator( 

    task_id="write_to_data_lake", 

    source_bucket_key="source_bucket/source_file.csv", 

    dest_bucket_key="destination_bucket/destination_file.csv", 

    aws_conn_id="my_aws_connection", 

    dag=dag, 

)

Prepare to wield the power of Airflow and unleash the magic of managing data sources and destinations like never before. 

Scheduling and Triggering Workflows 

Prepare to unlock the secrets of time manipulation and unleash the power of workflow scheduling and triggering in the realm of Apache Airflow. In this exhilarating chapter, we shall delve into the art of configuring time-based and interval-based schedules, harness the ability to manually trigger DAG runs, and wield the Airflow API to unleash the magic programmatically. Brace yourselves, for a realm of precise orchestration awaits!

1. Configuring Time-Based And Interval-Based Schedules Using Cron Expressions And Relative Time Expressions

Within the realm of Airflow’s scheduling magic, we possess the power to mold time itself. With the ancient wisdom of cron expressions and the dynamic nature of relative time expressions, we can shape our workflows to follow intricate schedules. 

Windows wizards, heed these commands within your command prompt as they guide you through the labyrinth of time: 

# Activate the virtual environment (if applicable) 

.\airflow-env\Scripts\activate 

# Navigate to the directory where your DAGs reside 

cd airflow\dags 

And for the Linux sorcerers, embrace the power of these commands in your terminal: 

# Activate the virtual environment (if applicable) 

source airflow-env/bin/activate 

# Navigate to the directory where your DAGs reside 

cd airflow/dags 

Now, let us unravel the incantations that enable us to configure schedules within our DAGs: 

2. Configuring time-based schedules using cron expressions: 

from airflow import DAG 

from datetime import datetime 

from airflow.operators.bash import BashOperator 

 

dag = DAG( 

    "my_time_based_dag", 

    description="A marvelous time-based DAG", 

    schedule_interval="0 0 * * *",  # Execute at midnight (00:00) every day 

    start_date=datetime(2023, 1, 1), 

) 

 

task_1 = BashOperator( 

    task_id="task_1", 

    bash_command="echo 'Task 1 is complete'", 

    dag=dag, 

) 

 

task_2 = BashOperator( 

    task_id="task_2", 

    bash_command="echo 'Task 2 is complete'", 

    dag=dag, 

) 

 

task_1 >> task_2 

3. Configuring interval-based schedules using relative time expressions: 

from airflow import DAG 

from datetime import datetime, timedelta 

from airflow.operators.bash import BashOperator 

 

dag = DAG( 

    "my_interval_based_dag", 

    description="An extraordinary interval-based DAG", 

    schedule_interval=timedelta(hours=2),  # Execute every 2 hours 

    start_date=datetime(2023, 1, 1), 

) 

task_1 = BashOperator( 

    task_id="task_1", 

    bash_command="echo 'Task 1 is complete'", 

    dag=dag, 

) 

task_2 = BashOperator( 

    task_id="task_2", 

    bash_command="echo 'Task 2 is complete'", 

    dag=dag, 

) 

task_1 >> task_2 

Manual Triggering Of DAG Runs

In the realm of Airflow, we possess the ability to command the execution of DAG runs at our will. This allows us to trigger workflows manually, granting us ultimate control over the magical orchestration.  

Prepare yourselves, for the commands below, shall guide you on your journey: 

For Windows: 

# Activate the virtual environment (if applicable) 

.\airflow-env\Scripts\activate 

# Trigger a DAG run 

airflow dags trigger my_dag_id 

For Linux: 

# Activate the virtual environment (if applicable) 

source airflow-env/bin/activate 

# Trigger a DAG run 

airflow dags trigger my_dag_id

Programmatically triggering DAG runs using the Airflow API

In the realm of Apache Airflow, we possess the ability to command the execution of DAG runs programmatically, opening up a world of possibilities. With the Airflow API at our disposal, we can trigger workflows with a single stroke of code, empowering us to automate and integrate our data pipelines seamlessly. 

Windows wizards heed these commands within your command prompt as they summon the power of the Airflow API: 

# Activate the virtual environment (if applicable) 

.\airflow-env\Scripts\activate 

# Trigger a DAG run using the Airflow REST API 

curl -X POST -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" http://localhost:8080/api/v1/dags/my_dag_id/dagRuns 

And for the Linux sorcerers, embrace the power of these commands in your terminal: 

# Activate the virtual environment (if applicable) 

source airflow-env/bin/activate 

# Trigger a DAG run using the Airflow REST API 

curl -X POST -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" http://localhost:8080/api/v1/dags/my_dag_id/dagRuns 

Now, let us dive into the captivating world of programmatically triggering DAG runs using the Airflow API: 

import requests 

 

# Define the necessary details for the Airflow API 

base_url = "http://localhost:8080/api/v1" 

api_token = "<YOUR_ACCESS_TOKEN>" 

dag_id = "my_dag_id" 

 

# Define the API endpoint to trigger a DAG run 

endpoint = f"{base_url}/dags/{dag_id}/dagRuns" 

 

# Create the authorization header 

headers = {"Authorization": f"Bearer {api_token}"} 

 

# Trigger the DAG run 

response = requests.post(endpoint, headers=headers) 

 

if response.status_code == 200: 

    print("DAG run triggered successfully!") 

else: 

    print("Failed to trigger DAG run.") 

With these incantations, you can seamlessly integrate your data workflows with other systems, trigger DAG runs on demand, and unlock the true power of automation. Brace yourselves, for the realm of programmatically-triggered DAG runs awaits your command!  

Leave a Reply

Your email address will not be published. Required fields are marked *