在Airflow中,可以使用PythonOperator或BashOperator来执行Python代码。下面是一个示例,演示如何在Airflow中传递参数给Python操作符。
首先,创建一个Python函数,接受参数并执行相应的任务。在这个例子中,我们将创建一个函数,用于将两个数相加并返回结果。
def add_numbers(a, b):
result = a + b
print(f"The sum of {a} and {b} is {result}")
return result
接下来,在Airflow的DAG(Directed Acyclic Graph,有向无环图)中创建一个任务,使用PythonOperator来调用上述函数。在PythonOperator的参数中,可以指定要传递的参数。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def add_numbers(a, b):
result = a + b
print(f"The sum of {a} and {b} is {result}")
return result
default_args = {
'start_date': datetime(2022, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
with DAG('passing_parameters', default_args=default_args, schedule_interval='@once') as dag:
task = PythonOperator(
task_id='add_numbers',
python_callable=add_numbers,
op_kwargs={'a': 3, 'b': 5}
)
在上述代码中,我们创建了一个名为passing_parameters的DAG,并在其中定义了一个名为add_numbers的任务。在任务中,我们使用PythonOperator来调用add_numbers函数,并使用op_kwargs参数传递了两个参数(a=3,b=5)。
这样,当Airflow运行这个任务时,add_numbers函数将被调用,并将参数a和b设置为3和5。
请注意,除了op_kwargs参数外,还可以使用其他参数,例如provide_context=True,将上下文信息传递给Python函数。这样,您可以在函数中访问Airflow的上下文变量,例如task_instance和execution_date。
希望这个示例可以帮助您在Airflow中传递参数给Python操作符!