Akka, Concurrency, etc.

Akka remoting minimal example part 2 - sender side

Overview

You can find the code and instruction to run the example at GitHub.

This is continued from the previous article, and now we are going deep into the implementation and behavior of akka remoting on the sender side.

Workflow

As in the Main of this example, the sender side sends a message "Hello" to the receiver side, but in this example, as it uses remoting, the receiver side is referenced by ActorSelection instead of local ActorRef unlike the local sender example.

val selection: ActorSelection =
  context.actorSelection(path)

selection ! "Hello!!"

ActorSelection has path inside, which is a URL of the target actor. The components of the path URL is shown as follows:

 val path = "akka.tcp://receiverSystem@127.0.0.1:2551/user/receiver"

path

You can find more detail about akka's path in the official documentation, and components of the path.

Now let's look into the ! method of ActorSelection,

trait ScalaActorSelection {
  this: ActorSelectiondef !(msg: Any)
       (implicit sender: ActorRef = Actor.noSender) = 
    tell(msg, sender)
}

and the below tell method called from the above. You can see that the original message "Hello" is wrapped into ActorSelectionMessage.

def tell(
  msg: Any,
  sender: ActorRef
): Unit =
  ActorSelection.deliverSelection(
    ...,
    ActorSelectionMessage(msg, ...)
  )

actorselectionmessage

Through the deliverSelection method, ActorSelection calls the following method of RemoteActorRef.

override def !(message: Any)(...): Unit = {
  ...
  //remote: RemoteTransport
  remote.send(message, Option(sender), this) 
  ...
}

remote is an instance of RemoteTransport which has the following send method

override def send(message: Any, ... ): Unit = 
  ...
  case Some(manager) 
    ⇒ manager.tell(Send(message, ... ), ... )
  ...  
}  

manager is ActorRef pointing to an EndPointManager. (More precisely, there is actually one more actor in-between, but the message is anyway delivered to EndPointManager).

endpointmanager

EndpointManager manager has a buffer inside,

val buffer = new java.util.LinkedList[AnyRef]

and upon flushing the buffer, the sendBufferedMessages method is called to efficiently send buffered messages via network.

def sendBufferedMessages(): Unit = {
  ...
  val ok = writePrioLoop() && writeLoop(SendBufferBatchSize)
  ...
}

The reason for this buffering behavior is, if my understanding is correct, because there is throughput gap between local message-passing (up to EndPointWriter) and the remote message-passing (after EndPointWriter), so this buffering behavior will fill in the gap and keep the overall throughput of whole message-passing high.

There is a following method in EndpointWriter,

//class EndpointWriter in akka.remote.Endpoint.scala
  def writeSend(s: Send): Boolean = try {
    ...
      
      val pdu: ByteString = codec.constructMessage(
        ..., 
        serializeMessage(s.message), 
        ...)

      ...
      val ok = h.write(pdu)
    ...
  }

which performs message serialization, so that the message is converted to a payload which can be passed via network. As akka doc's serialization section says:

However, messages that have to escape the JVM to reach an actor running on a different host have to undergo some form of serialization (i.e. the objects have to be converted to and from byte arrays).

serialize

serialization converts a JVM object into Array[Byte]. The above writeSend converts Array[Byte] further into ByteString by its apply method. ByteString is extensively used in Akka when payload needs to be send via network.

object ByteString {

  /**
   * Creates a new ByteString by copying a byte array.
   */
  def apply(bytes: Array[Byte]): ByteString = CompactByteString(bytes)

Now it comes down to the point between the application (akka) layer and the network layer. The write method of TcpAssociationHandle has Channel class instance where the Channel class is defined in the Netty library.

//Channel is a class in netty, so from here the work is passed to netty
private[remote] class TcpAssociationHandle(
  val localAddress:    Address,
  val remoteAddress:   Address,
  val transport:       NettyTransport,
  private val channel: Channel)
  extends AssociationHandle {
  import transport.executionContext

  override val readHandlerPromise: Promise[HandleEventListener] = Promise()

  override def write(payload: ByteString): Boolean =
    if (channel.isWritable && channel.isOpen) {
      channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer))
      true
    } else false

netty

So this lets netty take care of payload transfer to a remote JVM.

Instruction to run the example, and output

As this example uses Akka remoting to send a message, you need to run two JVMs for the receiver and sender of the application respectively.

Firstly, run the receiver side with the receiver argument supplied to Main.

> git clone https://github.com/richardimaoka/resources.git
> cd resources
> cd remote-minimal
> sbt
> runMain example.Main receiver

You'll get output like below, then it waits until the message is sent from the sender.

> runMain example.Main receiver
[info] Running example.Main receiver
Program args:
receiver
running startMessageReceiver()
[INFO] [02/03/2018 13:36:58.281] [run-main-0] [akka.remote.Remoting] Starting remoting
[INFO] [02/03/2018 13:36:58.462] [run-main-0] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://receiverSystem@127.0.0.1:2551]
[INFO] [02/03/2018 13:36:58.464] [run-main-0] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://receiverSystem@127.0.0.1:2551]
provider = remote
listening at port = 2551
started a receiver actor = Actor[akka://receiverSystem/user/receiver#-603875191]

Then in the same directory, run the same Main with sender as the argument

> sbt
> runMain example.Main sender

this is the sender side output:

[info] Running example.Main sender
Program args:
sender
running startMessageSender()
[INFO] [02/03/2018 13:37:16.215] [run-main-0] [akka.remote.Remoting] Starting remoting
[INFO] [02/03/2018 13:37:16.427] [run-main-0] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://senderSystem@127.0.0.1:2552]
[INFO] [02/03/2018 13:37:16.432] [run-main-0] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://senderSystem@127.0.0.1:2552]
provider = remote
listening at port = 2552
sending a message to akka.tcp://receiverSystem@127.0.0.1:2551/user/receiver
[INFO] [02/03/2018 13:37:19.533] [senderSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://senderSystem@127.0.0.1:2552/system/remoting-terminator] Shutting down remote daemon.
[INFO] [02/03/2018 13:37:19.537] [senderSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://senderSystem@127.0.0.1:2552/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [02/03/2018 13:37:19.577] [senderSystem-akka.actor.default-dispatcher-4] [akka.remote.Remoting] Remoting shut down
[INFO] [02/03/2018 13:37:19.577] [senderSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://senderSystem@127.0.0.1:2552/system/remoting-terminator] Remoting shut down.
[success] Total time: 5 s, completed Feb 3, 2018 1:37:19 PM

then you see the receiver output as follows:

EchoActor: received message = Hello!!

and immediately after that, the receiver side shows this error, which can be ignored.

[ERROR] [02/03/2018 13:37:19.572] [receiverSystem-akka.remote.default-remote-dispatcher-15] [akka.tcp://receiverSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsenderSystem%40127.0.0.1%3A2552-0/endpointWriter] AssociationError [akka.tcp://receiverSystem@127.0.0.1:2551] <- [akka.tcp://senderSystem@127.0.0.1:2552]: Error [Shut down address: akka.tcp://senderSystem@127.0.0.1:2552] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://senderSystem@127.0.0.1:2552
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]

As explained in this thrad in akka-user mailing list, the error happens specifically when you launch a process like this example from sbt, but when you compile your application and run it witout sbt, then the error disappears.

Once everything is done, press the enter key on the receiver side's console and you get this:

[INFO] [02/03/2018 13:38:05.942] [receiverSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://receiverSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [02/03/2018 13:38:05.944] [receiverSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://receiverSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [02/03/2018 13:38:05.960] [receiverSystem-akka.actor.default-dispatcher-3] [akka.remote.Remoting] Remoting shut down
[INFO] [02/03/2018 13:38:05.960] [receiverSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://receiverSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.

References