在 Akka Streams 中,当使用 RestartSink 时,可能会遇到某些故障(例如网络错误)导致流崩溃,但是 RestartSink 并不会重新启动流。这可能是因为您没有正确设置 RestartSink 的参数或正在使用错误的 SupervisorStrategy。为了解决这个问题,可以按照以下步骤进行操作:
val restartSink = RestartSink.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
)(() => mySink)
val system = ActorSystem("my-system")
val decider: Supervision.Decider = {
case _: Exception => Supervision.Restart
case _ => Supervision.Stop
}
implicit val materializerSettings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(materializerSettings.withSupervisionStrategy(decider))
val testFlow = Flow[String].map(_ => throw new Exception("Test Exception"))
Source.single("test").via(restartSink).runWith(TestSink.probe)
.expectSubscription()
.expectNoMessage(4.seconds) // 确认超时时间为 4 秒
.cancel()
通过以上步骤设置和测试,您应该能够使用 RestartSink 在 Akka Streams 中正确地重新启动失败的流。