是的，Airflow XCom 可以与 BigQueryInsertJobOperator 和 BigQueryOperator 配合使用。下面的示例展示了如何在 Airflow DAG 中通过 XCom 拉取上一个任务返回的数据，并在下一个任务中使用这些数据：
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryInsertJobOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# Baseline task settings, handed to the DAG constructor as ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2021, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# The example DAG: scheduled once per day, with ``default_args`` supplying
# the per-task defaults.
dag = DAG(
    dag_id='example_dag',
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)
def extract_data() -> list:
    """Return the sample rows that the load step inserts.

    Each row is a dict with a ``name`` string and an ``age`` integer. When
    this function is used as a ``PythonOperator`` callable, its return value
    is what the downstream task reads back with ``xcom_pull``.

    Returns:
        A new list of row dicts on every call.
    """
    return [
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30},
        {"name": "Charlie", "age": 35},
    ]
def _build_insert_query(rows):
    """Build a parameterized multi-row INSERT statement for ``rows``.

    Values are never interpolated into the SQL text; each one is bound
    through a named BigQuery query parameter (``@name_<i>`` / ``@age_<i>``).

    Args:
        rows: Non-empty sequence of dicts with ``name`` and ``age`` keys.

    Returns:
        A ``(sql, query_parameters)`` tuple, where ``query_parameters`` is a
        list in the BigQuery REST ``queryParameters`` format.

    Raises:
        KeyError: If a row lacks ``name`` or ``age``.
        ValueError: If an ``age`` cannot be converted to an integer.
    """
    placeholders = []
    query_parameters = []
    for index, row in enumerate(rows):
        placeholders.append('(@name_{0}, @age_{0})'.format(index))
        query_parameters.append({
            'name': 'name_{}'.format(index),
            'parameterType': {'type': 'STRING'},
            'parameterValue': {'value': str(row['name'])},
        })
        query_parameters.append({
            'name': 'age_{}'.format(index),
            'parameterType': {'type': 'INT64'},
            # The REST representation carries scalar values as strings;
            # int() rejects anything that is not a whole number first.
            'parameterValue': {'value': str(int(row['age']))},
        })
    sql = (
        'INSERT INTO `myproject.mydataset.mytable` (name, age)\n'
        'VALUES {};'.format(', '.join(placeholders))
    )
    return sql, query_parameters


def load_data(**kwargs):
    """Insert the rows produced by ``extract_data_task`` into BigQuery.

    Pulls the upstream task's XCom value, builds one parameterized INSERT
    covering every row, and runs it through ``BigQueryInsertJobOperator``.

    Args:
        **kwargs: Airflow task context. Must contain ``ti``, the running
            task instance, which is used to pull the upstream XCom value.

    Raises:
        ValueError: If ``extract_data_task`` pushed no XCom value.

    NOTE(review): ``BigQueryInsertJobOperator`` is documented under
    ``airflow.providers.google.cloud.operators.bigquery``; confirm that the
    file-level import from ``airflow.contrib`` actually resolves.
    NOTE(review): the operator is created and executed by hand inside a
    running task; consider declaring it as a regular downstream task.
    """
    ti = kwargs['ti']
    rows = ti.xcom_pull(task_ids='extract_data_task')
    if rows is None:
        raise ValueError(
            "No XCom value found for task 'extract_data_task'"
        )
    if isinstance(rows, dict):
        # Tolerate an upstream task that returns a single row.
        rows = [rows]
    if not rows:
        # Nothing to insert; an INSERT with an empty VALUES list is invalid.
        return

    sql, query_parameters = _build_insert_query(rows)
    insert_task = BigQueryInsertJobOperator(
        task_id='insert_data_task',
        configuration={
            'query': {
                'query': sql,
                # Backtick-quoted table names and named parameters need
                # standard SQL, so legacy SQL is switched off explicitly.
                'useLegacySql': False,
                'parameterMode': 'NAMED',
                'queryParameters': query_parameters,
            }
        },
        dag=dag,
    )
    insert_task.execute(context=kwargs)
# First task: runs extract_data.
extract_task = PythonOperator(
    dag=dag,
    task_id='extract_data_task',
    python_callable=extract_data,
)

# Second task: runs load_data. provide_context=True is set so that the
# callable receives the task context (load_data reads kwargs['ti']).
load_task = PythonOperator(
    dag=dag,
    task_id='load_data_task',
    python_callable=load_data,
    provide_context=True,
)

# Ordering: the load step runs only after the extract step.
extract_task >> load_task
在这个示例中,我们定义了两个任务:extract_data_task