在Airflow中,可以使用@task
装饰器定义一个任务,并使用schedule_interval
参数来设置不同的调度间隔。以下是一个示例代码:
from airflow.decorators import task
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_dag', default_args=default_args, schedule_interval=timedelta(days=1))
@task
def my_task():
# 任务逻辑
print("执行任务")
task1 = PythonOperator(task_id='my_task', python_callable=my_task, dag=dag)
在上面的代码中,我们定义了一个名为my_task
的任务,并使用@task
装饰器将其标记为一个可调度的任务。然后,我们使用PythonOperator
操作符来创建一个任务实例task1
,并将my_task
作为其python_callable
参数传递。最后,我们将任务实例添加到DAG中。
在这个示例中,我们将任务的调度间隔设置为每天执行一次,即schedule_interval=timedelta(days=1)
。你可以根据需求设置不同的调度间隔,例如每小时、每周等。
请注意,以上代码仅为示例,实际使用时需要根据具体需求进行调整。