一种可能的解决方法是在 Apache Beam 中使用 kafka-python 库,并配置正确的安全协议和认证机制。下面是一个代码示例:
import apache_beam as beam
from apache_beam.transforms import window
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
class KafkaCustomSink(beam.PTransform):
    """A custom Apache Beam sink that produces each element of a PCollection
    as a message to an Apache Kafka topic.

    Args:
        bootstrap_servers: Kafka broker address(es), e.g. 'host:9092'.
        topic: Name of the Kafka topic to produce to.
        security_protocol: Kafka security protocol (default 'SASL_SSL').
        sasl_mechanism: SASL mechanism (default 'OAUTHBEARER').
        sasl_oauth_token_provider: Optional OAuth token provider object for
            OAUTHBEARER authentication. Defaults to None; callers using
            OAUTHBEARER must supply one explicitly.
    """

    def __init__(self, bootstrap_servers, topic, security_protocol='SASL_SSL',
                 sasl_mechanism='OAUTHBEARER', sasl_oauth_token_provider=None):
        # BUGFIX: the original default was
        # pks_utils.oauth2_provider_from_environment(), which (a) referenced
        # an undefined module (NameError at import) and (b) evaluated a
        # side-effecting call once at class-definition time. Default to None
        # and let the caller pass a provider instead.
        super().__init__()
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.security_protocol = security_protocol
        self.sasl_mechanism = sasl_mechanism
        self.sasl_oauth_token_provider = sasl_oauth_token_provider

    def expand(self, pcoll):
        """Send every element of ``pcoll`` to the configured Kafka topic.

        Returns the mapped PCollection (elements pass through unchanged) so
        the send step is part of the pipeline graph for downstream stages.
        """
        # NOTE(review): the producer is created at pipeline-construction
        # time, so it must be picklable for distributed runners — a
        # production version would build it inside a DoFn's setup() method.
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            security_protocol=self.security_protocol,
            sasl_mechanism=self.sasl_mechanism,
            sasl_oauth_token_provider=self.sasl_oauth_token_provider)

        def send_to_kafka(element):
            # Block up to 10s for the broker acknowledgement so produce
            # failures surface here rather than silently in the background.
            future = producer.send(self.topic, element)
            try:
                record_metadata = future.get(timeout=10)
            except KafkaError as ex:
                # Decide what to do if produce request failed...
                print('Error producing to topic {0}: {1}'.format(self.topic, str(ex)))
            except Exception:
                # BUGFIX: narrowed from a bare `except:` so SystemExit and
                # KeyboardInterrupt are no longer swallowed.
                print('Error producing to topic {0}: unexpected error'.format(self.topic))
            else:
                # Print metadata of the produced message for reference...
                print('Message produced with metadata: {0}'.format(str(record_metadata)))
            # Pass the element through so downstream transforms still see it.
            return element

        # BUGFIX: the original applied the Map but discarded its result and
        # returned the input pcoll, leaving the send step as a dangling leaf.
        return pcoll | 'Kafka send' >> beam.Map(send_to_kafka)
# Example: drive three JSON strings through the KafkaCustomSink.
with beam.Pipeline() as pipeline:
    messages = ['{"value": "ABCD"}', '{"value": "EFGH"}', '{"value": "IJKL"}']
    _ = (
        pipeline
        | "Create data" >> beam.Create(messages)
        | "Write to Kafka" >> KafkaCustomSink(
            bootstrap_servers='kafka-brokers', topic='test-topic')
    )
在代码示例中,我们使用 KafkaCustomSink 类作为 Apache Beam 的自定义 sink,将 PCollections 中的消息发送到 Apache Kafka。在初始化 KafkaProducer 时,我们设置