需要在Airflow中声明本地目录,例如使用PythonOperator中的参数provide_context=True,然后在操作中使用Python导入本地模块。
示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_function(**kwargs):
import sys
sys.path.insert(0, '/path/to/local/module')
from my_module import my_function
my_function()
dag = DAG('my_dag', description='My DAG', schedule_interval='0 0 * * *', start_date=datetime(2019, 1, 1), catchup=False)
task = PythonOperator(
task_id='my_task',
provide_context=True,
python_callable=my_function,
dag=dag
)