Airflow是一个用于编排和调度任务的开源平台。下面是一个示例,展示如何使用Airflow调度月度作业的代码:
首先,确保你已经安装了Airflow,并启动了Airflow的Web服务器和调度器。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# 定义一个Python函数,用于执行月度作业的逻辑
def monthly_job():
# 在这里编写你的月度作业的代码逻辑
print("Running monthly job...")
# 定义DAG
dag = DAG("monthly_job_dag",
description="Monthly Job DAG",
schedule_interval="0 0 1 * *", # 在每个月的第一天运行作业
start_date=datetime(2022, 1, 1), # 开始日期
catchup=False)
# 定义任务
start_task = DummyOperator(task_id="start_task", dag=dag)
monthly_job_task = PythonOperator(task_id="monthly_job_task",
python_callable=monthly_job,
dag=dag)
end_task = DummyOperator(task_id="end_task", dag=dag)
# 设置任务的依赖关系
start_task >> monthly_job_task >> end_task
将以上代码保存为一个Python文件(例如monthly_job_dag.py),然后将该文件放在Airflow的DAG目录中(默认情况下是~/airflow/dags/)。
使用以下命令启动调度器:
airflow scheduler
airflow webserver
现在,你可以在Airflow的Web界面中看到你的DAG(http://localhost:8080 by default)。
在Web界面中,找到你的DAG并手动启动它。你的月度作业将在每个月的第一天运行。
这只是一个简单的示例,你可以根据你的具体需求进行更复杂的调度配置和作业逻辑。