在Airflow中,可以使用PythonVirtualenvOperator操作符来执行Python脚本。这个操作符允许你运行在一个指定的虚拟环境中的Python脚本。要在Python脚本中访问Airflow的上下文和datetime,可以通过将上下文和datetime变量作为参数传递给Python脚本来完成。以下是一个示例:
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2022, 1, 1),
}
dag = DAG(
dag_id='example_dag',
default_args=default_args,
schedule_interval='@daily',
)
def my_function(**context):
execution_date = context['execution_date']
# do something with execution_date
python_task = PythonVirtualenvOperator(
task_id='my_python_task',
python_callable=my_function,
requirements=['pandas==1.3.4'],
system_site_packages=False,
op_kwargs={
'execution_date': '{{ ds }}',
},
dag=dag,
)
在这个示例中,我们定义了一个PythonVirtualenvOperator任务,并将my_function函数作为python_callable参数传递。我们还将requirements参数设置为一个包含所有需要的Python包的列表,并将system_site_packages参数设置为False,这将使PythonVirtualenvOperator在一个干净的虚拟环境中运行Python脚本。最后,我们将execution_date作为op_kwargs参数传递给PythonVirtualenvOperator,让Python脚本可以访问它。在my_function函数中,我们可以使用**context参数来访问Airflow的上下文,例如execution_date。