Local Actor workflow part 1 - Sender side
Overview
You can find the code and instructions to run the example on GitHub.
This article explains how Akka sends a message from one Actor to another locally (i.e. within the same JVM). As the Serialization part of the official documentation says:
The messages that Akka actors send to each other are JVM objects (e.g. instances of Scala case classes). Message passing between actors that live on the same JVM is straightforward. It is simply done via reference passing.
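To make "reference passing" concrete, here is a minimal sketch in plain Scala with no Akka dependency. The Greeting type and the sendAndReceive helper are hypothetical names used only for illustration, and the in-memory queue merely stands in for a local mailbox. It checks that the object taken out on the receiving side is the very same instance that was put in, not a copy.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object ReferencePassingSketch {
  // Hypothetical message type, used only for this illustration.
  final case class Greeting(text: String)

  // Stand-in for a local mailbox: an in-memory queue inside the same JVM.
  private val queue = new ConcurrentLinkedQueue[AnyRef]()

  // "Sends" the message by enqueueing it, then returns what a receiver would dequeue.
  def sendAndReceive(message: AnyRef): AnyRef = {
    queue.offer(message)
    queue.poll()
  }

  def main(args: Array[String]): Unit = {
    val sent = Greeting("Hello World")
    val received = sendAndReceive(sent)
    // `eq` compares references, so this tells us whether any copy was made.
    println(s"same instance: ${received eq sent}")
  }
}
```

Because nothing is copied or serialized in this sketch, a mutable message would be shared between both sides, which is one reason immutable messages such as case classes are a natural fit.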
If you are interested in what happens when messages are sent to a remote JVM, I plan to write remote versions of these articles soon, so please refer to them.
Workflow
MessageSender sends the messages "Hello World", "Hello Universe" and "Hello Galaxy" to the messageReceiver actor.
class MessageSender(messageReceiver: ActorRef)
  extends Actor {
  override def preStart(): Unit = {
    val messages = List(
      "Hello World",
      "Hello Universe",
      "Hello Galaxy"
    )
    for (msg <- messages) {
      messageReceiver ! msg
    }
  }
  ....
}
The preStart method is a lifecycle method provided by Akka Actor, which Akka calls when the actor is started.
Below is the body of the main method, which initializes the receiver and the sender.
val system = ActorSystem("exampleSystem")
val receiver = system.actorOf(
  Props[MessageReceiver],
  "receiver"
)
// sender
system.actorOf(
  MessageSender.props(receiver),
  "sender"
)
As the preStart method of MessageSender shows, the first message to be sent is messageReceiver ! "Hello World", where ! is a method of LocalActorRef.
override def !(message: Any)
  (implicit sender: ActorRef = Actor.noSender): Unit =
  actorCell.sendMessage(message, sender)
The actorCell above is an instance of ActorCell, which implements the Cell trait, and the Cell trait has the following sendMessage method.
final def sendMessage(
  message: Any,
  sender: ActorRef
): Unit =
  sendMessage(Envelope(message, sender, system))
Here you see an Envelope, which encapsulates the message and the sender:
case class Envelope(
  val message: Any,
  val sender: ActorRef
)
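The chain from ! to the Envelope can be mimicked with a small toy model in plain Scala. This is only a sketch: ToyRef, ToyCell and ToyEnvelope are hypothetical stand-ins, not Akka classes, and the sender is reduced to a name. It shows two things from the snippets above: the implicit sender parameter with a default value, and the two-argument sendMessage wrapping its arguments before delegating to the one-argument overload.

```scala
object SendPathSketch {
  // Toy stand-in for Akka's Envelope: pairs a message with the sender's name.
  final case class ToyEnvelope(message: Any, sender: String)

  // Toy stand-in for ActorCell: it only remembers the envelopes it was asked to send.
  final class ToyCell {
    private var sent = Vector.empty[ToyEnvelope]

    // Two-argument overload: wraps message and sender, then delegates.
    def sendMessage(message: Any, sender: ToyRef): Unit =
      sendMessage(ToyEnvelope(message, sender.name))

    // One-argument overload: from here on only the envelope travels.
    def sendMessage(envelope: ToyEnvelope): Unit = {
      sent = sent :+ envelope
    }

    def envelopes: Vector[ToyEnvelope] = sent
  }

  // Toy stand-in for LocalActorRef, with the same shape of `!`.
  final class ToyRef(val name: String) {
    val cell = new ToyCell

    def !(message: Any)(implicit sender: ToyRef = ToyRef.noSender): Unit =
      cell.sendMessage(message, sender)
  }

  object ToyRef {
    // Sentinel used when no implicit sender is in scope (an assumption of this sketch).
    val noSender: ToyRef = new ToyRef("noSender")
  }

  // No implicit ToyRef in scope here, so the default sender is used.
  def sendWithoutImplicit(target: ToyRef): Unit =
    target ! "Hello World"

  // An implicit ToyRef in scope is picked up as the sender, as `self` is inside an actor.
  def sendWithImplicit(target: ToyRef): Unit = {
    implicit val self: ToyRef = new ToyRef("sender")
    target ! "Hello Universe"
  }

  def main(args: Array[String]): Unit = {
    val target = new ToyRef("receiver")
    sendWithoutImplicit(target)
    sendWithImplicit(target)
    target.cell.envelopes.foreach(println)
  }
}
```

The point of the implicit parameter is that the call site stays as short as messageReceiver ! msg while the sender still ends up inside the envelope.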
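To summarize the workflow so far, up to sendMessage: MessageSender calls the ! method of LocalActorRef, which calls sendMessage on its ActorCell, which in turn wraps the message and the sender in an Envelope.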
The sendMessage method of the Cell trait (ActorCell) calls the sendMessage method of the Dispatch trait.
def sendMessage(msg: Envelope): Unit =
  try {
    ...
    dispatcher.dispatch(this, msg)
  }
(The concept of a Dispatcher might be unfamiliar and is probably difficult to understand at first. I will write another article later to illustrate Dispatcher in more detail, but for now you can assume a dispatcher is, as the word suggests, a "sender" of messages.)
Akka has two distinct types with similar names: the Dispatch trait and the Dispatcher class. The Dispatch trait holds a reference to a dispatcher, and Dispatcher's dispatch method is as follows:
def dispatch(
  receiver: ActorCell,
  invocation: Envelope
): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}
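The two steps of dispatch, enqueue the message and then hand the mailbox to an executor, can be sketched without Akka as follows. ToyMailbox, ToyDispatcher and deliver are hypothetical names, and messages are plain strings. As a simplifying assumption, the sketch uses a single-threaded executor so that the mailbox never runs concurrently with itself; how Akka ensures that is not covered here.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}

object DispatchSketch {
  // Toy mailbox: a queue plus a run() that drains it and hands each message to a callback.
  final class ToyMailbox(onMessage: String => Unit) extends Runnable {
    private val queue = new ConcurrentLinkedQueue[String]()

    def enqueue(msg: String): Unit = { queue.offer(msg); () }

    override def run(): Unit = {
      var next = queue.poll()
      while (next != null) {
        onMessage(next)
        next = queue.poll()
      }
    }
  }

  // Toy dispatcher: same two steps as the dispatch method above.
  final class ToyDispatcher(executor: ExecutorService) {
    def dispatch(mbox: ToyMailbox, msg: String): Unit = {
      mbox.enqueue(msg)      // step 1: put the message into the mailbox
      executor.execute(mbox) // step 2: ask the executor to run the mailbox
    }
  }

  // Sends all messages from the calling thread and returns them in the order they were processed.
  def deliver(messages: List[String]): List[String] = {
    val executor = Executors.newSingleThreadExecutor()
    val received = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(messages.size)
    val mbox = new ToyMailbox(msg => { received.offer(msg); done.countDown() })
    val dispatcher = new ToyDispatcher(executor)
    try {
      messages.foreach(msg => dispatcher.dispatch(mbox, msg))
      // Wait until every message has been processed on the executor thread.
      done.await(5, TimeUnit.SECONDS)
      ()
    } finally executor.shutdown()

    val out = List.newBuilder[String]
    val it = received.iterator()
    while (it.hasNext) out += it.next()
    out.result()
  }

  def main(args: Array[String]): Unit =
    deliver(List("Hello World", "Hello Universe", "Hello Galaxy")).foreach(println)
}
```

Note that the sender returns from dispatch as soon as the mailbox has been handed over; the processing itself happens later on the executor's thread.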
Mailbox has the following enqueue method:
def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
The messageQueue is of type MessageQueue, which is defined separately for each Mailbox type. The one for the default UnboundedMailbox is:
object UnboundedMailbox {
  class MessageQueue
    extends ConcurrentLinkedQueue[Envelope]
    with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}
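Building the queue on ConcurrentLinkedQueue means several senders can enqueue at the same time without extra locking. The following sketch illustrates this in plain Scala; ToyEnvelope, ToyMessageQueue and concurrentEnqueue are hypothetical names, and the queue is only modeled on the snippet above. Several threads enqueue concurrently, and afterwards no envelope is missing and each sender's envelopes come out in the order that sender enqueued them.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object QueueSketch {
  // Toy envelope: a sequence number plus the name of the sending thread.
  final case class ToyEnvelope(message: Int, sender: String)

  // Toy queue that, like the snippet above, *is* a ConcurrentLinkedQueue.
  final class ToyMessageQueue extends ConcurrentLinkedQueue[ToyEnvelope] {
    def enqueue(envelope: ToyEnvelope): Unit = { offer(envelope); () }
    def dequeue(): ToyEnvelope = poll() // returns null when the queue is empty
  }

  // Starts `senders` threads that each enqueue the numbers 1..perSender, then waits for them.
  def concurrentEnqueue(senders: Int, perSender: Int): ToyMessageQueue = {
    val queue = new ToyMessageQueue
    val threads = (1 to senders).map { s =>
      new Thread(new Runnable {
        override def run(): Unit =
          (1 to perSender).foreach(i => queue.enqueue(ToyEnvelope(i, s"sender-$s")))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    queue
  }

  def main(args: Array[String]): Unit = {
    val queue = concurrentEnqueue(senders = 4, perSender = 1000)
    println(s"enqueued envelopes: ${queue.size()}")
  }
}
```

The interleaving between different senders is not deterministic, so the sketch only relies on the per-sender order.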
Coming back to the dispatch method of Dispatcher, it calls registerForExecution, which is as follows:
def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  executorService execute mbox
  ...
}
Mailbox is defined as a ForkJoinTask (see its declaration below), so the execution (i.e. processing) of the Mailbox runs on a different Thread, which will be covered in the next article.
abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit]
  with SystemMessageQueue
  with Runnable {
  ...
}
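To see the "different Thread" part in isolation, here is a minimal sketch that submits a task to a ForkJoinPool and records which thread ran it. It is not how Akka's Mailbox is implemented: ToyMailbox and runOnPool are hypothetical names, and the toy mailbox is a plain Runnable that the pool wraps itself, whereas Akka's Mailbox extends ForkJoinTask directly.

```scala
import java.util.concurrent.{CountDownLatch, ForkJoinPool, TimeUnit}

object MailboxThreadSketch {
  // Toy mailbox: when run, it only records the name of the thread it ran on.
  final class ToyMailbox extends Runnable {
    @volatile var ranOn: String = ""
    val done = new CountDownLatch(1)

    override def run(): Unit = {
      ranOn = Thread.currentThread().getName()
      done.countDown()
    }
  }

  // Returns (name of the calling thread, name of the thread that ran the mailbox).
  def runOnPool(): (String, String) = {
    val caller = Thread.currentThread().getName()
    val pool = new ForkJoinPool(2)
    val mbox = new ToyMailbox
    try {
      pool.execute(mbox) // hand the mailbox over; the caller does not run it itself
      mbox.done.await(5, TimeUnit.SECONDS)
      ()
    } finally pool.shutdown()
    caller -> mbox.ranOn
  }

  def main(args: Array[String]): Unit = {
    val (caller, worker) = runOnPool()
    println(s"submitted from [$caller], executed on [$worker]")
  }
}
```

This is the same effect visible in the Output section, where the "sending message" lines and the "received message" lines carry different thread names.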
In the next article, Local Actor workflow part 2 - Receiver side, I will discuss what happens on the receiver side, which is triggered by the ForkJoinTask described above.
Instructions to run the example
> git clone https://github.com/richardimaoka/resources.git
> cd resources
> cd local-minimal
> sbt
> runMain example.Main
Output
Some println calls are inserted in the complete example on GitHub to illustrate the behavior.
Thread names are shown in square brackets, such as [exampleSystem-akka.actor.default-dispatcher-5] and [...-2]. The thread numbers may differ from run to run.
[info] Running example.Main
provider = local
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello World to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello Universe to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello World
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello Galaxy to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello Universe
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello Galaxy
[success] Total time: 7 s, completed Jan 30, 2018 6:16:46 AM
References
- Official documentation of Akka Actor at https://doc.akka.io/docs/akka/2.5/actors.html
- Official documentation of Akka Dispatcher at https://doc.akka.io/docs/akka/2.5/dispatchers.html
- Official documentation of Akka lifecycle at https://doc.akka.io/docs/akka/current/actors.html#actor-lifecycle
- Official documentation of Akka Mailbox at https://doc.akka.io/docs/akka/2.5/mailboxes.html?language=scala#mailboxes
- Official documentation of Akka location transparency at https://doc.akka.io/docs/akka/current/general/remoting.html#location-transparency
- Oracle's documentation about Fork/Join at https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
- ExecutorService Javadoc at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html