这可能是因为重新启动后没有正确配置Flink检查点设置。
检查在flink-conf.yml
文件中的以下设置:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
确保state.backend
设置为rocksdb
并且state.checkpoints.dir
指向正确的检查点目录。
如果仍然存在问题,可以尝试手动从最近的检查点恢复。
示例代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
if (restoreFromCheckpoint) {
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
// restore from the latest checkpoint
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
)
);
}
...
DataStream<...> stream = ...;
stream.addSink(...);
env.execute("MyStreamingJob");