PersistentActor minimal example with akka-persistence-sql-async
Overview
You can find the code and instruction to run the example at GitHub. There is also an official sample available.
Refer to another post, Persistence Actor Minimal example for basics of PersistentActor.
In this example, the target DB is MySQL. Firstly you need to create a database akka_persistence_sql_async
and execute mysql.sql
so that the database has necessary tables.
In MyPersistentAcdtor, when persist
method is called, an Event
is sent to ScalikeJDBCWriteJournal.
override def receiveCommand: Receive = {
case Command(i) ⇒
persist(Event(i)) {
...
}
}
Then ScalikeJDBCWriteJournal serializes Event
to Array[Byte]
with Akka serializer.
After that, ScalikeJDBCWriteJournal
prepares an SQL statement to persist the data to an SQL database, including the message
column to hold the binary of Event
.
Instruction to run the example
> git clone https://github.com/richardimaoka/resources.git
> cd resources
> cd persistent-actor-minimal-sql
> sbt
> runMain example.Main
Output
[info] Running example.Main
receiveCommand : Received Command Command(1)
persist callback: Event = Event(1) persisted
persist callback: current state = 1
receiveCommand : Received Command Command(2)
persist callback: Event = Event(2) persisted
persist callback: current state = 3
receiveCommand : Received Command Command(3)
persist callback: Event = Event(3) persisted
persist callback: current state = 6
[ERROR] [01/13/2018 17:24:19.422] [exampleSystem-akka.actor.default-dispatcher-7] [akka://exampleSystem/user/p1] exploded!
java.lang.Exception: exploded!
at example.MyPersistentActor$$anonfun$receiveCommand$1.applyOrElse(Main.scala:37)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at example.MyPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(Main.scala:11)
at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:663)
at akka.persistence.Eventsourced.aroundReceive(Eventsourced.scala:183)
at akka.persistence.Eventsourced.aroundReceive$(Eventsourced.scala:182)
at example.MyPersistentActor.aroundReceive(Main.scala:11)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
receiveRecover : Recovering an event = Event(1)
receiveRecover : current state = 1
receiveRecover : Recovering an event = Event(2)
receiveRecover : current state = 3
receiveRecover : Recovering an event = Event(3)
receiveRecover : current state = 6
receiveCommand : Received Command Command(4)
persist callback: Event = Event(4) persisted
persist callback: current state = 10
receiveCommand : Received Command Command(5)
persist callback: Event = Event(5) persisted
persist callback: current state = 15
[success] Total time: 2 s, completed Jan 13, 2018 5:24:20 PM
References
- Official persistence documentation at https://doc.akka.io/docs/akka/2.5/persistence.html
- Official Akka serialization documentation at https://doc.akka.io/docs/akka/2.5/serialization.html
- akka-persistence-sql-async plugin at https://github.com/okumin/akka-persistence-sql-async