可以使用以下示例代码:
在Airflow DAG中,如果要在任务之间传递值,可以使用XCom。但是有时候可能会碰到传递值出现异常的情况,例如:
def push_function_pushing(**kwargs):
ti = kwargs['ti']
data = {'name': 'Alice', 'age': 25}
ti.xcom_push(key='my_data',value=data)
def pull_function_pullng(**kwargs): ti = kwargs['ti'] my_data = ti.xcom_pull(key='my_data') print(my_data)
如果此时在Airflow中运行任务,会得到以下错误信息:
Broken DAG: [your_dag_name] 'Dict' object has no attribute 'record_content'
其根本原因是my_data被定义为一个字典,而此时XCom返回的值是一个Airflow类型的对象。因此,我们需要调整代码如下:
def push_function_pushing(**kwargs):
ti = kwargs['ti']
data = {'name': 'Alice', 'age': 25}
ti.xcom_push(key='my_data',value=data)
def pull_function_pullng(**kwargs):
ti = kwargs['ti']
my_data = ti.xcom_pull(key='my_data')
my_data = my_data.value
print(my_data)
这样就可以正常地从XCom获取值并进行操作了。