Akka, Concurrency, etc.

Akka actor's ask pattern and its internal PromiseActorRef

I'm writing this article because I wanted to share my little findings about Akka ask pattern's internal behavior, which are the use of PromiseActorRef.

While doing some experiments in Akka, I came up with a question about how the Akka ask pattern distinguishes multiple messages returned back in a random order from the target actor, and it turned out the answer was PromiseActorRef.

Review of the Akka ask pattern, which uses the ? method.

The Akka ask pattern is described here in the official documentation, but we also review it here. You can skip this section if you are already familiar with it.

In Akka, there are basically two patterns to send a message betwen actors. The first is the tell pattern, which uses the ! method.

actor ! message

This is the fire-and-forget style for sending a message, and used more prevalently than the other pattern in Akka-based applications.

The other pattern is called the ask pattern, which uses the ? method to send a message and waits until there is a returning message back from the target actor.

import akka.pattern.ask
val fut: Future[Any] = actor ? message

The return type of the ? method is Future so that the caller thread of the ? method is not blocked, and upon target actor returning the resulting message to the sender, the Future is completed.

Supplying necessary imports and declarations, the code looks like this:

import akka.actor.ActorRef
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.duration._

val actor: ActorRef = ...
val message = ...

implicit ec: ExecutionContext = ...
implicit timeout: Timeout = 10.seconds
val fut: Future[Any] = actor ? message

and if you do this inside an Actor, it is done as follows:

import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.duration._

class SenderActor extends Actor {
  val actor: ActorRef = ...

  def receive = {
    ...
    val message = ...
  
    implicit ec:ExecutionContext = context.dispatcher
    implicit timeout: Timeout = 10.seconds
    val fut: Future[Any] = actor ? message
    ...
  }
}

Under the hood, this ? method invocation is implemented by Akka's message passing. So, what's the big deal about this and why its internal (PromiseActorRef) is interesting? To see that, let me introduce my question from the next section.

Invoking two Futures by the ? (ask) method

Instead of just invonking the ? method once, let's invoke the ? method for two different messages consecutively.

import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import scala.concurrent.Future

class MyActor extends Actor {
  val actor: ActorRef = ...

  def receive = {
    ...
    val message = ...
  
    implicit ec: ExecutionContext = context.dispatcher
    // What if the actor replies earlier for someMessage2 than someMessage1?
    val fut1: Future[Any] = actor ? message1
    val fut2: Future[Any] = actor ? message2
    ...
  }
}

The target Actor can be like this, where it sends back the result to the original sender actor in the end.

class TargetActor extends Actor {
  import TargetActor._
  def someBackendOperation(message: Message): Future[String] =
    ... // It might send messages to other actors,
        // or perform async database operations, etc

  def receive = {
    case message: Message =>
      val sender = sender() // sender is same for the two messages
      val resultFuture = someBackendOperation(message)
      resultFuture.onComplete {
        case Success(result) => sender ! result
        case Failure(result) => ... // error handling
      }
  }
}

object TargetActor {
  case class Message(body: String)
}

The point is that, since the someBackendOperation above, which probably sends a message to another actor, or perform some async database operations, etc, is asynchronous and take arbitrary amount of time, the returning message from TargetActor to SenderActor can be sent in a random order.

So, the fut2 can complete earlier than the fut1 below:

// invoked earlier, but completed latter
val fut1: Future[Any] = actor ? message1

// invoked later, but completed earlier
val fut2: Future[Any] = actor ? message2

then how the ? method distinguishes the return for the message2 from the message1? If they are not distinguished, the return for the message2 completes the fut1 which is a total mess, but that does not happen. The ? method distinguishes return for each ? invocation.

PromiseActorRef comes into play

The secret for the above behavior is PromiseActorRef, which is the key to distinguish returns for each ? method invocation.

Although we gave the class SenderActor name to the actor which invoked the ? method, precisely the real sender of the message is different.

A PromiseActorRef instance is the real sender of the message when you invoke the ? method in Akka, and the PromiseActorRef instance is swapned off every time you call the ? method, so that the real sender of the message is different for each ? method invocation. (i.e.) for each ? method, the target actor returns to a different PromiseActorRef instance.

Indeed, the sender: ActorRef parameter in the ? method signature is nothing to do with the real sender (code here):.

def ?(
  message: Any
)(
  implicit timeout: Timeout, sender: ActorRef = Actor.noSender
): Future[Any] =
  internalAsk(message, timeout, sender)

The implicit sender parameter is only used for error logging as follows, not used for anything else within the ? method (code here): .

onTimeout(
  s"""Ask timed out on [$targetName]
      | after [${timeout.duration.toMillis} ms].
      | Sender[$sender] sent message of type "${a.messageClassName}".
      | """.stripMargin))

NOTE: And this also answers another interesting question about why you can invoke the ? method outside Actor. As long as you supply necessary implicit ExecutionContext and implicit Timeout, you can do this, outside Actor.

import akka.actor.ActorRef
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.duration._

val actor: ActorRef = ...
val message = ...

implicit ec: ExecutionContext = ...
implicit timeout: Timeout = 10.seconds
val fut: Future[Any] = actor ? message

The target actor needs the sender to send the result back, so unlike the ! (tell) pattern, there must be a real sender actor instance. PromiseActorRef is used as the real sender, so it doesn't matter whether you call the ? method inside or outside Actor.


Lastly, the below is the internal implementation of the ask pattern, which is called inside the ? method (code here):

/**
  * INTERNAL API: for binary compatibility
  */
private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef) = actorRef match {
  case ref: InternalActorRef if ref.isTerminated ⇒
    actorRef ! message
    Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
  case ref: InternalActorRefif (timeout.duration.length <= 0)
      Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
    else {
      val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
      actorRef.tell(message, a)
      a.result.future
    }
  case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
}

And as in the comment of PromiseActorRef, it is optimized for this temporary use in the ask pattern. (code here)

 /**
 * Akka private optimized representation of the temporary actor spawned to
 * receive the reply to an "ask" operation.
 *
 * INTERNAL API
 */
private[akka] final class PromiseActorRef private (
  val provider: ActorRefProvider,
  val result: Promise[Any], _mcn: String
)

Hope this was interesting for you as well, and made it clear that you don't need to be afraid of the order of returning messages from the target to the sender, when using the Akka ask pattern.