在使用 Akka Persistence Cassandra 时，需要先初始化数据库（创建 keyspace 和表），才能进行后续的持久化操作。以下是数据库初始化相关的 Scala 代码示例：
import akka.persistence.cassandra._
// Initialize the database when the application starts: build the journal plugin
// configuration and open its Cassandra session.
// The HOCON text must keep its line breaks: "#" comments run to the end of the line,
// so collapsing it onto one line would comment out the closing brace.
val journalConfig = ConfigFactory.parseString(
  """
    |akka.persistence.journal.plugin = "cassandra-journal"
    |akka.persistence.journal.cassandra {
    |  # ...
    |}
    |""".stripMargin
)
// NOTE(review): confirm `CassandraJournalConfig.createFrom` exists in the plugin version in use.
CassandraJournalConfig.createFrom(journalConfig).sessionProvider.connect()
// Build the snapshot-store plugin configuration and open its Cassandra session.
// The HOCON text must keep its line breaks: "#" comments run to the end of the line,
// so collapsing it onto one line would comment out the closing brace.
val snapshotConfig = ConfigFactory.parseString(
  """
    |akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
    |akka.persistence.snapshot-store.cassandra {
    |  # ...
    |}
    |""".stripMargin
)
// NOTE(review): confirm `CassandraSnapshotConfig.createFrom` exists in the plugin version in use.
CassandraSnapshotConfig.createFrom(snapshotConfig).sessionProvider.connect()
// Alternatively, initialize the database manually when needed.
// Settings are read from the actor system for each plugin.
val journalSettings = CassandraJournalSettings(system)
val snapshotSettings = CassandraSnapshotSettings(system)

// One session provider per plugin, each built from that plugin's own settings.
val journalSessionProvider = new CassandraSessionProvider(
  system,
  journalSettings.sessionProviderConfig,
  journalSettings.sessionSettings
)
val snapshotSessionProvider = new CassandraSessionProvider(
  system,
  snapshotSettings.sessionProviderConfig,
  snapshotSettings.sessionSettings
)

// NOTE(review): if `connect()` returns a Future in the plugin version in use, the
// result must be mapped over (or awaited at the program edge) before statements
// can be executed on it -- confirm against the plugin API.
val journalSession = journalSessionProvider.connect()
val snapshotSession = snapshotSessionProvider.connect()
// Create the journal keyspace and the messages table if they do not already exist.
// NOTE(review): SimpleStrategy with replication_factor 1 keeps a single copy of the
// data; confirm this is intended outside of local development.
journalSession.execute(
  "CREATE KEYSPACE IF NOT EXISTS akka_journal WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
)
journalSession.execute(
  "CREATE TABLE IF NOT EXISTS akka_journal.messages (persistence_id text, partition_nr bigint, sequence_nr bigint, marker text, timebucket bigint, timestamp timeuuid, writer_uuid text, ser_id int, ser_manifest text, event_manifest text, event_blob blob, PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, marker))"
)
// Create the snapshot keyspace and the snapshots table if they do not already exist.
// NOTE(review): SimpleStrategy with replication_factor 1 keeps a single copy of the
// data; confirm this is intended outside of local development.
snapshotSession.execute(
  "CREATE KEYSPACE IF NOT EXISTS akka_snapshot WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
)
snapshotSession.execute(
  "CREATE TABLE IF NOT EXISTS akka_snapshot.snapshots (persistence_id text, sequence_nr bigint, timestamp timeuuid, ser_id int, ser_manifest text, snapshot blob, PRIMARY KEY ((persistence_id), sequence_nr, timestamp))"
)