We will walk through an example of an ETL DAG in Apache Airflow. The goal is to schedule a task that runs on a daily basis and:
- Downloads the server access log file from a given URL
- Keeps only the 1st and 4th columns of the log file (the columns are “#”-delimited)
- Capitalizes the text extracted in the previous step
- Compresses the data from the previous step
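Before wiring these steps into a DAG, it can help to run them once by hand. The sketch below chains the same shell commands (wget, cut, tr, zip) that the DAG tasks will execute, writing into the current working directory:

# quick manual dry run of the four ETL steps
wget "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"
cut -f1,4 -d"#" web-server-access-log.txt > extracted.txt
tr "[a-z]" "[A-Z]" < extracted.txt > capitalized.txt
zip log.zip capitalized.txt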
The DAG is defined in a Python (.py) file called ETL_Server_Access_Log_Processing.py:
# import the libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago

# defining DAG arguments
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'George Pipis',
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# define the DAG
dag = DAG(
    'ETL_Server_Access_Log_Processing',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)

# define the tasks
# task 'download': fetch the log file; we save it under an explicit path
# because each BashOperator command runs in its own temporary working directory
download = BashOperator(
    task_id='download',
    bash_command='wget -O /home/project/airflow/dags/web-server-access-log.txt "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"',
    dag=dag,
)

# task 'extract': keep only the 1st and 4th "#"-delimited columns
extract = BashOperator(
    task_id='extract',
    bash_command='cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt > /home/project/airflow/dags/extracted.txt',
    dag=dag,
)

# task 'transform': capitalize the extracted text
transform = BashOperator(
    task_id='transform',
    bash_command='tr "[a-z]" "[A-Z]" < /home/project/airflow/dags/extracted.txt > /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)

# task 'load': compress the transformed file
load = BashOperator(
    task_id='load',
    bash_command='zip /home/project/airflow/dags/log.zip /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)

# task pipeline
download >> extract >> transform >> load
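A quick way to catch syntax or import errors in the file is to run it once through the Python interpreter; if the DAG definition is valid, the command should finish without raising an error:

python ETL_Server_Access_Log_Processing.py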
Once we have created the DAG, we can submit it by copying the file into the /home/project/airflow/dags folder:
cp ETL_Server_Access_Log_Processing.py /home/project/airflow/dags
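With the file in place, we can optionally exercise a single task in isolation using the Airflow CLI; the trailing date is just an ad-hoc execution date for the test run:

airflow tasks test ETL_Server_Access_Log_Processing download 2022-01-01
airflow tasks test ETL_Server_Access_Log_Processing extract 2022-01-01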
To verify that the DAG exists, we can run:
airflow dags list
dag_id                           | filepath                                                                                                   | owner        | paused
=================================+============================================================================================================+==============+=======
ETL_Server_Access_Log_Processing | ETL_Server_Access_Log_Processing.py                                                                        | George Pipis | True
example_bash_operator            | /home/airflow/.local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py            | airflow      | True
example_branch_datetime_operator | /home/airflow/.local/lib/python3.7/site-packages/airflow/example_dags/example_branch_datetime_operator.py | airflow      | True
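We can also list the tasks that make up our DAG:

airflow tasks list ETL_Server_Access_Log_Processing

which should print the four task ids: download, extract, load, transform.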
We can also see our DAG in the Airflow UI.
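Newly added DAGs typically start out paused (note the paused column above). To enable the daily schedule, or to trigger a run immediately, we can also use the CLI:

airflow dags unpause ETL_Server_Access_Log_Processing
airflow dags trigger ETL_Server_Access_Log_Processing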