You can use Cloud Pub/Sub as the Celery broker for Airflow's CeleryExecutor.
pip install apache-airflow[gcp]
pip install google-cloud-pubsub
pip install celery[redis]
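A quick way to confirm the installation is to import the packages from the same environment (a throwaway check, not part of the setup):

import airflow
import celery
from google.cloud import pubsub_v1

print("airflow", airflow.__version__)
print("celery", celery.__version__)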
# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = rediss://:<password>@<host>:<port>/0
result_backend = db+postgresql://airflow_user:airflow_user_password@<host>:<port>/airflow
worker_concurrency = 16

[celery_broker_transport_options]
visibility_timeout = 3600
Here, broker_url must point at a message transport Celery can speak, because Celery relies on the broker for reliable task delivery; Redis (as configured above) is the common choice. In this post we use Cloud Pub/Sub as the Celery broker instead, so we build the broker URL with a small helper. Note that the pubsub:// scheme below assumes a Kombu transport that understands it; the Google Pub/Sub transport that ships with recent Kombu releases (5.4+) expects URLs of the form gcpubsub://projects/<project-id> instead.
def gcp_pubsub_broker_url(project_id, topic_name):
    """Build a Celery broker URL that points at a Pub/Sub topic.

    The URL carries no credentials: on Compute Engine the Pub/Sub client
    picks up the VM's service-account credentials automatically.
    """
    return f"pubsub://{project_id}?topic={topic_name}"
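Before wiring this into Airflow, it is worth smoke-testing that the topic is reachable with the credentials the workers will use. A minimal check with google-cloud-pubsub (project and topic names are placeholders):

from google.cloud import pubsub_v1

broker = gcp_pubsub_broker_url("your-project-id", "your-topic-name")
print("broker URL:", broker)

# Publish one message to prove that both the credentials and the topic work.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project-id", "your-topic-name")
future = publisher.publish(topic_path, b"broker smoke test")
print("published message id:", future.result(timeout=30))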
airflow.cfg cannot execute Python or Jinja, so paste the URL the helper returns into the [celery] section (or export it as the AIRFLOW__CELERY__BROKER_URL environment variable):

[celery]
broker_url = pubsub://your-project-id?topic=your-topic-name

Serialization and compression are configured through Celery settings (e.g. task_serializer = "json") rather than broker-URL query parameters.
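To confirm that Airflow actually resolved the value, read it back through Airflow's own configuration API from the same environment:

from airflow.configuration import conf

# Should print the pubsub:// URL configured above.
print(conf.get("celery", "broker_url"))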
Create the topic that the broker URL references:
gcloud pubsub topics create your-topic-name
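The topic can also be created programmatically, which is convenient in provisioning scripts (a sketch; the create_topic signature varies slightly across google-cloud-pubsub versions, and the keyword form below is for 2.x):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project-id", "your-topic-name")
publisher.create_topic(name=topic_path)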
Finally, start the services (CeleryExecutor needs the scheduler running to enqueue tasks):
airflow webserver -p 8080
airflow scheduler
airflow worker
Tasks will now be dispatched through Pub/Sub to the workers!
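To see the broker end to end, a minimal DAG like this hypothetical example can be triggered from the web UI; its task is queued through Pub/Sub and picked up by the Celery worker:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path

def ping():
    print("hello from a Celery worker")

dag = DAG(
    dag_id="celery_broker_smoke_test",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,  # trigger manually from the UI or CLI
)

PythonOperator(task_id="ping", python_callable=ping, dag=dag)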