Akka, Concurrency, etc.

Local Actor workflow part 1 - Sender side

Overview

You can find the code and instruction to run the example at GitHub.

This is about how Akka sends a message from one Actor to another locally (i.e. within the same JVM). As in the Serialization part of the official doc:

The messages that Akka actors send to each other are JVM objects (e.g. instances of Scala case classes). Message passing between actors that live on the same JVM is straightforward. It is simply done via reference passing.

If you are interested in the remote behavior when sending messages to remote JVM, I will write remote versions of articles soon so please refer to them.

Workflow

MessageSender sends messages "Hello World", "Hello Universe" and "Hello Galaxy" to the messageReceiver actor.

class MessageSender(messageReceiver: ActorRef)
  extends Actor {
  
  override def preStart(): Unit = {
    val messages = List(
      "Hello World",
      "Hello Universe",
      "Hello Galaxy"
    )

    for(msg <- messages) {
      messageReceiver ! msg
    }
  }
  ....
}

The preStart method is a lifecycle method provided by Akka Actor.

The below is what's inside the main method, which initializes the receiver and the sender.

val system = ActorSystem("exampleSystem")

val receiver = system.actorOf(
  Props[MessageReceiver],
  "receiver"
)

// sender
system.actorOf(
  MessageSender.props(receiver), 
  "sender"
)

As in the preStart method of MessageSender, the first message to be sent is,

messageReceiver ! "Hello World",

message

where the ! method is a method of LocalActorRef.

override def !(message: Any)
  (implicit sender: ActorRef = Actor.noSender): Unit =
    actorCell.sendMessage(message, sender)

Above actorCell is an instance of ActorCell, which implements Cell trait, and the Cell trait has the following sendMessage.

final def sendMessage(
  message: Any, 
  sender: ActorRef
): Unit =
  sendMessage(Envelope(message, sender, system))

Here you see an Envelope which encapsulates message and sender

envelope

case class Envelope(
  val message: Any, 
  val sender: ActorRef
)

To illustrate the workflow so far up to the sendMessage:

sendMessage

The sendMessage method of the Cell trait (ActorCell) calls the sendMessage method of the Dispatch trait.

def sendMessage(msg: Envelope): Unit =
  try {
    ...
    dispatcher.dispatch(this, msg)
  } 

(The concept of Dispatcher might be unfamiliar to you and it is probably difficult to understand. I will write another article later to illustrate Dispatcher in more detail, but for now, you can assume dispatcher is, as the meaning of the word says, "sender" of messages.)

There are two distinct Dispatch and Dispatcher traits in akka. The Dispatch trait has a reference to dispatch: Dispatcher and Dispatcher's dispatch method is as follows:

def dispatch(
  receiver: ActorCell,
  invocation: Envelope
): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}

dispatcher

Mailbox has the following enqueue method

def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)

enqueue

The messageQueue is type of MessageQueue class which is defined for each different Mailbox type. The one for the default UnboundedMailbox is:

object UnboundedMailbox {
  class MessageQueue 
    extends ConcurrentLinkedQueue[Envelope] 
    with UnboundedQueueBasedMessageQueue {
      final def queue: Queue[Envelope] = this
  }
}

Coming back to the dispatch method of Dispatcher, it has registerForExecution as follows:

def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  executorService execute mbox
  ...
}

Since Mailbox is defined as ForkJoinTask, the execution (i.e. processing) of Mailbox is run on a different Thread, which will be covered in the next article

abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] 
  with SystemMessageQueue 
  with Runnable {
    ...
}

In the next article, Local Actor workflow part 2 - Receiver side, I will discuss about what happens on the receiver side which is triggered as the above ForkJoinTask.

Instruction to run the example

> git clone https://github.com/richardimaoka/resources.git
> cd resources
> cd local-minimal
> sbt
> runMain example.Main

Output

Some println calls are inserted in the complete example at GitHub to illustrate the behavior.

Thread names are shown as [exampleSystem-akka.actor.default-dispatcher-3] and [...-4].

[info] Running example.Main
provider = local
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello World to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello Universe to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello World
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello Galaxy to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello Universe
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello Galaxy
[success] Total time: 7 s, completed Jan 30, 2018 6:16:46 AM

References

  • Official documentation of Akka Actor at https://doc.akka.io/docs/akka/2.5/actors.html
  • Official documentation of Akka Dispatcher at https://doc.akka.io/docs/akka/2.5/dispatchers.html
  • Official documentation of Akka lifecycle at https://doc.akka.io/docs/akka/current/actors.html$actor-lifecycle
  • Official documentation of Akka Mailbox at https://doc.akka.io/docs/akka/2.5/mailboxes.html?language=scala#mailboxes)
  • Official documentation of Akka location transparency at https://doc.akka.io/docs/akka/current/general/remoting.html#location-transparency
  • Oracle's documentation about Fork/Join at https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
  • ExecutorService Javadoc at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html