在Airflow中,Python字符串格式化(如'{} {}'.format(a, b))有时会引起一些问题,可能导致任务失败或输出不正确。这是由于任务在不同的环境中运行时,可能会遇到不同的字符串格式化,导致不一致的结果。为了解决这个问题,我们可以使用Airflow提供的字符串插值方法airflow.models.Variable.interpolate()。下面是一个示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG(
'example_dag',
start_date=datetime.now()
)
def hello_world(**context):
name = context['dag_run'].conf.get('name')
message = "Hello, {}".format(name)
context['ti'].xcom_push(key='message', value=message)
task = PythonOperator(
task_id='hello_world',
provide_context=True,
python_callable=hello_world,
dag=dag
)
def print_message(**context):
message = context['ti'].xcom_pull(key='message')
print(message)
task2 = PythonOperator(
task_id='print_message',
provide_context=True,
python_callable=print_message,
dag=dag
)
task >> task2
在上述代码中,我们使用了context['dag_run'].conf.get('name')来获取任务运行时使用的字符串格式。然后,使用"Hello, {}".format(name)创建了一个包含name变量的消息。最后,使用context['ti'].xcom_push(key='message', value=message)将消息写入任务实例的XCom中。在下一个任务中,我们使用context['ti'].xcom_pull(key='message')来获取XCom中的消息,并使用print(message)输出结果。这样可以确保在所有环境中都获得一致的字符串格式,并避免了使用字符串格式化的一些潜在问题。