在Airflow中,可以使用Jinja模板语言来动态地设置任务的变量。可以通过在变量后面加上|default(default_value)
来设置变量的默认值。如果变量没有被定义或者没有传递值,将会使用默认值。
以下是一个示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1)
}
def print_message(message):
print(message)
dag = DAG('jinja_template_example', default_args=default_args, schedule_interval=None)
with dag:
message = "{{ params.message | default('Hello, World!') }}"
task1 = PythonOperator(
task_id='task1',
python_callable=print_message,
op_kwargs={'message': message},
provide_context=True
)
task1
在上面的示例中,我们定义了一个DAG,其中有一个任务task1。我们使用Jinja模板语言来设置变量message
的默认值为'Hello, World!'
。
在PythonOperator中,我们将message
作为关键字参数传递给print_message
函数。
这样,如果在执行DAG时没有传递message
参数,将会使用默认值'Hello, World!'
。如果传递了message
参数,将会使用传递的值。
希望对你有所帮助!