from jinja2 import Template
with open('/path/to/spark-submit-template.j2', 'r') as f:
template = Template(f.read())
cmd = template.render(
app_name='my_app',
jars='/path/to/jars',
main_class='com.mycompany.MyClass',
spark_master='local[*]',
spark_submit_params='--conf spark.driver.memory=2g',
arguments='arg1 arg2 arg3'
)
op = BashOperator(
task_id='submit_spark_job',
bash_command=cmd,
dag=dag
)
spark-submit \
--name {{ app_name }} \
--jars {{ jars }} \
--class {{ main_class }} \
--master {{ spark_master }} \
{{ spark_submit_params }} \
/path/to/my_jar.jar \
{{ arguments }}
spark-submit \
--name my_app \
--jars /path/to/jars \
--class com.mycompany.MyClass \
--master local[*] \
--conf spark.driver.memory=2g \
/path/to/my_jar.jar \
arg1 arg2 arg3
上一篇:Airflow重试显示了错误的次数,尝试5次中的4次。
下一篇:Airflow中使用BigQueryToCloudStorageOperator时出现“ModuleNotFoundError:Nomodulenamed'httplib2'”错误。