以下是一个示例代码,展示了如何在Akka流程中使用计时图、阶段逻辑和监督。
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
// 定义消息
case class Request(payload: String)
case class Response(payload: String)
case class TimedRequest(request: Request, startTime: Long)
case class TimedResponse(response: Response, startTime: Long, endTime: Long)
// 处理请求的Actor
class Worker extends Actor {
def receive: Receive = {
case Request(payload) =>
// 模拟处理请求的耗时操作
Thread.sleep(1000)
val response = Response(s"Processed: $payload")
sender() ! response
}
}
// 监督Actor
class Supervisor(worker: ActorRef) extends Actor {
import context.dispatcher
// 创建一个路由器并将工作线程添加到路由器中
val workerRouter = {
val routees = Vector.fill(5) {
val r = context.actorOf(Props[Worker])
context.watch(r)
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(), routees)
}
def receive: Receive = {
case request: Request =>
// 创建一个计时器,将请求和开始时间传递给工作线程
val startTime = System.currentTimeMillis()
val timedRequest = TimedRequest(request, startTime)
workerRouter.route(timedRequest, sender())
case TimedResponse(response, startTime, endTime) =>
// 计算处理时间并将结果发送给原始请求者
val processingTime = endTime - startTime
val result = s"$response (Processing Time: $processingTime ms)"
sender() ! result
}
}
// 自定义计时图阶段
class TimingStage extends GraphStage[FlowShape[TimedRequest, TimedResponse]] {
val in = Inlet[TimedRequest]("TimingStage.in")
val out = Outlet[TimedResponse]("TimingStage.out")
override def shape: FlowShape[TimedRequest, TimedResponse] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val timedRequest = grab(in)
// 将当前时间添加到请求中
val startTime = System.currentTimeMillis()
val timedRequestWithStartTime = timedRequest.copy(startTime = startTime)
// 推送处理后的请求
push(out, timedRequestWithStartTime)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// 向下游请求一个元素
pull(in)
}
})
override def preStart(): Unit = {
// 向上游请求一个元素
pull(in)
}
}
}
object AkkaStreamSupervisionExample extends App {
implicit val system: ActorSystem = ActorSystem("AkkaStreamSupervisionExample")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = system.dispatcher
implicit val timeout: Timeout = Timeout(5.seconds)
// 创建监督Actor
val supervisor = system.actorOf(Props[Supervisor])
// 创建Akka流程
val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
// 创建一个计时图阶段
val timingStage = builder.add(new TimingStage)
// 创建一个Actor流程,并将流程的输出发送给监督Actor
val workerFlow = Flow[TimedRequest].mapAsync(1)(request => (supervisor ? request).mapTo[String])
val workerSink = Sink.foreach[String](println)
// 连接计时图阶段和工作Actor流程
timingStage ~> workerFlow ~> workerSink
// 返回流程的形状
FlowShape(timingStage.in, workerSink.in)