要解决Airflow xcom_pull无法提供相同上游任务实例运行的数据,而是提供最新数据的问题,可以使用XComArg传递任务实例之间的数据。下面是一个示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def push_data(**kwargs):
data = "example data"
kwargs['ti'].xcom_push(key='my_key', value=data)
def pull_data(**kwargs):
data = kwargs['ti'].xcom_pull(task_ids='push_task', key='my_key')
print(data)
with DAG(dag_id='xcom_example', schedule_interval=None, start_date=days_ago(1)) as dag:
push_task = PythonOperator(
task_id='push_task',
python_callable=push_data,
provide_context=True
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_data,
provide_context=True
)
push_task >> pull_task
在这个例子中,我们有两个任务:push_task和pull_task。push_task通过xcom_push将数据推送到XCom中。然后,在pull_task中,我们使用xcom_pull从XCom中获取存储的数据。
使用XComArg时,可以通过xcom_args参数向下游任务传递上游任务的实例。这样,每个任务实例都可以访问与其相同的上游任务实例的数据。以下是修改后的示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def push_data(**kwargs):
data = "example data"
kwargs['ti'].xcom_push(key='my_key', value=data)
def pull_data(**kwargs):
data = kwargs['ti'].xcom_pull(task_ids='push_task', key='my_key')
print(data)
with DAG(dag_id='xcom_example', schedule_interval=None, start_date=days_ago(1)) as dag:
push_task = PythonOperator(
task_id='push_task',
python_callable=push_data,
provide_context=True
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_data,
provide_context=True,
op_args=['{{ ti.xcom_pull(task_ids="push_task", key="my_key") }}']
)
push_task >> pull_task
在这个例子中,我们将{{ ti.xcom_pull(task_ids="push_task", key="my_key") }}
作为op_args传递给pull_task,这样pull_task将使用与其相同的上游任务实例的数据。