A task group (TaskGroup) in Airflow is a collection of tasks. To test task groups, you can use the pytest-airflow plugin. Set it up with the following steps:
1. Create a tests folder in the Airflow project.
2. Install pytest and pytest-airflow:
pip install pytest pytest-airflow
3. Create a pytest.ini file:
[pytest]
addopts = --verbose --color=yes --strict-markers
markers =
    airflow: mark test to be run only when Airflow is installed
    connection: mark test to require external database connections
    focus: mark test as focused (uses unittest.skip by default)
    dag: mark test as a DAG test
    dag_run: mark test as a DAG Run test
    provider: mark test as a provider test
    task: mark test as a Task test
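4. Define the DAG and the task group to be tested:
import pendulum
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator  # successor of the deprecated DummyOperator (Airflow 2.3+)
from airflow.utils.task_group import TaskGroup

# Same value as the deprecated airflow.utils.dates.days_ago(1): midnight UTC yesterday
default_args = {"owner": "airflow", "start_date": pendulum.today("UTC").add(days=-1)}

# Tasks and TaskGroups must be created inside the DAG context (`schedule` needs Airflow 2.4+)
with DAG(dag_id="test_dag_id", default_args=default_args, schedule="@daily") as dag:
    # Classic TaskGroup: child task ids get the group prefix, e.g. "task_group_name.dummy_task_1"
    with TaskGroup("task_group_name") as tg:
        dummy_task_1 = EmptyOperator(task_id="dummy_task_1")
        dummy_task_2 = EmptyOperator(task_id="dummy_task_2")

    @task
    def task_1():
        return "task_1"

    @task
    def task_2():
        return "task_2"

    dag_task_3 = BashOperator(task_id="bash_task_3", bash_command="echo task_3")

    # Decorated task group; its group id defaults to the function name, "task_group_func"
    @task_group
    def task_group_func():
        tg >> [task_1(), task_2()]

    dag_task_group = task_group_func()
    dag_task_group >> dag_task_3  # bash_task_3 runs after every task in the group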
5. Run the tests:
pytest tests/test_tasks.py
When the tests run, it automatically discovers all