> For the complete documentation index, see [llms.txt](https://docs.lineverge.com/automail/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://docs.lineverge.com/automail/configurations/workflow-configuration/dags.md).

# Dags

We used schedule\_airflow\_tasks function to set up the dag in airflow

```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.email import send_email
from datetime import datetime
import subprocess


def schedule_airflow_tasks(
    dag_id,
    task_configs,
    schedule_interval,
    description=None,
    tags=None,
    max_active_runs=1,
    concurrency=1,
):
    def pause_dag(dag_id):
        try:
            command = f"airflow dags pause {dag_id}"
            subprocess.run(command, shell=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Failed to pause DAG {dag_id}: {str(e)}")

    def send_failure_email(context):
        task_instance = context["task_instance"]
        execution_date = context["execution_date"].isoformat()
        log_url = task_instance.log_url

        subject = f"Failed Task Alert: {task_instance.task_id} in DAG {task_instance.dag_id}"
        body = f"""
        <p>The following task has failed:</p>
        <ul>
            <li><strong>DAG ID:</strong> {task_instance.dag_id}</li>
            <li><strong>Task ID:</strong> {task_instance.task_id}</li>
            <li><strong>Execution Date:</strong> {execution_date}</li>
            <li><strong>Log:</strong> <a href="{log_url}">{log_url}</a></li>
            <li><strong>Error Message:</strong> {context["exception"]}</li>
        </ul>
        """
        send_email(["support@lineverge.com"], subject, body)
        pause_dag(task_instance.dag_id)

    # Set DAG
    dag = DAG(
        dag_id=dag_id,
        default_args={
            "owner": "airflow",
            "start_date": datetime(2024, 1, 1),
            "email_on_failure": False,
            "on_failure_callback": send_failure_email,
            "retries": 0,
        },
        description=description,
        schedule_interval=schedule_interval,
        catchup=False,
        max_active_runs=max_active_runs,
        concurrency=concurrency,
        tags=tags,
    )

    # Set tasks
    tasks = []
    for task_id_prefix, task_config in enumerate(task_configs, 1):
        if task_config.get("operator") == "bash":
            task_id = f"{str(task_id_prefix).zfill(2)}_bash"
            task = BashOperator(
                task_id=task_id,
                bash_command=task_config["bash_command"],
                dag=dag,
            )
        else:
            python_callable_name = task_config["python_callable"].__name__
            task_id = f"{str(task_id_prefix).zfill(2)}_{python_callable_name}"
            task = PythonOperator(
                task_id=task_id,
                python_callable=task_config["python_callable"],
                op_kwargs=task_config.get("op_kwargs"),
                dag=dag,
            )
        tasks.append(task)

    # Set dependencies
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]

    return dag
```

While using dag, we need to import the original function file in python then call schedule\_airflow\_tasks as followed:

```python
from project.uploadfile_data_verification_template import *

dag = schedule_airflow_tasks(
    dag_id="uploadfile_data_verification_template",
    task_configs=[
        {
            "python_callable": process_uploadfiles_dataverificationtemplate,
        },
    ],
    schedule_interval=timedelta(minutes=2),
    tags=["Automail", "Upload"],
)
```

The parameters are:

* python\_callable function name as a string
* op\_kwargs function parameters
* scheduke\_interval: how often the airflow run the function
* tags: list of the tag for the function


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://docs.lineverge.com/automail/configurations/workflow-configuration/dags.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
