DAGs

Adding DAGs in Airflow to run a function periodically.

We use the schedule_airflow_tasks function to set up a DAG in Airflow:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.email import send_email
from datetime import datetime
import subprocess


def schedule_airflow_tasks(
    dag_id,
    task_configs,
    schedule_interval,
    description=None,
    tags=None,
    max_active_runs=1,
    concurrency=1,
):
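    # Pause the DAG via the Airflow CLI so the scheduler stops launching new runs.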
    def pause_dag(dag_id):
        try:
            command = f"airflow dags pause {dag_id}"
            subprocess.run(command, shell=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Failed to pause DAG {dag_id}: {str(e)}")

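    # Failure callback: email an alert with the task log link, then pause the DAG.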
    def send_failure_email(context):
        task_instance = context["task_instance"]
        execution_date = context["execution_date"].isoformat()
        log_url = task_instance.log_url

        subject = f"Failed Task Alert: {task_instance.task_id} in DAG {task_instance.dag_id}"
        body = f"""
        <p>The following task has failed:</p>
        <ul>
            <li><strong>DAG ID:</strong> {task_instance.dag_id}</li>
            <li><strong>Task ID:</strong> {task_instance.task_id}</li>
            <li><strong>Execution Date:</strong> {execution_date}</li>
            <li><strong>Log:</strong> <a href="{log_url}">{log_url}</a></li>
            <li><strong>Error Message:</strong> {context["exception"]}</li>
        </ul>
        """
        send_email(["support@lineverge.com"], subject, body)
        pause_dag(task_instance.dag_id)

    # Set DAG
    dag = DAG(
        dag_id=dag_id,
        default_args={
            "owner": "airflow",
            "start_date": datetime(2024, 1, 1),
            "email_on_failure": False,
            "on_failure_callback": send_failure_email,
            "retries": 0,
        },
        description=description,
        schedule_interval=schedule_interval,
        catchup=False,
        max_active_runs=max_active_runs,
        concurrency=concurrency,
        tags=tags,
    )

    # Set tasks
    tasks = []
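    # Build tasks in list order; each task_id gets a zero-padded position prefix (01_, 02_, ...).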
    for task_id_prefix, task_config in enumerate(task_configs, 1):
        if task_config.get("operator") == "bash":
            task_id = f"{str(task_id_prefix).zfill(2)}_bash"
            task = BashOperator(
                task_id=task_id,
                bash_command=task_config["bash_command"],
                dag=dag,
            )
        else:
            python_callable_name = task_config["python_callable"].__name__
            task_id = f"{str(task_id_prefix).zfill(2)}_{python_callable_name}"
            task = PythonOperator(
                task_id=task_id,
                python_callable=task_config["python_callable"],
                op_kwargs=task_config.get("op_kwargs"),
                dag=dag,
            )
        tasks.append(task)

    # Set dependencies
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]

    return dag

To use the DAG, import the file containing the original function, then call schedule_airflow_tasks as follows:

from datetime import timedelta

from project.uploadfile_data_verification_template import *

dag = schedule_airflow_tasks(
    dag_id="uploadfile_data_verification_template",
    task_configs=[
        {
            "python_callable": process_uploadfiles_dataverificationtemplate,
        },
    ],
    schedule_interval=timedelta(minutes=2),
    tags=["Automail", "Upload"],
)

The parameters are:

  • dag_id: unique identifier for the DAG

  • task_configs: list of task configurations; each entry defines either a Python task (python_callable plus optional op_kwargs) or a bash task ("operator": "bash" plus bash_command)

  • python_callable: the function object to run (passed directly, not as a string)

  • op_kwargs: keyword arguments passed to the function

  • schedule_interval: how often Airflow runs the DAG (a timedelta or a cron expression)

  • tags: list of tags used to filter the DAG in the Airflow UI
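
A task_configs list can also chain several tasks, including bash commands; schedule_airflow_tasks wires them to run in order. A minimal sketch (the op_kwargs values and the bash command are hypothetical placeholders):

from datetime import timedelta

from project.uploadfile_data_verification_template import *

# A Python task followed by a bash task; the tasks are chained with >>,
# so the bash task runs only after the Python task succeeds.
dag = schedule_airflow_tasks(
    dag_id="uploadfile_data_verification_chain",
    task_configs=[
        {
            "python_callable": process_uploadfiles_dataverificationtemplate,
            "op_kwargs": {"batch_size": 100},  # hypothetical keyword argument
        },
        {
            "operator": "bash",
            "bash_command": "echo 'verification run finished'",  # hypothetical command
        },
    ],
    schedule_interval=timedelta(hours=1),
    tags=["Automail", "Upload"],
)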
