The following steps address connecting AWS MSK with Google Dataflow:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    GoogleCloudOptions,
    S3Options,
)

options = PipelineOptions()
# Set your Google Cloud project ID.
options.view_as(GoogleCloudOptions).project = 'YOUR_PROJECT_ID'
# Set a temporary GCS location for Dataflow job staging/output.
options.view_as(GoogleCloudOptions).temp_location = 'gs://YOUR_BUCKET_NAME/tmp/'

# AWS credentials. In the Beam Python SDK these are exposed through S3Options
# and are used for S3 access; MSK itself authenticates through the Kafka
# consumer properties below.
options.view_as(S3Options).s3_access_key_id = 'YOUR_AWS_ACCESS_KEY'
options.view_as(S3Options).s3_secret_access_key = 'YOUR_AWS_SECRET_KEY'

# Read from Kafka using KafkaIO (a cross-language transform backed by the
# Java Kafka connector).
with beam.Pipeline(options=options) as p:
    records = p | 'Read from Kafka' >> beam.io.ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'YOUR_BOOTSTRAP_SERVERS',
            # If your MSK cluster requires SASL/TLS, add the matching Kafka
            # client settings here, e.g.:
            # 'security.protocol': 'SASL_SSL',
        },
        topics=['YOUR_KAFKA_TOPIC'],
    )
```
In this example, replace "YOUR_PROJECT_ID", "YOUR_BUCKET_NAME", "YOUR_AWS_ACCESS_KEY", "YOUR_AWS_SECRET_KEY", "YOUR_BOOTSTRAP_SERVERS", and "YOUR_KAFKA_TOPIC" with your actual values. You also need the apache-beam package installed; ReadFromKafka runs through Beam's cross-language framework, so a Java runtime must be available on the machine that submits the pipeline. Finally, the MSK bootstrap servers must be network-reachable from the Dataflow workers, for example via public access on the cluster or a VPN/peering setup between AWS and Google Cloud.
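As a sketch of how the job is typically installed and launched (the `[gcp]` extra and the runner flags follow standard Beam/Dataflow usage; the script name and region are placeholders):

```shell
# Install the Beam SDK with the GCP extra, which pulls in Dataflow support.
pip install 'apache-beam[gcp]'

# Submit the pipeline to Dataflow. Pipeline options not set in code can be
# passed on the command line; they are parsed by PipelineOptions.
python your_pipeline.py \
  --runner=DataflowRunner \
  --project=YOUR_PROJECT_ID \
  --region=YOUR_REGION \
  --temp_location=gs://YOUR_BUCKET_NAME/tmp/
```

Flags given on the command line and values set on `PipelineOptions` in code are merged, so either place works for `project` and `temp_location`.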