1.1 在配置文件 server.properties 中,设置以下参数:
- broker.id:每个 Kafka Broker 的唯一标识
- listeners:Broker 的监听地址,建议同时配置 SSL。
- log.dirs:存储 Kafka 消息文件的目录。
- replica.fetch.max.bytes:每个分区的最大拉取消息的字节数,建议默认值 1048576。
- socket.send.buffer.bytes 和 socket.receive.buffer.bytes:设置发送和接收缓冲区大小的参数。
1.2 配置 Kafka Broker 的日志清理策略,建议使用 log.cleanup.policy=delete。
2.1 生产者的序列化和反序列化格式需与 Broker 的一致,推荐使用 JSON 编码方式。
2.2 在创建生产者时,应设置以下参数:
- bootstrap.servers:Kafka 集群中至少一个 Broker 的地址。
- retries:自动重试次数。
- acks:消息的确认机制,建议使用 all。
- buffer.memory、batch.size 和 linger.ms:控制批量发送消息的大小和时间间隔。
2.3 在发送消息时,应按分区进行分批批处理,避免大量消息同时发送给单个分区,导致磁盘 IO 瓶颈。
3.1 消费者的反序列化格式需与 Producer 的一致,推荐使用 JSON 编码方式。
3.2 在创建消费者时,应设置以下参数:
- bootstrap.servers:Kafka 集群中至少一个 Broker 的地址。
- group.id:消费者所属的消费组 id。
- enable.auto.commit:自动提交偏移量。
- auto.offset.reset:offset 不存在或超过了有效期时的处理方式,建议设置为 earliest。
- max.poll.records:每次 poll 最大拉取