使用 PythonOperator 调用自定义函数，来处理 Operator 不支持模板渲染的参数（即没有列在该 Operator 的 template_fields 中、Airflow 不会自动渲染其 Jinja 模板的参数）。
示例代码：
import logging
from datetime import datetime, timedelta

from airflow import DAG
# NOTE(review): these are Airflow 1.x import paths. Airflow 2 moved them to
# airflow.providers.apache.spark.operators.spark_submit and
# airflow.operators.python; the old paths may only work there as deprecated
# aliases — confirm the target Airflow version.
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.python_operator import PythonOperator
# Default arguments handed to the DAG below via default_args.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2022, 1, 1),
    # An alert address is configured, but both email triggers are disabled.
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    # One retry, five minutes after a failure.
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def spark_submit_job(**context):
    """Submit ``MySparkJob`` with per-run values taken from the task context.

    ``SparkSubmitOperator`` only renders Jinja in its ``template_fields``, and
    an operator that is merely constructed inside another task's callable is
    neither rendered nor run by Airflow. So instead of passing ``{{ ... }}``
    strings, this callable reads the values directly from the task context
    and executes the operator itself.

    Args:
        **context: The Airflow task context (``dag_run``, ``ds``, ...).
            Airflow 2 passes it automatically because of the ``**context``
            parameter; Airflow 1.10 only passes it when the PythonOperator is
            created with ``provide_context=True``.

    Raises:
        ValueError: If the context has no ``dag_run``, or ``dag_run.conf``
            does not provide ``application``.
    """
    logger = logging.getLogger(__name__)

    dag_run = context.get('dag_run')
    if dag_run is None:
        raise ValueError(
            'spark_submit_job needs the Airflow task context; on Airflow 1.10 '
            'create the PythonOperator with provide_context=True'
        )
    # Runs started without a trigger payload may have conf None or empty.
    run_conf = dag_run.conf or {}

    application = run_conf.get('application')
    if not application:
        raise ValueError("dag_run.conf must provide 'application'")

    spark_conf = {}
    # NOTE(review): the original template was '{{ conf.job.spark_archive }}'.
    # In Airflow templates `conf` is Airflow's own configuration object, not
    # the trigger payload, so that template likely never resolved. Reading it
    # from dag_run.conf['job']['spark_archive'] is an assumption — confirm.
    spark_archive = (run_conf.get('job') or {}).get('spark_archive')
    if spark_archive:
        spark_conf['spark.yarn.archive'] = spark_archive

    spark_op = SparkSubmitOperator(
        task_id='spark_submit',
        application=application,
        name='MySparkJob',
        conn_id='spark_default',
        java_class='com.foo.bar.MySparkJob',
        # Missing keys become None, which leaves the memory settings unset.
        executor_memory=run_conf.get('executor_memory'),
        driver_memory=run_conf.get('driver_memory'),
        application_args=[context['ds']],
        conf=spark_conf,
    )
    logger.info('Spark Submit Operator: %s', spark_op)
    # Constructing the operator does nothing by itself; run the submission now.
    spark_op.execute(context)
# Daily DAG; catchup=False means no runs are created for past intervals
# between start_date and now.
dag = DAG(
    dag_id='spark_submit_job',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# The DAG's only task: invoke spark_submit_job when the DAG runs.
run_spark = PythonOperator(
    task_id='run_spark',
    python_callable=spark_submit_job,
    dag=dag,
)