可以使用 AWS KCL (Kinesis Client Library) 库中的轮询程序来解决该问题。该程序会协调多个消费者,以确保每个 shard 中的每个数据记录仅被一个消费者读取。以下是使用 Java 编写的示例代码:
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
public class KCLMultiConsumerExample {
private static final String APPLICATION_NAME = "my-kinesis-app";
private static final String STREAM_NAME = "my-kinesis-stream";
private static final String REGION_NAME = "us-east-1";
public static void main(String[] args) {
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME)
.withRegionName(REGION_NAME)
.withWorkerIdentifier("worker-1")
.withMaxRecords(1000)
.withIdleTimeBetweenReadsInMillis(1000);
Worker worker = new Worker.Builder()
.recordProcessorFactory(new YourRecordProcessorFactory())
.config(config)
.build();
worker.run();
}
}
该示例代码创建了一个名为“my-kinesis-app”、使用名为“my-kinesis-stream”的流和位于“us-east-1”区域的 KCL 配置。使用了名为“worker-1”的应用程序标识符、每次最多处理 1000 条记录,并且在读取之间空闲了 1000 毫秒。将自定义的记录处理器工厂传递给工人构建器,该工厂将创建一组处理记录的记录处理器。最后,该工人运行并处理数据记录。