- 任务跳过:在DAG文件中,可以使用
ShortCircuitOperator
和SkipOperator
来实现任务跳过。在任务执行之前,先判断某个条件是否为True,如果是,则可以使用ShortCircuitOperator
来跳过该任务,如果否,则使用SkipOperator
来跳过该任务。
代码示例:
from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
def skip_if_not_prod():
if 'prod' not in airflow.conf.get('core', 'env'):
return True
return False
with DAG('example_dag',
default_args=default_args,
schedule_interval=None,
) as dag:
start = DummyOperator(task_id='start')
task_to_skip = DummyOperator(task_id='task_to_skip')
final_task = DummyOperator(task_id='final_task')
condition_to_skip_task = ShortCircuitOperator(
task_id='condition_to_skip_task',
python_callable=skip_if_not_prod,
)
start >> condition_to_skip_task >> task_to_skip >> final_task
ignore_condition = DummyOperator(task_id='ignore_condition', trigger_rule=TriggerRule.ALL_DONE)
condition_to_skip_task >> ignore_condition >> final_task
- 任务实例在任务之间传递:在DAG文件中,可以使用
XCom
来实现任务实例在任务之间传递。通过XCom
,可以将任务中产生的结果或数据传递到下一个任务中。任务之间传递的数据是以键值对的形式存在的,可以在任何任务中进行读取。
代码示例:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def push_function(**kwargs):
kwargs['ti'].xcom_push(key='data', value={'name': 'Alice', 'age': 25})
def pull_function(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(key='data', task