如下是通过Akka集群来关闭所有节点和SBR角色的代码示例:
class NodeStateActor extends Actor {
private var nodes = Set.empty[Address]
private var sbr: Option[ActorRef] = None
context.system.eventStream.subscribe(self, classOf[MemberEvent])
override def receive: Receive = {
case MemberUp(member) => nodes = nodes + member.address
case MemberExited(member) => nodes = nodes - member.address
case MemberRemoved(member, _) => nodes = nodes - member.address
case MemberDowned(member) => nodes = nodes - member.address
case CurrentClusterState(state) => nodes = state.members.map(_.address).filter(_.hasLocalScope)
case sbr: ActorRef => this.sbr = Some(sbr)
case StateCheck => checkState()
}
private def checkState(): Unit = {
if (sbr.exists(!_.isTerminated) && nodes.nonEmpty) {
val potentials = nodes.filter(!_.hasLocalScope)
if (potentials.nonEmpty) {
potentials.foreach(context.actorSelection(_).tell(PoisonPill, Actor.noSender))
sbr.foreach(_.tell(PoisonPill, Actor.noSender))
} else if (!nodes.exists(!_.hasLocalScope)) {
sbr.foreach(_.tell(PoisonPill, Actor.noSender))
}
}
}
}
在应用程序中创建一个ActorSystem,同时启用Akka集群,并将Actor部署到所有节点上。
object Test extends App {
val config = ConfigFactory.parseString("""
akka {
actor.provider = "cluster"
remote.netty.tcp.port = 0
cluster.roles=["sbr"]
}
""")
val system = ActorSystem("test", config)
val nodeStateActor = system.actor
上一篇:Akka集群中无法接收到消息
下一篇:akka集群:配置种子节点