Part 2 covers more advanced topics in Apache Airflow, the powerful tool that empowers data engineers and scientists to manage workflows efficiently. If you’re just joining us and haven’t read Part 1 yet, we recommend checking it out [ Apache Airflow: Efficient Workflow Management for Data Engineers & Scientists (PART 1) ] for a comprehensive understanding.
In the previous part, we started our exploration of Airflow, covering its initial setup, basic concepts, and the process of defining and running simple workflows. In Part 2, we dive into more advanced topics, including:
- Managing data sources and destinations in Airflow
- Scheduling and triggering workflows, and much more
So, let’s get started and explore tips and best practices for operating Airflow to its fullest potential, all aimed at helping you become a proficient user of this powerful data orchestration tool.
Managing Data Sources And Destinations
Prepare to embark on a thrilling expedition through the labyrinth of data sources and destinations in the realm of Apache Airflow. In this extraordinary adventure, we shall unravel the secrets of working with connections and hooks, seamlessly integrate with diverse data sources, and conquer the art of writing data to a myriad of destinations. Brace yourselves, for a world of data awaits!
Working With Connections And Hooks In Airflow
In the mystical land of Airflow, connections and hooks form the gateway to a realm of boundless possibilities. Connections allow us to establish communication channels with external systems, while hooks act as bridges, facilitating interaction with those systems.
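To see the idea in isolation, here is a minimal sketch of using a hook directly, assuming a Postgres connection named source_db and a table called source_table (both reused from the example below) already exist:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Look up the "source_db" connection and let the hook manage the session
hook = PostgresHook(postgres_conn_id="source_db")

# Run a query without writing any connection-handling code ourselves
rows = hook.get_records("SELECT COUNT(*) FROM source_table")
print(rows)

The hook reads the host, credentials, and schema from the stored connection, so none of that needs to live in your DAG code.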
Windows wizards, behold these commands in your command prompt as they guide you through the journey:
# Activate the virtual environment (if applicable)
.\airflow-env\Scripts\activate

# Navigate to the directory where your DAGs reside
cd airflow\dags
And for the Linux sorcerers, embrace the power of these commands in your terminal:
# Activate the virtual environment (if applicable)
source airflow-env/bin/activate

# Navigate to the directory where your DAGs reside
cd airflow/dags
With the Airflow CLI as your wand, let us explore an example that showcases the mystical powers of connections and hooks:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

# Create a new DAG
dag = DAG(
    "data_migration",
    description="A daring data migration adventure",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

# Establish a connection to the source database
source_hook = PostgresHook(postgres_conn_id="source_db")

# Establish a connection to the destination database
destination_hook = PostgresHook(postgres_conn_id="destination_db")

# Placeholder transformation logic invoked by the PythonOperator below
def transform_function():
    pass

# Define the tasks for data migration
extract_data = PostgresOperator(
    task_id="extract_data",
    sql="SELECT * FROM source_table",
    postgres_conn_id="source_db",
    dag=dag,
)

transform_data = PythonOperator(
    task_id="transform_data",
    python_callable=transform_function,
    dag=dag,
)

load_data = PostgresOperator(
    task_id="load_data",
    sql="INSERT INTO destination_table SELECT * FROM source_table",
    postgres_conn_id="destination_db",
    dag=dag,
)

# Set the task dependencies
extract_data >> transform_data >> load_data
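To try a single task from this DAG without involving the scheduler, the Airflow CLI can run it in isolation (the DAG and task IDs match the example above):

# Run one task of the data_migration DAG for a given logical date
airflow tasks test data_migration extract_data 2023-01-01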
Integrating With Different Data Sources: Databases, APIs, Cloud Services, etc.
Within the realm of data integration, Airflow empowers us to seamlessly connect with a multitude of data sources. Whether it be traditional databases, APIs, or cloud services, the power of integration lies at our fingertips.
Let us embark on a grand quest where we integrate diverse data sources to weave a tapestry of knowledge. Behold the code snippets that shall guide us:
1. Connecting to a PostgreSQL database:
# The command is the same on Windows and Linux
airflow connections add --conn-type postgres --conn-login myuser --conn-password mypassword --conn-host localhost --conn-port 5432 --conn-schema public source_db
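As a side note, connections do not have to be created through the CLI; Airflow also reads them from environment variables named AIRFLOW_CONN_<CONN_ID>. Here is a hedged sketch of the same source_db connection expressed as a URI (adjust the values to your own setup):

# Linux/macOS: define the source_db connection via an environment variable
export AIRFLOW_CONN_SOURCE_DB='postgres://myuser:mypassword@localhost:5432/public'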
2. Connecting to an API:
# Windows (in cmd, wrap the JSON in double quotes and escape the inner quotes)
airflow connections add --conn-type http --conn-host api.twitter.com --conn-login my_api_key --conn-password my_api_secret --conn-extra "{\"headers\": {\"Content-Type\": \"application/json\"}}" my_api_connection

# Linux
airflow connections add --conn-type http --conn-host api.twitter.com --conn-login my_api_key --conn-password my_api_secret --conn-extra '{"headers": {"Content-Type": "application/json"}}' my_api_connection
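Once the my_api_connection connection exists, a DAG can call the API through the HTTP provider. The sketch below is illustrative rather than definitive; the endpoint path is a placeholder you would swap for your API's real route:

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime

dag = DAG(
    "call_my_api",
    description="A small example that calls an API through my_api_connection",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

call_api = SimpleHttpOperator(
    task_id="call_api",
    http_conn_id="my_api_connection",    # the connection created above
    endpoint="/2/tweets/search/recent",  # placeholder endpoint, adjust to your API
    method="GET",
    headers={"Content-Type": "application/json"},
    log_response=True,
    dag=dag,
)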
Writing Data To Various Destinations: Databases, Data Lakes, Cloud Storage, etc.
In the magical realm of Airflow, we possess the power to write data to a multitude of destinations, ranging from traditional databases to modern data lakes and cloud storage services. This enables us to unlock the true potential of our workflows and unleash the data-driven wizardry within.
Prepare yourselves, for the commands and code snippets below shall guide you on your path:
1. Writing data to a PostgreSQL database:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

dag = DAG(
    "write_to_postgres",
    description="An awe-inspiring journey to write data to a PostgreSQL database",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    sql="INSERT INTO destination_table SELECT * FROM source_table",
    postgres_conn_id="my_destination_db",
    dag=dag,
)
2. Writing data to a data lake (e.g., Amazon S3):
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from datetime import datetime

dag = DAG(
    "write_to_data_lake",
    description="A captivating voyage to write data to a data lake",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

write_to_data_lake = S3CopyObjectOperator(
    task_id="write_to_data_lake",
    source_bucket_name="source_bucket",
    source_bucket_key="source_file.csv",
    dest_bucket_name="destination_bucket",
    dest_bucket_key="destination_file.csv",
    aws_conn_id="my_aws_connection",
    dag=dag,
)
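If the data begins life as a local file rather than an existing S3 object, the Amazon provider also ships a transfer operator for uploads. A hedged sketch, reusing the dag object from the example above, with placeholder file path and bucket names:

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_to_s3 = LocalFilesystemToS3Operator(
    task_id="upload_to_s3",
    filename="/tmp/report.csv",         # local file to upload (placeholder path)
    dest_key="reports/report.csv",      # object key inside the destination bucket
    dest_bucket="destination_bucket",
    aws_conn_id="my_aws_connection",
    replace=True,
    dag=dag,
)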
Prepare to wield the power of Airflow and unleash the magic of managing data sources and destinations like never before.
Scheduling and Triggering Workflows
Prepare to unlock the secrets of time manipulation and unleash the power of workflow scheduling and triggering in the realm of Apache Airflow. In this exhilarating chapter, we shall delve into the art of configuring time-based and interval-based schedules, harness the ability to manually trigger DAG runs, and wield the Airflow API to unleash the magic programmatically. Brace yourselves, for a realm of precise orchestration awaits!
Configuring Time-Based And Interval-Based Schedules Using Cron Expressions And Relative Time Expressions
Within the realm of Airflow’s scheduling magic, we possess the power to mold time itself. With the ancient wisdom of cron expressions and the dynamic nature of relative time expressions, we can shape our workflows to follow intricate schedules.
Windows wizards, heed these commands within your command prompt as they guide you through the labyrinth of time:
# Activate the virtual environment (if applicable)
.\airflow-env\Scripts\activate

# Navigate to the directory where your DAGs reside
cd airflow\dags

And for the Linux sorcerers, embrace the power of these commands in your terminal:

# Activate the virtual environment (if applicable)
source airflow-env/bin/activate

# Navigate to the directory where your DAGs reside
cd airflow/dags
Now, let us unravel the incantations that enable us to configure schedules within our DAGs:
1. Configuring time-based schedules using cron expressions:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator

dag = DAG(
    "my_time_based_dag",
    description="A marvelous time-based DAG",
    schedule_interval="0 0 * * *",  # Execute at midnight (00:00) every day
    start_date=datetime(2023, 1, 1),
)

task_1 = BashOperator(
    task_id="task_1",
    bash_command="echo 'Task 1 is complete'",
    dag=dag,
)

task_2 = BashOperator(
    task_id="task_2",
    bash_command="echo 'Task 2 is complete'",
    dag=dag,
)

task_1 >> task_2
2. Configuring interval-based schedules using relative time expressions:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator

dag = DAG(
    "my_interval_based_dag",
    description="An extraordinary interval-based DAG",
    schedule_interval=timedelta(hours=2),  # Execute every 2 hours
    start_date=datetime(2023, 1, 1),
)

task_1 = BashOperator(
    task_id="task_1",
    bash_command="echo 'Task 1 is complete'",
    dag=dag,
)

task_2 = BashOperator(
    task_id="task_2",
    bash_command="echo 'Task 2 is complete'",
    dag=dag,
)

task_1 >> task_2
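Beyond raw cron strings and timedeltas, Airflow also understands a handful of preset schedule aliases, which often read more clearly in a DAG definition. The DAG below is just an illustrative sketch:

from airflow import DAG
from datetime import datetime

# "@daily" is equivalent to the cron expression "0 0 * * *"
# Other presets include "@hourly", "@weekly", "@monthly", "@yearly", and "@once"
dag = DAG(
    "my_preset_based_dag",
    description="A DAG scheduled with a preset alias",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
)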
Manual Triggering Of DAG Runs
In the realm of Airflow, we possess the ability to command the execution of DAG runs at our will. This allows us to trigger workflows manually, granting us ultimate control over the magical orchestration.
Prepare yourselves, for the commands below shall guide you on your journey:
For Windows:
# Activate the virtual environment (if applicable)
.\airflow-env\Scripts\activate

# Trigger a DAG run
airflow dags trigger my_dag_id
For Linux:
# Activate the virtual environment (if applicable)
source airflow-env/bin/activate

# Trigger a DAG run
airflow dags trigger my_dag_id
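A manually triggered run can also carry a small JSON payload, which tasks can later read from dag_run.conf. A quick sketch (the key and value are placeholders):

# Trigger a DAG run and hand it extra configuration
airflow dags trigger --conf '{"source_table": "orders"}' my_dag_id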
Programmatically Triggering DAG Runs Using The Airflow API
In the realm of Apache Airflow, we possess the ability to command the execution of DAG runs programmatically, opening up a world of possibilities. With the Airflow API at our disposal, we can trigger workflows with a single stroke of code, empowering us to automate and integrate our data pipelines seamlessly.
Windows wizards, heed these commands within your command prompt as they summon the power of the Airflow API:
# Activate the virtual environment (if applicable)
.\airflow-env\Scripts\activate

# Trigger a DAG run using the Airflow REST API (the endpoint expects a JSON body)
curl -X POST -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" -H "Content-Type: application/json" -d "{}" http://localhost:8080/api/v1/dags/my_dag_id/dagRuns
And for the Linux sorcerers, embrace the power of these commands in your terminal:
# Activate the virtual environment (if applicable)
source airflow-env/bin/activate

# Trigger a DAG run using the Airflow REST API (the endpoint expects a JSON body)
curl -X POST -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" -H "Content-Type: application/json" -d '{}' http://localhost:8080/api/v1/dags/my_dag_id/dagRuns
Now, let us dive into the captivating world of programmatically triggering DAG runs using the Airflow API:
import requests

# Define the necessary details for the Airflow API
base_url = "http://localhost:8080/api/v1"
api_token = "<YOUR_ACCESS_TOKEN>"
dag_id = "my_dag_id"

# Define the API endpoint to trigger a DAG run
endpoint = f"{base_url}/dags/{dag_id}/dagRuns"

# Create the authorization header
headers = {"Authorization": f"Bearer {api_token}"}

# Trigger the DAG run (the API expects a JSON body; an empty one is enough)
response = requests.post(endpoint, headers=headers, json={})

if response.status_code == 200:
    print("DAG run triggered successfully!")
else:
    print(f"Failed to trigger DAG run: {response.status_code} {response.text}")
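The same endpoint also accepts an optional conf object in the request body, so runs can be parameterized from code. A brief sketch reusing the variables above (the payload keys are placeholders):

# Trigger a run with a custom configuration payload
payload = {"conf": {"source_table": "orders"}}
response = requests.post(endpoint, headers=headers, json=payload)
print(response.status_code, response.json())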
With these incantations, you can seamlessly integrate your data workflows with other systems, trigger DAG runs on demand, and unlock the true power of automation. Brace yourselves, for the realm of programmatically triggered DAG runs awaits your command!