要在Airflow中使用SparkOperator,需要安装pyspark模块。然而在某些情况下,即使安装了pyspark模块,仍然会出现Spark连接类型不显示的问题。
解决方法是手动设置Spark连接类型。在Airflow的dag文件中,添加如下代码:
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone
SPARK_HOME = "{path_to_spark_home}"
SPARK_CONN_ID = "{your_spark_conn_id}"
default_args = {
'owner': 'airflow',
'start_date': timezone.datetime(2019, 1, 1),
}
with DAG(dag_id='example_dag', default_args=default_args, schedule_interval='@once') as dag:
spark_task = SparkSubmitOperator(
task_id='spark_task',
conn_id=SPARK_CONN_ID,
application="{path_to_your_spark_script}",
verbose=False,
conf={
"spark.home": SPARK_HOME,
"spark.eventLog.enabled": "true",
},
)
其中,SPARK_HOME是你本地Spark的安装路径(路径中不要包含中文),SPARK_CONN_ID为Spark连接类型在Airflow中的名称,"{path_to_your_spark_script}"为你的Spark脚本的路径。
然后在Airflow的Web UI中进入Connections页面,新建一个Spark连接类型,填入如下信息:
{your_spark_conn_id}Sparkyarn8088{"queue": "{your_yarn_queue}"}保存连接类型后,刷新DAG页面,就能看到你新建的Spark连接类型了。同时,你也可以通过代码中的conn_id参数来指定使用哪个Spark连接类型。
参考资料: