在 Akka Streams 中,当使用 RestartSink 时,可能会遇到某些故障(例如网络错误)导致流崩溃,但是 RestartSink 并不会重新启动流。这可能是因为您没有正确设置 RestartSink 的参数或正在使用错误的 SupervisorStrategy。为了解决这个问题,可以按照以下步骤进行操作:
val restartSink = RestartSink.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
)(() => mySink)
val system = ActorSystem("my-system")
val decider: Supervision.Decider = {
  case _: Exception => Supervision.Restart
  case _ => Supervision.Stop
}
implicit val materializerSettings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(materializerSettings.withSupervisionStrategy(decider))
val testFlow = Flow[String].map(_ => throw new Exception("Test Exception"))
Source.single("test").via(restartSink).runWith(TestSink.probe)
  .expectSubscription()
  .expectNoMessage(4.seconds)  // 确认超时时间为 4 秒
  .cancel()
通过以上步骤设置和测试,您应该能够使用 RestartSink 在 Akka Streams 中正确地重新启动失败的流。