Airflow 中的 worker 负责从调度器中获取任务并执行它们。通过调用 Airflow 的执行程序,worker 可以在容器上启动它们。Worker 进程使用 Celery 执行器从任务队列中获取任务,然后运行它们。
下面是一个简单的 Airflow 任务示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
dag = DAG(
'my_dag',
start_date=datetime(2021, 1, 1),
schedule_interval='@once',
)
task1 = BashOperator(
task_id='my_task_1',
bash_command='echo "Hello World!"',
dag=dag,
)
在这个例子中,我们创建了一个名为 my_dag
的 DAG,其中包含一个 Bash 运算符类型的任务,任务 ID 是 my_task_1
,Bash 命令是 echo "Hello World!"
。
Worker 是由 Airflow 的调度器自动启动的,不需要手动启动,因为它们会自动检测 DAG。但是,当 worker 消息队列中有大量待处理的任务时,worker 可能会出现延迟和阻塞的情况。为了解决这个问题,可以通过以下方式来优化 worker:
--pool=prefork
或 --concurrency=n
)来提高任务执行效率。通过实施上述优化措施,可以使 Airflow worker 的执行更加高效和可靠。