Airflow在1.10.0版本中引入了providers,旨在将外部依赖项隔离到各自的提供程序包中,从而使Airflow更易于维护和更新。它们使得共享提供程序之间更容易维护,更容易升级其中一个,而无需更新所有其他的提供程序。
一系列“Provider”包被视为以下:
airflow.providers.
这里的provider_name对应于您正在使用的提供程序库的名称。
现在,大多数旧的Contrib库已被转移到新的Providers包中。而那些未移动的库则被称为“Legacy”代码,并被放置在airflow.contrib包下面。
因此,可以通过查找代码来确定需要使用哪个包:
from airflow.providers import package_name from airflow.contrib import package_name
举例说明,在新的Providers包中,要使用SparkSubmitOperator,该操作符的导入语句如下:
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
而在旧的Contrib包中,导入语句为:
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
因此,建议在使用新的Airflow版本时,尽可能使用providers包中的操作符。
完整示例:
from datetime import datetime from airflow import DAG from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2020, 1, 1), }
dag = DAG( 'spark_submit_demo', default_args=default_args, description='A simple DAG to demo the SparkSubmitOperator', )
task_submit_spark_app = SparkSubmitOperator( task_id='submit_spark_app', application='/path/to/your/spark/app.py', conn_id='spark_default', dag=dag, )
task_submit_spark_app # task dependency