Organize Airflow DAGs with Task Groups

Jash Bhatt
Nov 6, 2023 · 10 min read

Introduction

If you are an Airflow user, you know how important it is to keep your DAGs clean and easy to understand. You don’t want to end up with messy spaghetti code that makes your eyes bleed and your brain hurt.

But sometimes, it’s hard to avoid complexity when you have a lot of tasks that are closely related or depend on each other. You might end up with a DAG that looks like this:

[Image: a sprawling DAG with 18 interconnected tasks]

This DAG has 18 tasks, but it’s hard to tell what they are doing and how they are connected. It’s also hard to maintain and modify this DAG, as any change might affect other tasks or break the dependencies.

Wouldn’t it be nice if you could group some of these tasks together and make them look like a single unit? Something like this:

[Image: the same DAG with task groups collapsed]
[Image: the same DAG with task groups expanded]

This DAG now shows only 6 nodes, and it’s much easier to read and understand. Each collapsed node represents a group of tasks that performs a specific function. You can also expand or collapse these task groups to see the details of the tasks inside them.

This is what task groups in Airflow can do for you. Task groups are a feature that allows you to group multiple tasks into a single node in the Airflow UI, making your DAGs more organized and manageable. In this story, we will see how to use task groups in your Airflow DAGs and how they can benefit you in terms of code readability, reusability, and scalability.

What are Task Groups and how do they work

Task groups are a way of grouping tasks together in a DAG, so that they appear as a single node in the Airflow UI. You can set dependencies on a task group as a whole, and a task group can pass default arguments, such as retries or trigger rules, down to the tasks it contains. Task groups can also contain other task groups, creating a hierarchical structure of tasks.

Task groups are purely an organizational concept: a TaskGroup is not an operator and is never executed itself. It groups tasks in the UI and namespaces their task IDs. When you set a dependency on a task group, Airflow applies it to the tasks at the group’s boundary, so upstream tasks run before the group’s first tasks and downstream tasks run after its last tasks; within the group, tasks follow their own dependencies and trigger rules.
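
To make the nesting concrete, here is a minimal sketch of a task group that contains another task group. The DAG ID, group IDs, and echo commands are placeholders, not from any particular project:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="example_nested_groups", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    with TaskGroup(group_id="outer") as outer:
        first = BashOperator(task_id="first", bash_command="echo first")

        # A group nested inside "outer"; with default prefixing, this task's
        # full ID becomes "outer.inner.second"
        with TaskGroup(group_id="inner") as inner:
            second = BashOperator(task_id="second", bash_command="echo second")

        first >> inner

In the graph view, expanding outer reveals inner as its own collapsible node.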

How to define Task Groups in your DAGs using two different methods

There are two ways to define task groups in your Airflow DAGs: using the TaskGroup context manager or using the task_group decorator. Both methods have their pros and cons, and you can choose the one that suits your needs and preferences.

Using the TaskGroup context manager

One way to create a task group is to use the TaskGroup context manager. This allows you to define a block of code that will be grouped together as a single unit in the Airflow UI. For example, let’s say you have a DAG that performs some data processing steps on a CSV file. You can group the tasks that read, validate, and transform the data into a task group called process_data:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# Placeholder callables; replace these with real processing logic
def read_data():
    print("reading data from the CSV file")

def validate_data():
    print("validating the data")

def transform_data():
    print("transforming the data")

with DAG(dag_id="example_task_group", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    # A simple task to start the DAG
    start = BashOperator(task_id="start", bash_command="echo start")

    # A task group to process the data
    with TaskGroup(group_id="process_data") as process_data:

        # Read, validate, and transform the data in sequence
        read = PythonOperator(task_id="read_data", python_callable=read_data)
        validate = PythonOperator(task_id="validate_data", python_callable=validate_data)
        transform = PythonOperator(task_id="transform_data", python_callable=transform_data)

        read >> validate >> transform

    # A simple task to end the DAG
    end = BashOperator(task_id="end", bash_command="echo end")

    # Define the dependencies
    start >> process_data >> end

This will result in a DAG that looks like this in the Airflow UI:

[Image: the DAG graph with the process_data group shown as a single collapsible node]

As you can see, the tasks inside the process_data task group are collapsed into a single node, which makes the DAG more readable and manageable. You can also expand or collapse the task group by clicking on it, which will show or hide the individual tasks inside it.

Just a heads up: the DAG above is a minimal demo. The callables only print messages; swap in real processing logic for a production setting.

Using the task_group decorator

Another way to create a task group is to use the task_group decorator from airflow.decorators. You decorate a function whose body creates the tasks, and calling that function inside a DAG instantiates the group as a single unit in the Airflow UI. For example, let’s say you have a DAG that performs some data analysis steps on a data file. You can group the tasks that calculate the mean, median, and mode of the data into a task group called calculate_stats:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Placeholder callables; replace these with real statistics logic
def calc_mean(path):
    print(f"calculating the mean of {path}")

def calc_median(path):
    print(f"calculating the median of {path}")

def calc_mode(path):
    print(f"calculating the mode of {path}")

with DAG(dag_id="example_task_group_decorator", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    # A simple task to start the DAG
    start = BashOperator(task_id="start", bash_command="echo start")

    # A task group to calculate the statistics of the data
    @task_group(group_id="calculate_stats")
    def calculate_stats(path):
        # Each operator created inside the function joins the group
        PythonOperator(task_id="mean", python_callable=calc_mean, op_kwargs={"path": path})
        PythonOperator(task_id="median", python_callable=calc_median, op_kwargs={"path": path})
        PythonOperator(task_id="mode", python_callable=calc_mode, op_kwargs={"path": path})

    # A simple task to end the DAG
    end = BashOperator(task_id="end", bash_command="echo end")

    # Define the dependencies; calling the decorated function creates the group
    start >> calculate_stats("/data/input.csv") >> end

This will result in a DAG that looks like this in the Airflow UI:

[Image: the DAG graph with the calculate_stats group shown as a single collapsible node]

As you can see, the tasks inside the calculate_stats task group are collapsed into a single node, which makes the DAG more readable and manageable. You can also expand or collapse the task group by clicking on it, which will show or hide the individual tasks inside it.

Just a heads up: this DAG is also a minimal demo; the statistics callables and the file path are placeholders.

How to use Task Groups for dynamic task generation and looping

Another feature of task groups is that you can generate the tasks inside them in a loop. This is useful when you have a variable number of similar tasks to run. Keep in mind that this happens when the DAG file is parsed, so the list you loop over must be known at parse time. For example, let’s say you want to perform the same operation on several files. You can loop over the file names inside a task group:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# The list of files must be known when the DAG file is parsed
FILES = ["sales.csv", "orders.csv", "customers.csv"]

# Placeholder callable; replace with real per-file logic
def process_file(file):
    print(f"processing {file}")

with DAG(dag_id="example_task_group_loop", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    # A simple task to start the DAG
    start = BashOperator(task_id="start", bash_command="echo start")

    # A task group that contains one task per file
    with TaskGroup(group_id="loop_files") as loop_files:
        for file in FILES:
            PythonOperator(
                task_id=f"operate_{file.replace('.', '_')}",
                python_callable=process_file,
                op_kwargs={"file": file},
            )

    # A simple task to end the DAG
    end = BashOperator(task_id="end", bash_command="echo end")

    # Define the dependencies
    start >> loop_files >> end

As you can see in the code above, the tasks inside the loop_files task group are generated in a loop, one task per file in the list. You can still expand or collapse the task group by clicking on it, which will show or hide the individual tasks inside it.
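
If the list of files is only known at run time, for example because an upstream task produces it, a plain Python loop won’t work, since the DAG structure is fixed when the file is parsed. Airflow 2.3 and newer supports dynamic task mapping for that case. Here is a minimal TaskFlow sketch, assuming Airflow 2.3+; the function names and file names are illustrative:

from datetime import datetime

from airflow.decorators import dag, task

@dag(dag_id="example_dynamic_mapping", start_date=datetime(2023, 1, 1), schedule_interval=None)
def dynamic_mapping():

    @task
    def get_files():
        # In practice this might list a directory or query an API
        return ["sales.csv", "orders.csv"]

    @task
    def operate(file):
        print(f"processing {file}")

    # One mapped task instance is created per file, at run time
    operate.expand(file=get_files())

dynamic_mapping()

Note that mapped tasks are rendered as a single expandable node in the UI, similar to a task group but driven by runtime data.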

Just a heads up: this DAG is a minimal demo too; the file list and the per-file callable are placeholders.

How to customize the appearance and behavior of Task Groups in the Airflow UI

You can also customize the appearance of task groups in the Airflow UI, such as changing the background color, text color, and tooltip of the task group node. In addition, you can control whether the group’s ID is prefixed to the IDs of the tasks inside it.

To customize the task group, you can pass some optional arguments to the TaskGroup context manager or the task_group decorator, such as:

  • ui_color: The color of the task group node in the UI. You can use any valid CSS color value, such as hex codes, RGB values, or color names. For example, ui_color="green" will make the task group node green.
  • ui_fgcolor: The foreground color of the task group node in the UI. This affects the color of the label text inside the node. You can use any valid CSS color value, such as hex codes, RGB values, or color names. For example, ui_fgcolor="black" will make the label text black.
  • tooltip: The tooltip of the task group node in the UI. This is the text that appears when you hover over the node. You can use any string value, but it should be informative and concise. For example, tooltip="This task group performs data processing steps on a CSV file" will make the task group node display that text as the tooltip.
  • default_args: A dictionary of default arguments, such as retries or trigger rules, that will be applied to all the tasks in the TaskGroup. Default is None.
  • prefix_group_id: This parameter determines whether the group_id of the TaskGroup will be added as a prefix to the task_id of the tasks inside the TaskGroup. For example, if you have a TaskGroup with group_id='group1' and a task inside it with task_id='task1', then the full task_id of the task will be group1.task1 if prefix_group_id=True, or just task1 if prefix_group_id=False. This can help you avoid naming conflicts and keep your tasks organized. Default is True. A quick sketch of this behavior follows this list.
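
As promised, here is a quick sketch of what prefix_group_id does; the DAG, group, and task IDs are arbitrary. You can inspect the fully qualified task IDs directly:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="example_prefixing", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    with TaskGroup(group_id="group1") as group1:
        task1 = BashOperator(task_id="task1", bash_command="echo hi")

    with TaskGroup(group_id="group2", prefix_group_id=False) as group2:
        task2 = BashOperator(task_id="task2", bash_command="echo hi")

print(task1.task_id)  # "group1.task1": the group_id is prepended
print(task2.task_id)  # "task2": no prefix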

Here is an example of how to customize a task group using these arguments:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="example_task_group_custom", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:

    # A simple task to start the DAG
    start = BashOperator(task_id="start", bash_command="echo start")

    # A task group to process the data, with custom colors and a tooltip
    with TaskGroup(
        group_id="process_data",
        ui_color="green",
        ui_fgcolor="black",
        tooltip="This task group performs data processing steps on a CSV file",
    ) as process_data:

        # A task to read the data from a CSV file
        read_data = BashOperator(task_id="read_data", bash_command="echo read_data")

        # A task to validate the data
        validate_data = BashOperator(task_id="validate_data", bash_command="echo validate_data")

        # A task to transform the data
        transform_data = BashOperator(task_id="transform_data", bash_command="echo transform_data")

        read_data >> validate_data >> transform_data

    # A simple task to end the DAG
    end = BashOperator(task_id="end", bash_command="echo end")

    # Define the dependencies
    start >> process_data >> end

This will result in a DAG that looks like this in the Airflow UI:

[Image: the process_data task group expanded]
[Image: the process_data task group collapsed, showing the tooltip]

As you can see, the task group node has a custom color and tooltip, which makes it more distinctive and informative.

Just a heads up: the DAG above is a minimal demo; the echo commands stand in for real processing steps.

How to benefit from Task Groups

There are several benefits of using task groups in your Airflow DAGs, such as:

  • Improving code readability: By grouping related tasks together, you can improve the readability and understandability of your code, making it easier to follow the logic and flow of your DAGs. You can also use descriptive names and labels for your task groups, which can help you document your DAGs and communicate your intentions to other users or developers.
  • Enhancing code reusability: By using task groups, you can create reusable modules of tasks that can be imported and used in different DAGs, increasing the reusability and maintainability of your code. A common pattern is a factory function that builds a task group, which any DAG can call; see the sketch after this list.
  • Increasing code scalability: By grouping tasks, you keep large DAGs navigable as they grow, since the graph view stays readable even with many tasks. You can also generate the tasks inside a group programmatically, which helps you handle a variable number of similar workloads.
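
To illustrate the reusability point, here is a sketch of a task group factory. The module layout and names are hypothetical; in practice the factory would live in a shared module that each DAG file imports:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

# Hypothetical shared helper, e.g. in common/groups.py
def build_process_data_group(group_id="process_data"):
    """Build a reusable read -> validate -> transform task group."""
    with TaskGroup(group_id=group_id) as group:
        read = BashOperator(task_id="read_data", bash_command="echo read")
        validate = BashOperator(task_id="validate_data", bash_command="echo validate")
        transform = BashOperator(task_id="transform_data", bash_command="echo transform")
        read >> validate >> transform
    return group

with DAG(dag_id="example_reusable_group", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    start = BashOperator(task_id="start", bash_command="echo start")
    end = BashOperator(task_id="end", bash_command="echo end")

    # The same factory call could appear in any number of DAGs
    start >> build_process_data_group() >> end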

Conclusion

In this story, we have learned how to use task groups in Airflow to organize our DAGs and make them more readable, reusable, and scalable. We have seen how to define task groups using two different methods: the TaskGroup context manager and the task_group decorator. We have also seen how to use task groups for dynamic task generation and looping, and how to customize the appearance and behavior of task groups in the Airflow UI. Task groups are a powerful feature that can help us create more modular and maintainable workflows in Airflow. I hope you enjoyed this story and found it useful.

If you have any questions or feedback, please leave a comment below. If you liked this story, please clap, share, and follow me here on Medium and LinkedIn. I appreciate your support and feedback. Stay tuned for more stories on Airflow and other topics. Happy Airflowing!
