Airflow offers two options for getting DAGs from a storage location into the scheduler:
The first option is a self-contained DAG file dropped directly into the directory the scheduler scans:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x path; 1.x used airflow.operators.bash_operator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['alerts@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='Example DAG that runs from storage',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

task_1 = BashOperator(
    task_id='task_1',
    bash_command='echo "hello world"',
    dag=dag,
)
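For this option to work, the file only has to land in the directory the scheduler scans. If it is unclear where that is on a given deployment, the configured path can be read with Airflow's standard config API; a minimal sketch:

from airflow.configuration import conf

# Print the directory the scheduler scans for DAG files: the
# [core] dags_folder option from airflow.cfg, or the
# AIRFLOW__CORE__DAGS_FOLDER environment variable if set.
print(conf.get('core', 'dags_folder'))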
The second option keeps the storage location out of the DAG file and resolves it from an Airflow Variable when the file is parsed:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['alerts@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='Example DAG that runs from storage',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# Resolve the storage location from an Airflow Variable at parse time.
storage_path = Variable.get('storage_path')

# Use the resolved path inside the task. Note that a DAG object has no
# dag_dir_list attribute to append to; the scheduler's scan directory is
# fixed by the dags_folder setting and cannot be extended per DAG.
task_1 = BashOperator(
    task_id='task_1',
    bash_command=f'echo "processing files in {storage_path}"',
    dag=dag,
)
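The Variable has to exist before the scheduler parses the file, otherwise Variable.get raises KeyError and the DAG fails to import. A minimal sketch of both sides, using the hypothetical value '/data/storage': setting the Variable once (this needs an initialized metadata database), and reading it with a fallback so the file stays importable:

from airflow.models import Variable

# One-time setup, e.g. from a bootstrap script.
# '/data/storage' is a hypothetical example value.
Variable.set('storage_path', '/data/storage')

# In the DAG file itself, a default keeps parsing from breaking
# when the Variable has not been created yet.
storage_path = Variable.get('storage_path', default_var='/data/storage')

Keep in mind that a top-level Variable.get runs on every scheduler parse cycle and hits the metadata database each time, so this pattern should be used sparingly in large deployments.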