要在本地主机上运行Apache Airflow的Celery工作者,您可以按照以下步骤进行操作:
安装依赖项:
pip install apache-airflow celery
创建一个名为dags的目录,并在其中创建一个名为hello_world.py的Python文件。在hello_world.py文件中,添加以下代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def hello_world():
print("Hello, World!")
with DAG(
dag_id='hello_world_dag',
schedule_interval=None,
start_date=datetime(2022, 1, 1),
) as dag:
task = PythonOperator(
task_id='hello_world_task',
python_callable=hello_world
)
创建一个名为celery_worker.py的Python文件,并添加以下代码:
from airflow import settings
from airflow.models import DAG
from airflow.utils.db import create_session
from airflow.executors.celery_executor import app as celery_app
from airflow.executors import celery_executor
def start_celery_worker():
app = celery_app.get_default_app()
celery_worker = celery_executor.CeleryExecutor(app=app)
celery_worker.start()
with create_session() as session:
start_celery_worker()
打开终端,导航到包含hello_world.py和celery_worker.py的目录,并运行以下命令来启动Airflow Web服务器:
airflow webserver
在新的终端中,导航到相同的目录,并运行以下命令以启动Celery工作者:
python celery_worker.py
打开浏览器,并访问Airflow Web服务器的URL(默认为http://localhost:8080)。在Web界面中,您应该看到名为hello_world_dag的DAG。
单击DAG的名称,然后单击“Trigger DAG”按钮以触发DAG的运行。您应该在终端中看到Hello, World!的输出。
这就是在本地主机上运行Apache Airflow的Celery工作者的解决方法。请注意,您可能需要根据您的环境进行适当的配置和调整。