
A Practical Example of a DAG in Airflow

We will walk through an example of an ETL DAG in Airflow. The goal is to schedule a pipeline that runs on a daily basis and:

  • Downloads the server access log file from the URL given in the download task below
  • Keeps only the 1st and 4th columns of the log file; the columns are “#”-delimited (see the sketch below)
  • Capitalizes the data from the previous step (the tr command below uppercases the whole file)
  • Compresses the data from the previous step
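
To make the column operations concrete, here is a plain-Python sketch of what the extract and transform steps do to a single log line. The sample line below is made up for illustration; the real file's layout may differ:

# a hypothetical "#"-delimited log line
line = "198.51.100.7#2021-01-01 10:00:00#GET /index.html#alice"

# extract: keep the 1st and 4th "#"-delimited fields (cut -f1,4 -d"#")
fields = line.split("#")
extracted = "#".join([fields[0], fields[3]])  # '198.51.100.7#alice'

# transform: uppercase the text (tr "[a-z]" "[A-Z]")
capitalized = extracted.upper()               # '198.51.100.7#ALICE'
print(capitalized)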

The DAG is defined in a Python file called ETL_Server_Access_Log_Processing.py:

# import the libraries

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is deprecated in Airflow 2
# This makes scheduling easy
from airflow.utils.dates import days_ago

# defining the DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'George Pipis',
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
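# Note: with retries=1 and retry_delay=timedelta(minutes=5), a failed
# task is retried once, five minutes after the failure.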



# define the DAG
dag = DAG(
    'ETL_Server_Access_Log_Processing',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
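# schedule_interval=timedelta(days=1) triggers one run per day,
# starting from start_date (days_ago(0), i.e. midnight today).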


# define the tasks

# define the task 'download'

download = BashOperator(
    task_id='download',
    # save to an absolute path: BashOperator runs each command in a temporary
    # working directory, so relative paths are not shared between tasks
    bash_command='wget -O /home/project/airflow/dags/web-server-access-log.txt "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"',
    dag=dag,
)


# define the task 'extract'

extract = BashOperator(
    task_id='extract',
    # keep the 1st and 4th "#"-delimited fields
    bash_command='cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt > /home/project/airflow/dags/extracted.txt',
    dag=dag,
)


# define the task 'transform'

transform = BashOperator(
    task_id='transform',
    # translate lowercase letters to uppercase
    bash_command='tr "[a-z]" "[A-Z]" < /home/project/airflow/dags/extracted.txt > /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)


# define the task 'load'

load = BashOperator(
    task_id='load',
    # compress the result; cd first so the zip stores just the file name
    bash_command='cd /home/project/airflow/dags && zip log.zip capitalized.txt',
    dag=dag,
)


# task pipeline

download >> extract >> transform >> load
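
The >> operator chains the tasks from left to right, so each task runs only after the previous one succeeds. The same linear pipeline can equivalently be declared with explicit calls:

# equivalent to the >> chain above
download.set_downstream(extract)
extract.set_downstream(transform)
transform.set_downstream(load)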

Once we have created the DAG, we can submit it by copying the file into the /home/project/airflow/dags directory:

cp ETL_Server_Access_Log_Processing.py /home/project/airflow/dags

To verify that the DAG exists, we can run:

airflow dags list
dag_id                                    | filepath                                                                         | owner             | paused
=========================================+=================================================================================+===================+=======
ETL_Server_Access_Log_Processing         | ETL_Server_Access_Log_Processing.py                                             | George Pipis      | True  
example_bash_operator                    | /home/airflow/.local/lib/python3.7/site-packages/airflow/example_dags/example_b | airflow           | True  
                                         | ash_operator.py                                                                 |                   |       
example_branch_datetime_operator         | /home/airflow/.local/lib/python3.7/site-packages/airflow/example_dags/example_b | airflow           | True  
                                         | ranch_datetime_operator.py                                                      |                   |       
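
Note that newly added DAGs are paused by default (the paused column above shows True). Before the schedule kicks in, switch the DAG on from the UI toggle or with the CLI:

airflow dags unpause ETL_Server_Access_Log_Processing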

We can see our DAG from the Airflow UI as well.
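
We can also exercise a single task without waiting for the scheduler, using airflow tasks test with the DAG id, a task id, and an example execution date:

airflow tasks test ETL_Server_Access_Log_Processing download 2021-11-01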
