使用超时机制来避免单个节点上的请求超时。
示例代码:
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.pattern.after
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.control.NonFatal
class MyHttpClient(implicit ec: ExecutionContext) {
implicit val timeout: Timeout = Timeout(3 seconds)
private val http = Http()
def singleRequest(request: HttpRequest): Future[HttpResponse] = {
val promise = http.singleRequest(request)
promise.flatMap { response =>
if (response.status.isSuccess()) {
Future.successful(response)
} else {
handleNonSuccessResponse(request, response).recoverWith {
case NonFatal(e) => Future.failed(e)
}
}
} recoverWith {
case NonFatal(e) => handleRequestFailure(request, e)
} withTimeout()
}
private def withTimeout[T](duration: FiniteDuration = timeout.duration): PartialFunction[Future[T], Future[T]] = {
case future: Future[T] =>
Future.firstCompletedOf(List(future, after(duration, http.singleRequest(HttpRequest(uri = "/timeout", method = HttpMethods.GET)))).map {
future => future
})
}
private def handleNonSuccessResponse(request: HttpRequest, response: HttpResponse): Future[HttpResponse] = {
Future.failed(new RuntimeException(s"request: $request, response: $response"))
}
private def handleRequestFailure(request: HttpRequest, e: Throwable) = {
Future.failed(new RuntimeException(s"request: $request", e))
}
}