This error is usually caused by a wrong or unset bucket or key in the S3 path. Passing a correct S3 bucket and key to the output sink resolves it. Example code:
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

import java.time.ZoneId;
// s3OutputPath must be a valid S3 URI, e.g. "s3://<bucket>/<key-prefix>"
StreamingFileSink<MyRecord> s3Sink = StreamingFileSink
    .forBulkFormat(new Path(s3OutputPath), ParquetAvroWriters.forReflectRecord(MyRecord.class))
    // DateTimeBucketAssigner takes a date format string, not a full path;
    // it creates a "yyyy-MM-dd" subdirectory under the base path above
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("UTC")))
    .withBucketCheckInterval(60 * 1000L)
    // bulk formats such as Parquet can only roll files on checkpoint
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .withOutputFileConfig(OutputFileConfig
        .builder()
        .withPartPrefix("data")
        .withPartSuffix(".snappy.parquet")
        .build())
    .build();
stream.addSink(s3Sink)
    .setParallelism(1)
    .name("s3Sink");
In the example above, the S3 bucket and key prefix come from the s3OutputPath passed to forBulkFormat, and the DateTimeBucketAssigner only appends a date-based subdirectory under it. Make sure s3OutputPath has the form s3://&lt;bucket&gt;/&lt;key-prefix&gt; and that the bucket actually exists; otherwise the error will recur.
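Since the failure is ultimately a malformed bucket or key, it can be caught before submitting the job. A minimal sketch (the splitS3Path helper and class name are hypothetical, not part of Flink) that splits an s3:// URI into bucket and key and rejects obviously broken values:

```java
import java.net.URI;

public class S3PathCheck {

    /**
     * Hypothetical helper: splits an s3:// (or s3a://) URI into
     * {bucket, key} and fails fast on a missing scheme or bucket.
     */
    static String[] splitS3Path(String path) {
        URI uri = URI.create(path);
        String scheme = uri.getScheme();
        if (!"s3".equals(scheme) && !"s3a".equals(scheme)) {
            throw new IllegalArgumentException("expected s3:// or s3a:// scheme: " + path);
        }
        String bucket = uri.getHost();
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("missing bucket in: " + path);
        }
        // key = path component without its leading slash; may be empty
        String key = uri.getPath().replaceFirst("^/", "");
        return new String[] {bucket, key};
    }

    public static void main(String[] args) {
        String[] parts = splitS3Path("s3://my-bucket/flink/output");
        System.out.println(parts[0] + " | " + parts[1]);
    }
}
```

Running this check on s3OutputPath at job startup turns a confusing sink failure into an immediate, readable error.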