可以通过在自定义Operator类中定义类变量来创建可配置的变量,也可以使用Airflow系统变量。
示例代码如下:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class MyCustomOperator(BaseOperator):
# 可配置的类变量
my_variable = 'default_value'
@apply_defaults
def __init__(self, my_arg, **kwargs):
super().__init__(**kwargs)
self.my_arg = my_arg
def execute(self, context):
# 使用类变量
print('The value of my_variable is:', self.my_variable)
# ....
使用系统变量:
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
def my_function(**kwargs):
my_variable = Variable.get('my_variable', default_var='default_value')
# ....
my_operator = PythonOperator(
task_id='my_task',
python_callable=my_function,
dag=my_dag
)
在Airflow的web UI中,可以在Admin -> Variables中进行系统变量的配置。