val spark = SparkSession
.builder
.appName("StructuredKafkaOAuth")
.config("spark.kafka.oauth.client.id", "")
.config("spark.kafka.oauth.client.secret", "")
.config("spark.kafka.oauth.tokenEndpoint", "")
.config("spark.kafka.oauth.username", "")
.config("spark.kafka.oauth.password", "")
.master("local[*]")
.getOrCreate()
这里的
和
分别是在Kafka中创建的客户端ID和客户端密码,
是OAuth2授权服务器的URL,
和
分别是授权服务器的用户名和密码。
option
方法传递认证参数。例如:val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OAUTHBEARER")
.option("kafka.ssl.truststore.location", "")
.option("kafka.sasl.login.callback.handler.class", "io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler")
.option("kafka.sasl.jaas.config", "tokenUser={USERNAME};token={PASSWORD};")
.option("subscribe", "")
.load()
这里的
是Kafka集群的引导服务器列表,
是SSL证书的位置,
是要订阅的Kafka主题。注意,kafka.security.protocol
和`