首先,确认Kafka是否已启用SASL_SSL认证和OAUTHBEARER授权模式。在Apache Beam中,可以通过以下方法配置:
from apache_beam.options.pipeline_options import DebugOptions
options = DebugOptions.from_dictionary({
"debug_options": "{'enable_service_account_auth': True}"
})
接下来,创建Kafka消费者时,需要设置SASL_OAUTHBEARER认证机制:
from kafka import KafkaConsumer
from kafka.errors import KafkaError
topic = 'my-topic'
brokers = ['kafka-1:9094', 'kafka-2:9094']
consumer = KafkaConsumer(
topic,
bootstrap_servers=brokers,
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
ssl_check_hostname=False,
api_version=(0, 10),
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
注意,要根据需要更新参数值,例如,更改群组ID,更改反序列化器函数等。
如果仍然遇到问题,则可能需要进一步配置Kafka服务器的安全设置,以确保正确的身份验证和授权。