BigQuery 数据传输的“按需触发”，是指在数据传输作业运行完成后自动触发后续操作的做法。下面给出一种借助 Pub/Sub 通知实现的方案，并附代码示例：
from google.cloud import pubsub_v1
# Step 1: create the Pub/Sub topic that will receive the notifications.
project_id = "your-project-id"
topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
# Fully qualified topic name, built by the client from project and topic IDs.
topic_path = publisher.topic_path(project_id, topic_id)

topic = publisher.create_topic(
    request={
        "name": topic_path,
    }
)
print("Topic created: {}".format(topic.name))
from google.cloud import bigquery
# Step 2: create a transfer config that names the Pub/Sub topic as its
# notification target.
project_id = "your-project-id"
dataset_id = "your-dataset-id"
table_id = "your-table-id"
transfer_config_id = "your-transfer-config-id"
# The `parent` resource name below interpolates `location`; it was previously
# referenced without ever being defined, which raised NameError at runtime.
# Placeholder value: replace with the location of your transfer config.
location = "your-location"

client = bigquery.Client()

transfer_config = {
    "destination_dataset_id": dataset_id,
    "display_name": "My Transfer Config",
    "data_source_id": "your-data-source-id",
    "params": {
        "table": table_id,
    },
    "schedule": "every 24 hours",
    "notification_pubsub_topic": topic_path,
    "notification_pubsub_topic_project_id": project_id,
}

# NOTE(review): `bigquery.Client` does not appear to provide
# `create_transfer_config`; transfer configs are normally created through the
# separate BigQuery Data Transfer Service client
# (`google.cloud.bigquery_datatransfer`), whose `create_transfer_config`
# takes `parent` and `transfer_config`. The `project_id=` keyword and the
# `notification_pubsub_topic_project_id` key also look undocumented.
# Confirm against the client library reference before running this step.
response = client.create_transfer_config(
    project_id=project_id,
    transfer_config=transfer_config,
    parent=f"projects/{project_id}/locations/{location}",
)
print("Transfer config created: {}".format(response.name))
# Step 3: attach a subscription to the topic so its messages can be consumed.
subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
# Fully qualified subscription name, built by the client from the two IDs.
subscription_path = subscriber.subscription_path(project_id, subscription_id)

subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
    }
)
print("Subscription created: {}".format(subscription.name))
def process_message(message):
    """Handle one message delivered by the Pub/Sub subscription.

    Prints the message payload and then acknowledges the message.

    Args:
        message: The received message object; it must expose a ``data``
            attribute and an ``ack()`` method.
    """
    print("Received message: {}".format(message.data))
    # Perform whatever follow-up action you need here.
    message.ack()
# subscribe() returns a future immediately and delivers messages to the
# callback in the background. Keep the future and block on it; otherwise the
# script reaches its end without waiting for any message.
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=process_message
)
# The `with` block closes the subscriber client when listening stops.
with subscriber:
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()  # Request shutdown of the pull stream.
        streaming_pull_future.result()  # Wait until shutdown has completed.
请注意，以上代码示例仅供参考，您需要根据实际情况进行修改和适配。此外，您还需要确保项目已启用 Pub/Sub、BigQuery 以及 BigQuery Data Transfer Service 的 API，并且所用账号具有创建主题、订阅和传输配置以及向该主题发布消息的相应权限。