在Akka Streams中为mapAsync
定义超时可以使用akka.pattern.after
方法和Future
的recoverWith
方法来实现。下面是一个示例解决方案:
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object MapAsyncTimeoutExample extends App {
implicit val system: ActorSystem = ActorSystem("MapAsyncTimeoutExample")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
def mapAsyncWithTimeout[T, U](parallelism: Int, timeout: FiniteDuration)(f: T => Future[U]): Flow[T, U, NotUsed] = {
Flow[T].mapAsync(parallelism) { elem =>
val result = f(elem)
Future.firstCompletedOf(Seq(result, akka.pattern.after(timeout, system.scheduler)(Future.failed(new TimeoutException))))
}.collect { case Success(value) => value }
}
// 示例使用一个简单的异步函数
def asyncFunction(input: Int): Future[Int] = {
// 模拟耗时操作
Thread.sleep(1000)
Future.successful(input * 2)
}
val source = Source(1 to 10)
val parallelism = 4
val timeout = 500.millis
val result = source.via(mapAsyncWithTimeout(parallelism, timeout)(asyncFunction))
.runWith(Sink.foreach(println))
result.onComplete {
case Success(_) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed with exception: $ex")
}
// 等待Stream完成
result.onComplete(_ => system.terminate())
}
上述代码中,我们定义了一个名为mapAsyncWithTimeout
的自定义流操作符,它接受一个并行度参数和超时时间参数,并在每个元素上执行异步函数。在异步函数执行时,我们使用Future.firstCompletedOf
方法来等待结果或超时。如果超时发生,我们会返回一个失败的Future
,并使用collect
操作符过滤掉失败的结果。
在示例中,我们将1到10的整数源通过mapAsyncWithTimeout
操作符传递给一个异步函数asyncFunction
,并使用并行度为4和超时时间为500毫秒。最后,我们通过runWith
方法将结果打印到控制台,并在流完成时输出相应的消息。
请注意,mapAsyncWithTimeout
方法返回的是一个Flow
,您可以在Akka Streams中的任何地方使用它。