要解决Airflow在GCP Cloud Run作业完成后不反映其状态的问题,可以使用Cloud Run的Eventarc功能来触发状态更新。以下是一个示例的解决方法:
首先,确保你已经在GCP上设置了Airflow和Cloud Run的环境,并且已经启动了相关的作业。
在Cloud Run服务的部署文件中(例如service.yaml
),添加一个新的事件触发器(event-trigger):
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: my-cloud-run-service
spec:
template:
spec:
containers:
- image: my-cloud-run-image
env:
- name: AIRFLOW_DAG_RUN_ID
value: "your-dag-run-id" # 替换为你的DAG运行ID
# 添加事件触发器
serviceAccountName: cloud-run-eventarc-sa
triggers:
- kind: event
apiVersion: eventing.knative.dev/v1
name: my-cloud-run-service-trigger
filter:
attributes:
type: com.google.cloud.auditlog.event
serviceName: run.googleapis.com
methodName: google.cloud.run.v1.RevisionServingStateChanged
sink:
apiVersion: serving.knative.dev/v1
kind: Service
name: my-cloud-run-service
在上面的示例中,我们添加了一个事件触发器my-cloud-run-service-trigger
,它会监听Cloud Run服务的状态更改事件google.cloud.run.v1.RevisionServingStateChanged
。当Cloud Run服务的状态发生变化时,它会触发指定的服务。
gcloud iam service-accounts create cloud-run-eventarc-sa --display-name "Cloud Run Eventarc Service Account"
gcloud run services add-iam-policy-binding my-cloud-run-service --member="serviceAccount:cloud-run-eventarc-sa@your-project-id.iam.gserviceaccount.com" --role="roles/run.eventReceiver"
请替换your-project-id
为你的项目ID,my-cloud-run-service
为你的Cloud Run服务名称。
gcloud services enable eventarc.googleapis.com
gcloud pubsub topics create my-cloud-run-topic
gcloud eventarc triggers create my-cloud-run-trigger \
--destination-run-service=my-cloud-run-service \
--destination-run-region=us-central1 \
--event-filters="type=google.cloud.auditlog.event,serviceName=run.googleapis.com,methodName=google.cloud.run.v1.RevisionServingStateChanged" \
--transport-topic=projects/your-project-id/topics/my-cloud-run-topic
请替换your-project-id
为你的项目ID和my-cloud-run-service
为你的Cloud Run服务名称。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks import GcpPubSubHook
def update_job_status():
pubsub_hook = GcpPubSubHook(gcp_conn_id='google_cloud_default')
messages = pubsub_hook.pull("my-cloud-run-subscription", max_messages=1)
if messages:
# 处理接收到的消息并更新作业状态
message = messages[0]
# 更新作业状态的逻辑
with DAG('my_dag', schedule_interval='@once') as dag:
task = PythonOperator(
task_id='update_job_status_task',
python_callable=update_job_status
)
在上面的示例中,我们使用Pub/Sub Hook来订阅名为my-cloud-run-subscription
的订阅。当接收到新的消息时,我们可以处理消息并更新作业的状态。
通过使用Cloud Run的Eventarc功能和Pub/Sub Hook,你可以在Cloud Run作业完成后及时地更新Airflow的作业状态。