libraryDependencies += "com.typesafe.akka" %% "akka-persistence-jdbc" % "5.0.0"
akka.projection.jdbc {
# JDBC driver class to use
driver = "org.postgresql.Driver"
# Database url
url = "jdbc:postgresql://localhost:5432/dbname"
# Database user/password
user = ""
password = ""
# Connection pool parameters
connectionPool {
# Maximum number of connections to allocate
maxSize = 5
# Maximum number of requests that may wait in the queue for a free connection
maxQueueSize = 10
# Maximum time to wait when connection pool is full and no connection is available
connectionTimeout = 10s
}
}
import slick.jdbc.PostgresProfile.api._
// Build the database handle from the "akka.projection.jdbc" configuration section
// (presumably Slick's `Database`, given the `slick.jdbc.PostgresProfile.api._` import — confirm).
val db = Database.forConfig("akka.projection.jdbc")
// Start the actor system named "my-project"; `config` is defined outside this snippet.
val system = ActorSystem("my-project", config)
// Alternatively — use this definition INSTEAD of the one above; the two `val system`
// lines are alternatives and cannot coexist in the same scope.
// NOTE(review): `Configuration.fromDb` is not defined anywhere in this snippet — confirm its origin.
val system = ActorSystem("my-project", config.withFallback(Configuration.fromDb(db)))
import akka.stream.scaladsl.Source
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.EventEnvelopeProjection
import akka.projection.jdbc.scaladsl.JdbcHandler
/**
 * Projection handler for `MyEvent` envelopes.
 *
 * NOTE(review): akka-projection-jdbc appears to declare `JdbcHandler` with two type
 * parameters (envelope and session type) — confirm this single-argument form compiles
 * against the library version in use.
 *
 * @param db database handle made available to the handler; not used by the stub body below
 */
case class MyProjectionHandler(db: Database) extends JdbcHandler[EventEnvelope[MyEvent]] {

  /**
   * Invoked with one event envelope; currently a placeholder that does nothing.
   *
   * @param session  session object supplied by the caller of the handler
   * @param envelope envelope carrying the `MyEvent` to handle
   */
  override def process(session: Session, envelope: EventEnvelope[MyEvent]): Unit = {
    // Handle the event and write it to the database.
    ()
  }
}
// Projection definition: an at-least-once source built from `myEventSource` under
// `projectionId`, combined with a behavior created from `MyProjectionHandler(db)`.
// NOTE(review): `EventEnvelopeProjection`, `myEventSource` and `projectionId` are not
// defined in this snippet — confirm they exist in the surrounding project.
val projection = {
  val sourced =
    EventEnvelopeProjection.atLeastOnceSource(() => Source.from(myEventSource), projectionId)
  // The handler argument is kept inline so its expected type is still driven by the call site.
  sourced.withProjectionBehavior(ProjectionBehavior.fromFunctionHandler(MyProjectionHandler(db)))
}
上一篇:AkkaPlay测试中的@NamedCache注入问题
下一篇:Akka轻量级线程