首先,确保您的主题已启用紧缩。要检查这一点,请使用以下命令:
bin/kafka-topics.sh --describe --topic my-compacted-topic --zookeeper localhost:2181
您应该看到“cleanup.policy=compact”的输出。如果没有,请将主题配置更改为:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-compacted-topic --add-config cleanup.policy=compact
确保您的生产者代码正确设置了密钥,并且每个消息都有唯一的密钥。紧缩主题根据密钥来进行数据合并和清理。如果您的消息没有密钥,则它们将被视为独立的实体,而不是需要合并的位置。
验证消费者代码是否设置为“读取最新消息”。如果消费者从过去的偏移量读取消息,则合并的消息可能会被跳过,并且顺序可能不正确。请确保您的消费者代码包含以下内容:
properties.put("auto.offset.reset", "latest");
最后,请确认是否有多个生产者同时发送到该主题。在此情况下,因为每个生产者都有自己的存储计数器,因此数据 order 可能会变得混乱。如果可能,请避免这种情况,或者维护一个确定的写入顺序以确保正确的顺序。
经过上述步骤,请再次测试您的代码并确保它们按预期工作。