Akka, Concurrency, etc.

Local Actor workflow part 2 - Receiver side

Overview

You can find the code and instructions for running the example on GitHub.

This article continues from the previous one, Local Actor workflow part 1 - Sender side. I recommend reading that article first.

Later, I am going to write remote versions of these articles to illustrate how Akka actors send and receive messages across different JVMs.

Workflow

As shown at the bottom of the previous article, Local Actor workflow part 1 - Sender side, the registerForExecution method below hands the Mailbox, which is defined as a ForkJoinTask, to Java's ExecutorService so that it is executed on a different thread.

def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  executorService execute mbox
  ...
}

abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] 
  with SystemMessageQueue 
  with Runnable {
    ...
}
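
The hand-off to another thread can be tried without Akka. The following is a minimal sketch, not Akka's implementation: SketchMailbox and RegisterForExecutionSketch are hypothetical names, and the mailbox is reduced to a plain Runnable draining a queue of strings. It only shows that a task passed to an executor's execute method runs on a pool thread rather than on the caller's thread.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ForkJoinPool, TimeUnit}

// Hypothetical, simplified stand-in for Akka's Mailbox: a Runnable
// that drains a queue of messages when an executor runs it.
final class SketchMailbox(queue: ConcurrentLinkedQueue[String]) extends Runnable {
  // Names of the threads that actually ran this mailbox.
  val ranOn = new ConcurrentLinkedQueue[String]()

  override def run(): Unit = {
    ranOn.add(Thread.currentThread().getName)
    var msg = queue.poll()
    while (msg != null) {
      println(s"[${Thread.currentThread().getName}] processing $msg")
      msg = queue.poll()
    }
  }
}

object RegisterForExecutionSketch {
  // Enqueue on the caller's thread, then hand the mailbox to the pool,
  // mirroring `executorService execute mbox` from the snippet above.
  // Returns (caller thread name, thread name that ran the mailbox).
  def demo(): (String, String) = {
    val pool = new ForkJoinPool(2)
    val queue = new ConcurrentLinkedQueue[String]()
    queue.add("Hello World")
    val mbox = new SketchMailbox(queue)
    pool.execute(mbox)
    pool.shutdown() // already submitted tasks still run
    pool.awaitTermination(5, TimeUnit.SECONDS)
    (Thread.currentThread().getName, mbox.ranOn.peek())
  }
}
```

Calling RegisterForExecutionSketch.demo() returns the caller's thread name together with the name of the pool thread that ran the mailbox, and the two differ.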

When the ExecutorService executes the Mailbox as a ForkJoinTask, the following run method of ForkJoinWorkerThread is called:

public void run() {...} 

(For some reason, a copy of ForkJoinWorkerThread from Java's standard library is in Akka's source code. I am not sure why.)

The run method above runs the following method of Mailbox:

@tailrec private final def processMailbox(
  ...
  // def dequeue(): Envelope = messageQueue.dequeue()
  val next = dequeue() 
  ...
  actor invoke next
  ...
  processMailbox(...)
}

After dequeuing an Envelope, Mailbox calls the invoke method of ActorCell,

final def invoke(messageHandle: Envelope): Unit = {
  ...
  receiveMessage(msg)
  ...
}

which unpacks the message from the Envelope and then calls receiveMessage of ActorCell,

// def actor: Actor = ... in ActorCell
final def receiveMessage(msg: Any): Unit =
  actor.aroundReceive(behaviorStack.head, msg)
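
The chain from processMailbox through invoke to receiveMessage can be sketched in plain Scala. This is a simplified illustration under assumptions, not Akka's code: Envelope, SketchCell and SketchMailbox are hypothetical stand-ins, the behavior is a plain function instead of an Actor, and the sketch simply drains the queue until it is empty.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

object ReceiverChainSketch {
  // Hypothetical simplified envelope: a message plus a sender label.
  final case class Envelope(message: Any, sender: String)

  // Stand-in for ActorCell: unpacks the envelope, then dispatches
  // the bare message to the current behavior.
  final class SketchCell(behavior: Any => Unit) {
    def invoke(envelope: Envelope): Unit = receiveMessage(envelope.message)
    def receiveMessage(msg: Any): Unit = behavior(msg)
  }

  // Stand-in for Mailbox: dequeues one envelope, invokes the cell,
  // then recurses until the queue is empty.
  final class SketchMailbox(cell: SketchCell) {
    private val queue = mutable.Queue.empty[Envelope]

    def enqueue(envelope: Envelope): Unit = queue.enqueue(envelope)

    @tailrec
    final def processMailbox(): Unit =
      if (queue.nonEmpty) {
        val next = queue.dequeue()
        cell.invoke(next)
        processMailbox()
      }
  }

  // Enqueues three messages and returns them in the order the behavior saw them.
  def demo(): List[Any] = {
    val received = mutable.ListBuffer.empty[Any]
    val mailbox = new SketchMailbox(new SketchCell(msg => received += msg))
    List("Hello World", "Hello Universe", "Hello Galaxy")
      .foreach(m => mailbox.enqueue(Envelope(m, "sender")))
    mailbox.processMailbox()
    received.toList
  }
}
```

In this sketch the behavior only ever sees the bare message, never the Envelope, and messages arrive in the order they were enqueued.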

Here, Actor has an important method called aroundReceive,

def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
  if (
    receive.applyOrElse(msg, Actor.notHandledFun)
      .asInstanceOf[AnyRef] eq Actor.NotHandled
  ) {
    unhandled(msg)
  }
}

which, as the name suggests, wraps around Actor's receive method.

class MessageReceiver extends Actor {
  def receive = {
    case s: String =>
      println(s"EchoActor: received message = $s")
  }
}

In aroundReceive, you can see that receive.applyOrElse is called; if no case in receive's pattern match handles the message, the unhandled method of Actor is called.
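
The applyOrElse-with-sentinel technique works on any Scala PartialFunction, so it can be tried without Akka. In the sketch below, SketchActor, its two buffers and the local NotHandled object are hypothetical stand-ins for illustration; only PartialFunction.applyOrElse comes from the standard library.

```scala
import scala.collection.mutable.ListBuffer

object AroundReceiveSketch {
  type Receive = PartialFunction[Any, Unit]

  // Sentinel returned by the fallback function, playing the role
  // that Actor.NotHandled plays in the snippet above.
  private case object NotHandled
  private val notHandledFun: Any => Any = _ => NotHandled

  // Hypothetical actor-like class that records what happened to each message.
  class SketchActor {
    val handled = ListBuffer.empty[String]
    val unhandledMessages = ListBuffer.empty[Any]

    // Only String messages match, as in MessageReceiver.
    def receive: Receive = {
      case s: String => handled += s
    }

    def unhandled(msg: Any): Unit = unhandledMessages += msg

    // If no case matches, applyOrElse runs the fallback and returns the
    // sentinel; a reference comparison then detects the miss.
    def aroundReceive(receive: Receive, msg: Any): Unit =
      if (receive.applyOrElse(msg, notHandledFun).asInstanceOf[AnyRef] eq NotHandled)
        unhandled(msg)
  }
}
```

With an instance a of SketchActor, a.aroundReceive(a.receive, "Hello World") records the string in handled, while a.aroundReceive(a.receive, 42) falls through to unhandled. Using applyOrElse avoids evaluating the pattern match twice, which a separate isDefinedAt check followed by apply would do.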

At this point, we have covered most of the receiver side of an actor's message passing. Next, I will go through how this changes when sending to a remote JVM.

Instructions to run the example

> git clone https://github.com/richardimaoka/resources.git
> cd resources
> cd local-minimal
> sbt
> runMain example.Main

Output

Some println calls are inserted in the complete example at GitHub to illustrate the behavior.

Thread names are shown in brackets, such as [exampleSystem-akka.actor.default-dispatcher-5] and [...-2] in the run below; the exact numbers can vary between runs.

[info] Running example.Main
provider = local
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello World to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello Universe to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello World
[exampleSystem-akka.actor.default-dispatcher-5] sending message Hello Galaxy to Actor[akka://exampleSystem/user/receiver#-846959521]
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello Universe
[exampleSystem-akka.actor.default-dispatcher-2] EchoActor: received message = Hello Galaxy
[success] Total time: 7 s, completed Jan 30, 2018 6:16:46 AM

References