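import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Checkpointing is enabled on the StreamExecutionEnvironment (env), not on the sink.
// Bulk formats such as Avro can only roll part files on checkpoints, so this interval
// also decides how often part files are finished.
final long checkpointInterval = 5000L; // ms
env.enableCheckpointing(checkpointInterval);

// Bulk-format sinks roll with OnCheckpointRollingPolicy by default. DefaultRollingPolicy
// (rollover interval, inactivity interval, max part size) is only accepted by forRowFormat(...).
// The StreamingFileSink builder has no batch-size or checkpoint-interval option.
final StreamingFileSink<AvroPojo> sink = StreamingFileSink
        .forBulkFormat(new Path(outputPath), AvroWriters.forReflectRecord(AvroPojo.class))
        .withBucketAssigner(bucketAssigner)
        .withBucketCheckInterval(60_000L) // ms between time-based checks of the open buckets
        .withOutputFileConfig(outputFileConfig)
        .build();
The checkpoint interval (checkpointInterval) is the main tuning knob here: StreamingFileSink has no batch-size setting, and with a bulk format every checkpoint finishes the in-progress part files. A longer interval yields fewer, larger files and less checkpointing overhead. A shorter one makes data available sooner but produces many small files.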
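With a bulk format, every checkpoint closes the in-progress part file of each parallel subtask in each bucket it is writing to. As a result, the checkpoint interval largely determines how many files the sink produces. The sketch below does that arithmetic under a stated assumption: it is an upper bound that treats every subtask as writing to every active bucket during every interval. The class and method names (`PartFileEstimate`, `maxPartFilesPerHour`) are hypothetical helpers, not part of Flink.

```java
// Hypothetical helper: rough upper bound on finished part files per hour for a
// bulk-format StreamingFileSink that rolls on every checkpoint.
// Assumption: every subtask writes to every active bucket in every checkpoint
// interval, so real file counts are usually lower.
public final class PartFileEstimate {

    private static final long MILLIS_PER_HOUR = 3_600_000L;

    /** Checkpoints per hour x parallel subtasks x buckets receiving data. */
    static long maxPartFilesPerHour(long checkpointIntervalMs, int parallelism, int activeBuckets) {
        if (checkpointIntervalMs <= 0 || parallelism <= 0 || activeBuckets <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        long checkpointsPerHour = MILLIS_PER_HOUR / checkpointIntervalMs;
        return checkpointsPerHour * parallelism * activeBuckets;
    }

    public static void main(String[] args) {
        // 5 s checkpoints, 4 subtasks, 1 active bucket (e.g. one hourly directory)
        System.out.println(maxPartFilesPerHour(5_000L, 4, 1));   // 2880
        // 5 min checkpoints, same job shape
        System.out.println(maxPartFilesPerHour(300_000L, 4, 1)); // 48
    }
}
```

At a 5000 ms interval, even a small job can produce thousands of small files per hour. This is the usual reason to lengthen the checkpoint interval for bulk-format sinks when latency allows it.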