Akka, Concurrency, etc.

Mailbox and ForkJoinTask

Update to the article and the video

Thanks to Victor, who immediately noticed that I had made a wrong assumption about ForkJoinTask behavior in Akka. This article and the videos have now been corrected.

Overview

The previous Dispatcher behavior article explained how Dispatcher and threads are related to each other. In this article, we will go one step further in this regard.

The code example is on GitHub, and it is the same example discussed in the local actor article(s).

Thread-processing details in Akka

If you follow the instructions at the bottom of this article, you will get output as follows (also available as a Google Spreadsheet).

So many things in the table! But no worries, we will go through each important piece, one after another.

whole-threads

Caveats

You might notice that I am skipping some parts (some rows in the above table) in this article, but that is just to avoid confusion. Even with this simple example, Akka's internal processing is very complicated, so I am only covering the pieces that help you understand the important parts.

Thread[2] - sender side

First, let's look at "Thread[2]" in the output table. As far as this article is concerned, Thread[2] is pretty much the sender side.

The sender Actor's Mailbox run() was called,

thread2-a

triggering MessageSender's preStart() method:

class MessageSender(messageReceiver: ActorRef) ... {
  override def preStart(): Unit = {
    val messages = List(
      "Hello World",
      "Hello Universe",
      "Hello Galaxy"
    )
    for(msg <- messages) {
      println(s"[${Thread.currentThread().getName}]|sending message $msg to $messageReceiver")
      messageReceiver ! msg
    }
  }
  ...
}

The very first message, "Hello World", was dispatched (sent) as follows:

thread2-b

As in the previous article, the dispatch method is implemented as below. It puts the message into the message queue of the mailbox, and ...

def dispatch(
  receiver: ActorCell,
  invocation: Envelope
): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}

... then registerForExecution scheduled mbox (= an instance of Mailbox which extends ForkJoinTask) to be executed on a different thread.

abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit]
  with SystemMessageQueue
  with Runnable {
    ...
}
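To make the two steps of dispatch (enqueue, then hand the mailbox to an executor) concrete, here is a minimal sketch that uses only the JDK and not Akka itself. ToyMailbox, ToyDispatchDemo, and sendAll are hypothetical names invented for this illustration, and a single-threaded executor stands in for the dispatcher's executorService so that the same mailbox never runs concurrently. It is a simplification under those assumptions, not how Akka is implemented.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a mailbox: a message queue that is also a Runnable task.
final class ToyMailbox(handler: String => Unit) extends Runnable {
  private val queue = new ConcurrentLinkedQueue[String]()

  def enqueue(msg: String): Unit = { queue.add(msg); () }

  // Drain every message that is in the queue while this task is running.
  override def run(): Unit = {
    var next = queue.poll()
    while (next != null) {
      handler(next)
      next = queue.poll()
    }
  }
}

object ToyDispatchDemo {
  final case class Result(senderThread: String, receiverThreads: Set[String], received: List[String])

  def sendAll(messages: List[String]): Result = {
    // Assumption: one worker thread, so the toy mailbox never runs concurrently.
    val executor = Executors.newSingleThreadExecutor()
    val log = ListBuffer.empty[(String, String)] // (thread name, message)
    val done = new CountDownLatch(messages.size)
    val mbox = new ToyMailbox({ msg =>
      log += ((Thread.currentThread().getName, msg))
      done.countDown()
    })

    // The same two steps as dispatch: enqueue, then schedule the mailbox itself.
    def dispatch(msg: String): Unit = {
      mbox.enqueue(msg)
      executor.execute(mbox)
    }

    messages.foreach(dispatch)
    done.await(5, TimeUnit.SECONDS) // wait until the worker has handled everything
    executor.shutdown()
    Result(Thread.currentThread().getName, log.map(_._1).toSet, log.map(_._2).toList)
  }
}
```

Calling ToyDispatchDemo.sendAll(List("Hello World", "Hello Universe", "Hello Galaxy")) should return the three messages in sending order, all handled on a thread other than the caller's. Note that the mailbox is submitted three times, but a later run may find the queue already drained by an earlier one.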

Same as the first message, the second and third messages, "Hello Universe" and "Hello Galaxy", were dispatched as well.

thread2-c

Since registerForExecution(mbox, ...) was called with the same Mailbox instance, mbox, each time, the underlying executorService scheduled the same Mailbox (ForkJoinTask) instance.

def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  executorService execute mbox
  ...
}

For the general behavior when you execute the same ForkJoinTask instance in a ForkJoinPool more than once, see my tweet below (not every submission of the ForkJoinTask is really run):
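As one small illustration of that general behavior, the sketch below submits the same task instance twice to a plain JDK ForkJoinPool and counts how often its body runs. SameTaskDemo and runDemo are hypothetical names, and the task is a RecursiveAction rather than an Akka Mailbox, so this only shows the JDK side under that assumption: once a ForkJoinTask has completed normally, submitting it again does not run its body again.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveAction, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object SameTaskDemo {
  // Returns how many times the task body actually ran after two submissions.
  def runDemo(): Int = {
    val runs = new AtomicInteger(0)
    val pool = new ForkJoinPool(2)

    val task = new RecursiveAction {
      override def compute(): Unit = { runs.incrementAndGet(); () }
    }

    pool.execute(task) // first submission: the body runs
    task.join()        // wait until the task has completed

    pool.execute(task) // second submission of the same, already completed instance
    pool.awaitQuiescence(5, TimeUnit.SECONDS) // give the pool time to pick it up

    pool.shutdown()
    runs.get()
  }
}
```

Here SameTaskDemo.runDemo() returns 1 even though execute was called twice, because the second submission finds the task already completed and skips its body.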

Thread[4] - receiver side

Let's move on to "Thread[4]", the receiver side. The point here (and for the whole article) is that processMailbox() is a recursive method.

The scheduled ForkJoinTask triggered the run method. Remember that Mailbox extends ForkJoinTask (and Runnable), so Mailbox implements the run method.

thread4-a

This is also discussed in the previous article, but the processMailbox method executes the receive method of the Actor:

thread4-b

class MessageReceiver extends Actor {
  def receive = {
    case s: String =>
      println(s"${Thread.currentThread()} [${self.path}]|EchoActor: received message = $s")
  }
}

Next, you see that processMailbox() was called multiple times before Mailbox run() finished,

thread4-c

because the processMailbox method is actually recursive:

@tailrec private final def processMailbox(...): Unit = {
  ...
  // def dequeue(): Envelope = messageQueue.dequeue()
  val next = dequeue()
  ...
  actor invoke next
  ...
  processMailbox(...)
}

so it processed all three messages, "Hello World", "Hello Universe" and "Hello Galaxy", in a single call of the ForkJoinTask's run.

thread4-d

The throughput setting in the config controls how many messages a single run of the ForkJoinTask can process.

# Throughput defines the number of messages that are processed in a batch
# before the thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 5
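The interplay between the recursion and the throughput limit can be sketched in a few lines of plain Scala. ThroughputSketch, its processMailbox method, and the left and handled parameters are hypothetical names for this illustration only; the real method works on Envelopes and does more bookkeeping. The idea shown is just this: each recursive call handles one message and decrements a counter, and the batch ends when the counter reaches zero or the queue is empty.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

object ThroughputSketch {
  // Handles at most `left` messages from the queue and returns them in processing order.
  @tailrec
  def processMailbox(
    queue: mutable.Queue[String],
    left: Int,
    handled: List[String] = Nil
  ): List[String] =
    if (left <= 0 || queue.isEmpty) handled.reverse // batch is over
    else {
      val next = queue.dequeue()                      // take one message
      processMailbox(queue, left - 1, next :: handled) // recurse for the rest of the batch
    }
}
```

With a queue holding the three "Hello ..." messages and left = 5, a single call handles all three. With seven queued messages and left = 5, the call handles five and leaves two in the queue for a later run.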

Instruction to run the example, and output

We can use the same example code as the "Local Actor workflow part 1 - Sender side" article. However, you need a bit of tweaking, which I am going to explain below.

Clone the akka repository,

> git clone https://github.com/akka/akka.git
> cd akka

and insert println calls (like this) in akka to see the Mailbox and Dispatcher behavior. Then execute publishLocal,

> sbt
> project akka-actor
> publishLocal

Now you will see that akka-actor_2.12;2.5-SNAPSHOT is built and stored under your .ivy2 directory.

[info] :: delivering :: com.typesafe.akka#akka-actor_2.12;2.5-SNAPSHOT :: 2.5-SNAPSHOT :: integration :: Thu Feb 22 07:22:33 JST 2018
[info] delivering ivy file to Users/username/akka/akka-actor/target/ivy-2.5-SNAPSHOT.xml
[info]  published akka-actor_2.12 to Users/username/.ivy2/local/com.typesafe.akka/akka-actor_2.12/2.5-SNAPSHOT/poms/akka-actor_2.12.pom
[info]  published akka-actor_2.12 to Users/username/.ivy2/local/com.typesafe.akka/akka-actor_2.12/2.5-SNAPSHOT/jars/akka-actor_2.12.jar
[info]  published akka-actor_2.12 to Users/username/.ivy2/local/com.typesafe.akka/akka-actor_2.12/2.5-SNAPSHOT/srcs/akka-actor_2.12-sources.jar
[info]  published akka-actor_2.12 to Users/username/.ivy2/local/com.typesafe.akka/akka-actor_2.12/2.5-SNAPSHOT/docs/akka-actor_2.12-javadoc.jar
[info]  published ivy to Users/username/.ivy2/local/com.typesafe.akka/akka-actor_2.12/2.5-SNAPSHOT/ivys/ivy.xml

From here you move to the local actor example code.

> cd ~
// or `cd` to whatever directory you like

> git clone https://github.com/richardimaoka/resources.git
> cd resources
> cd local-minimal

Make this change to the local example code, to use the 2.5-SNAPSHOT version of akka-actor jar built by the above step.

//build.sbt
 libraryDependencies ++= Seq(
-  "com.typesafe.akka" %% "akka-actor" % "2.5.9",
+  "com.typesafe.akka" %% "akka-actor" % "2.5-SNAPSHOT",
   scalaTest % Test
 )

From inside the local-minimal directory, you can do:

> sbt
> runMain example.Main

and you will get output like the following (the order of messages could be a little different due to concurrency).

After I did some clean-up, I posted the result here in Google Spreadsheet (shortened the thread names, excluded the user guardian from the logs, shortened the actor paths, etc.).

Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox run() called
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox processMailbox() next=null
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|sending message Hello World to Actor[akka://exampleSystem/user/receiver#1486562265]
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Dispatcher dispatch(Envelope(Hello World,Actor[akka://exampleSystem/user/sender#-1400752577])) started
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox run() called
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Dispatcher dispatch(Envelope(Hello World,Actor[akka://exampleSystem/user/sender#-1400752577])) finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() called, shouldProcessMessage=true
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|sending message Hello Universe to Actor[akka://exampleSystem/user/receiver#1486562265]
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() next=Envelope(Hello World,Actor[akka://exampleSystem/user/sender#-1400752577])
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Dispatcher dispatch(Envelope(Hello Universe,Actor[akka://exampleSystem/user/sender#-1400752577])) started
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|EchoActor: received message = Hello World
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Dispatcher dispatch(Envelope(Hello Universe,Actor[akka://exampleSystem/user/sender#-1400752577])) finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() called, shouldProcessMessage=true
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|sending message Hello Galaxy to Actor[akka://exampleSystem/user/receiver#1486562265]
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() next=Envelope(Hello Universe,Actor[akka://exampleSystem/user/sender#-1400752577])
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Dispatcher dispatch(Envelope(Hello Galaxy,Actor[akka://exampleSystem/user/sender#-1400752577])) started
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|EchoActor: received message = Hello Universe
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Dispatcher dispatch(Envelope(Hello Galaxy,Actor[akka://exampleSystem/user/sender#-1400752577])) finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() called, shouldProcessMessage=true
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox processMailbox() called, shouldProcessMessage=true
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() next=Envelope(Hello Galaxy,Actor[akka://exampleSystem/user/sender#-1400752577])
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox processMailbox() next=null
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|EchoActor: received message = Hello Galaxy
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() called, shouldProcessMessage=true
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() next=null
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox run() called
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox run() called
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox run() called
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox processMailbox() called, shouldProcessMessage=false
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox processMailbox() called, shouldProcessMessage=false
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user/sender]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() called, shouldProcessMessage=false
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox processMailbox() finished
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox run() called
Thread[exampleSystem-akka.actor.default-dispatcher-4,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-2,5,run-main-group-8]|[akka://exampleSystem/user/receiver]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox run() finished
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox processMailbox() called, shouldProcessMessage=false
Thread[exampleSystem-akka.actor.default-dispatcher-3,5,run-main-group-8]|[akka://exampleSystem/user]|Mailbox processMailbox() finished

References

  • Official documentation of Akka Mailbox at https://doc.akka.io/docs/akka/current/mailboxes.html
  • Official documentation of Akka Dispatcher at https://doc.akka.io/docs/akka/2.5/dispatchers.html
  • Oracle's official fork-join tutorial - https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
  • Oracle's official fork-join article - http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
  • ForkJoinTask javadoc - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinTask.html
  • ForkJoinPool javadoc - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html