要解决"Airflow 后台任务"的问题,需要以下步骤:
pip install apache-airflow
airflow initdb
创建一个 DAG(有向无环图):在 Airflow 的 DAG 文件夹中创建一个 Python 文件,例如 my_dag.py
。
在 DAG 文件中定义任务:在 my_dag.py
文件中,使用 Python 代码定义需要在后台运行的任务。以下是一个示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_task():
# 这里是后台任务的代码逻辑
print("Running my task")
dag = DAG('my_dag', description='A simple DAG',
schedule_interval='0 0 * * *',
start_date=datetime(2022, 1, 1),
catchup=False)
task = PythonOperator(task_id='my_task', python_callable=my_task, dag=dag)
在上面的代码中,我们定义了一个名为 my_task
的 Python 函数,并使用 PythonOperator
创建一个任务,将 my_task
函数作为参数传递给 python_callable
。我们还定义了 DAG 的调度间隔、开始日期和是否追溯等属性。
airflow webserver -p 8080
airflow scheduler
http://localhost:8080
,可以看到 Airflow 的 Web UI。在 UI 中,可以查看任务的运行状态、日志和时间表等信息。以上是一个简单的示例,你可以根据自己的需求定义更复杂的 DAG 和任务。