spark.jars.packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0
这里的配置表示使用 Kafka 0.8 版本的 API,对应的 Spark 版本为 2.4.0。 3. 接下来需要编写 Spark Streaming 应用程序。以下是一段示例代码:
from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext("local[2]", "KafkaExample") ssc = StreamingContext(sc, 5)
kafkaParams = {"metadata.broker.list": "localhost:9092"} topics = ["test"]
stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = stream.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start() ssc.awaitTermination()
代码中使用了 Spark Streaming 和 Kafka 的相关 API,首先将 Kafka 参数传入 Spark Streaming 的 createDirectStream 方法,然后使用 map、flatMap 和 reduceByKey 方法对消息进行处理,最后使用 pprint 方法将结果输出。