要按主题过滤Kafka流,可以使用Kafka Consumer API中的subscribe()方法来订阅特定的主题,然后使用Kafka Stream API中的filter()方法根据主题进行过滤。以下是一个示例代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaTopicFilter {
    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Create Kafka consumer
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // Subscribe to a specific topic
        consumer.subscribe(Collections.singletonList("topic1"));
        // Start consuming messages
        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                // Filter messages based on topic
                if (record.topic().equals("topic1")) {
                    System.out.println("Received message: " + record.value());
                }
            }
        }
    }
}
   
上面的代码创建了一个Kafka消费者,订阅了名为"topic1"的主题,并使用poll()方法从Kafka集群中获取消息。然后,使用if语句来过滤只处理属于"topic1"主题的消息。你可以根据需要修改这段代码以适应你的实际情况,并添加适当的主题过滤逻辑。