要按主题过滤Kafka流,可以使用Kafka Consumer API中的subscribe()方法来订阅特定的主题,然后使用Kafka Stream API中的filter()方法根据主题进行过滤。以下是一个示例代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaTopicFilter {
public static void main(String[] args) {
// Kafka consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Create Kafka consumer
KafkaConsumer consumer = new KafkaConsumer<>(props);
// Subscribe to a specific topic
consumer.subscribe(Collections.singletonList("topic1"));
// Start consuming messages
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// Filter messages based on topic
if (record.topic().equals("topic1")) {
System.out.println("Received message: " + record.value());
}
}
}
}
}
上面的代码创建了一个Kafka消费者,订阅了名为"topic1"的主题,并使用poll()方法从Kafka集群中获取消息。然后,使用if语句来过滤只处理属于"topic1"主题的消息。你可以根据需要修改这段代码以适应你的实际情况,并添加适当的主题过滤逻辑。