可以通过使用 XCom 在 Airflow 中实现顶层 Python 代码与操作者变量之间的通信。
例如,我们可以在顶层 Python 代码中使用 Variable
来获取操作者的变量:
from airflow.models import Variable
my_var = Variable.get("my_variable")
然后,我们可以使用 XCom
将此变量传递给操作者:
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import XCom
def my_function(**context):
my_var = Variable.get("my_variable")
context['ti'].xcom_push(key="my_var", value=my_var)
my_operator = PythonOperator(
task_id="my_operator",
python_callable=my_function,
provide_context=True,
dag=my_dag
)
my_other_operator = BashOperator(
task_id="my_other_operator",
bash_command='echo {{ ti.xcom_pull(key="my_var") }}',
dag=my_dag
)
在上面的示例中,我们使用 PythonOperator
来执行顶层 Python 代码,并将变量使用 XCom
传递给下一个 BashOperator
。在 BashOperator
中,我们使用 ti.xcom_pull()
方法从上一个任务中获取该变量的值,然后将其打印出来。
这样,我们就可以在 Airflow 中实现顶层 Python 代码与操作者变量之间的通信了。