确定Docker-compose环境是否正确配置,并运行集群。
确认Airflow和Spark Standalone集群间的连接是否正确配置。
确认Airflow中的Docker镜像是否正确安装pyspark依赖。
确认Airflow中的DAG是否正确配置,例如:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
default_args = {
'owner': 'zen',
'depends_on_past': False,
'start_date': datetime(2019, 8, 26),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('pyspark_dag', default_args=default_args, schedule_interval=timedelta(days=1))
t1 = BashOperator(
task_id='pyspark_env_check',
bash_command='source my_env/bin/activate && python my_script.py',
dag=dag)
t2 = SparkSubmitOperator(
task_id='pyspark_job',
application='path/to/my/job.py',
conn_id='spark_conn',
name='pyspark_job',
executor_memory='2g',
num_executors=2,
application_args=['arg1', 'arg2'],
verbose=False,
dag=dag)
t1 >> t2
确认Spark Standalone集群中的pyspark是否正确安装依赖。
确认Airflow中的pyspark任务是否成功提交到Spark Standalone集群,并查看任务的日志信息,以确定任务是否成功运行。
如果任务仍然被卡住,请查看集群的日志信息,以确定问题的具体原因。