为了评估 Apache Kafka Producer 的吞吐量和延迟,可以使用以下代码示例:
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Small benchmark that sends a fixed number of string messages to one Kafka topic and
 * reports producer throughput together with per-record acknowledgement latency.
 *
 * <p>Records are sent asynchronously. Blocking on every returned future would force one
 * in-flight record at a time, so the configured {@code batch.size} and {@code linger.ms}
 * could never form a batch and each record would wait out the linger interval on its own.
 * Latency is instead taken in the send callback, and throughput is based on the number of
 * records that were actually acknowledged.
 */
public class KafkaProducerThroughputLatency {
    /** Topic that receives the benchmark messages. */
    private static final String TOPIC = "test-topic";

    /** Number of messages sent in one run. */
    private static final int MESSAGE_COUNT = 1_000_000;

    /** Nanoseconds per millisecond / per second, used for reporting. */
    private static final double NANOS_PER_MILLI = 1_000_000.0;
    private static final double NANOS_PER_SECOND = 1_000_000_000.0;

    /**
     * Runs the benchmark against the broker at {@code localhost:9092} and prints the results.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Tune batch.size and linger.ms to trade throughput against latency.
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");

        // Updated from the send callbacks, read by the main thread once sending is done.
        AtomicLong acknowledged = new AtomicLong();
        AtomicLong failed = new AtomicLong();
        AtomicLong totalLatencyNanos = new AtomicLong();
        AtomicLong maxLatencyNanos = new AtomicLong();
        AtomicReference<Exception> firstFailure = new AtomicReference<>();

        long startNanos = System.nanoTime();
        long elapsedNanos;
        try (Producer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, "test-message-" + i);
                long sentNanos = System.nanoTime();
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        failed.incrementAndGet();
                        // Keep only the first error so a broken run does not print a million lines.
                        firstFailure.compareAndSet(null, exception);
                        return;
                    }
                    long latencyNanos = System.nanoTime() - sentNanos;
                    acknowledged.incrementAndGet();
                    totalLatencyNanos.addAndGet(latencyNanos);
                    maxLatencyNanos.accumulateAndGet(latencyNanos, Math::max);
                });
            }
            // Wait for every buffered record to complete before stopping the clock,
            // so the measurement covers delivery but not producer shutdown.
            producer.flush();
            elapsedNanos = System.nanoTime() - startNanos;
        }

        long ackedCount = acknowledged.get();
        long failedCount = failed.get();
        double elapsedSeconds = elapsedNanos / NANOS_PER_SECOND;
        // Only acknowledged records count; failed sends must not inflate the result.
        double throughput = ackedCount / elapsedSeconds;

        System.out.println("Throughput: " + throughput + " messages/second");
        System.out.println("Acknowledged: " + ackedCount + ", failed: " + failedCount);
        if (ackedCount > 0) {
            double avgLatencyMs = totalLatencyNanos.get() / NANOS_PER_MILLI / ackedCount;
            double maxLatencyMs = maxLatencyNanos.get() / NANOS_PER_MILLI;
            System.out.println("Average latency: " + avgLatencyMs + " ms");
            System.out.println("Max latency: " + maxLatencyMs + " ms");
        }
        Exception failure = firstFailure.get();
        if (failure != null) {
            System.err.println("First send failure: " + failure);
        }
    }
}
上述代码向名为 "test-topic" 的主题