DAGs
Add a DAG in Airflow to run a function periodically.
We use the schedule_airflow_tasks helper function to set up the DAG:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.email import send_email
from datetime import datetime
import subprocess


def schedule_airflow_tasks(
    dag_id,
    task_configs,
    schedule_interval,
    description=None,
    tags=None,
    max_active_runs=1,
    concurrency=1,
):
    def pause_dag(dag_id):
        # Pause the DAG via the Airflow CLI so it stops scheduling new runs.
        try:
            command = f"airflow dags pause {dag_id}"
            subprocess.run(command, shell=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Failed to pause DAG {dag_id}: {str(e)}")

    def send_failure_email(context):
        # Email support with a link to the task log, then pause the DAG.
        task_instance = context["task_instance"]
        execution_date = context["execution_date"].isoformat()
        log_url = task_instance.log_url
        subject = f"Failed Task Alert: {task_instance.task_id} in DAG {task_instance.dag_id}"
        body = f"""
        <p>The following task has failed:</p>
        <ul>
            <li><strong>DAG ID:</strong> {task_instance.dag_id}</li>
            <li><strong>Task ID:</strong> {task_instance.task_id}</li>
            <li><strong>Execution Date:</strong> {execution_date}</li>
            <li><strong>Log:</strong> <a href="{log_url}">{log_url}</a></li>
            <li><strong>Error Message:</strong> {context['exception']}</li>
        </ul>
        """
        send_email(["support@lineverge.com"], subject, body)
        pause_dag(task_instance.dag_id)

    # Set up the DAG
    dag = DAG(
        dag_id=dag_id,
        default_args={
            "owner": "airflow",
            "start_date": datetime(2024, 1, 1),
            "email_on_failure": False,  # emails are sent by the callback instead
            "on_failure_callback": send_failure_email,
            "retries": 0,
        },
        description=description,
        schedule_interval=schedule_interval,
        catchup=False,
        max_active_runs=max_active_runs,
        concurrency=concurrency,
        tags=tags,
    )

    # Create one task per config; IDs are numbered to keep them in order
    tasks = []
    for task_index, task_config in enumerate(task_configs, 1):
        if task_config.get("operator") == "bash":
            task_id = f"{str(task_index).zfill(2)}_bash"
            task = BashOperator(
                task_id=task_id,
                bash_command=task_config["bash_command"],
                dag=dag,
            )
        else:
            python_callable_name = task_config["python_callable"].__name__
            task_id = f"{str(task_index).zfill(2)}_{python_callable_name}"
            task = PythonOperator(
                task_id=task_id,
                python_callable=task_config["python_callable"],
                op_kwargs=task_config.get("op_kwargs"),
                dag=dag,
            )
        tasks.append(task)

    # Chain the tasks so they run sequentially in config order
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]

    return dag
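Note that send_failure_email pauses the DAG after a failure, so no further runs are scheduled until someone resumes it. A minimal sketch of the matching resume helper, written in the same style as pause_dag above (it assumes the airflow CLI is available on the host, as the pause command already does):

import subprocess

def unpause_dag(dag_id):
    # Resume a DAG that the failure callback paused.
    # `airflow dags unpause` is the counterpart of the pause command used above.
    try:
        command = f"airflow dags unpause {dag_id}"
        subprocess.run(command, shell=True, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Failed to unpause DAG {dag_id}: {str(e)}")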
To use this in a DAG file, import the module that defines the task function and then call schedule_airflow_tasks as follows:
from datetime import timedelta
from project.uploadfile_data_verification_template import *

dag = schedule_airflow_tasks(
    dag_id="uploadfile_data_verification_template",
    task_configs=[
        {
            "python_callable": process_uploadfiles_dataverificationtemplate,
        },
    ],
    schedule_interval=timedelta(minutes=2),
    tags=["Automail", "Upload"],
)
The parameters are:
python_callable: the task function itself (a callable object, not a string)
op_kwargs: keyword arguments passed to the function (see the sketch below)
schedule_interval: how often Airflow runs the DAG
tags: list of tags attached to the DAG
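For instance, a config that passes keyword arguments with op_kwargs and appends a bash cleanup step might look like the following. This is a sketch: the batch_size argument and the cleanup command are hypothetical and only illustrate the shape of task_configs.

from datetime import timedelta
from project.uploadfile_data_verification_template import *

dag = schedule_airflow_tasks(
    dag_id="uploadfile_data_verification_full",
    task_configs=[
        {
            # Python task; op_kwargs are forwarded to the callable
            "python_callable": process_uploadfiles_dataverificationtemplate,
            "op_kwargs": {"batch_size": 100},  # hypothetical parameter
        },
        {
            # Bash task; runs only after the Python task succeeds
            "operator": "bash",
            "bash_command": "rm -f /tmp/uploadfile_*.csv",  # hypothetical cleanup
        },
    ],
    schedule_interval=timedelta(minutes=10),
    description="Verify uploaded files, then clean up temp output",
    tags=["Automail", "Upload"],
)

The tasks run sequentially in the order they appear in task_configs, so the cleanup step only starts once the verification function has finished.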