‘ |
From Akka in Action, Second Edition by Francisco Lopez-Sancho Abraham This article jumps into the basics of how “asking” in Akka works. |
Take 35% off Akka in Action, Second Edition by entering fccabraham2 into the discount code box at checkout at manning.com.
Asking. Or expecting a reply.
Asking, in Akka lingua, is sending a message and providing the means to handle the response. Two things can happen after sending a message. Either you get back a response indicating the message has been processed, or you don’t. If you don’t, then you need to decide what do you do about it—the same as sending a postal letter, like in the good old times. If you need a response and you don’t get it, how long are you willing to wait before you take action?
The answer is formulated in terms of time and two possible outcomes. In Akka, when we ASK for something, we’ll also have to provide how long we’re willing to wait and what we’re going to do, whether or not we receive an answer. Both things are straightforward to set up. For the timing, we use the class Timeout, and for the fact, we can get or not get an answer. This is pattern matching over both options, Success
and Failure
. These are the only two instances of the abstract class Try[T]
where T
is the type of object we expect to get if the answer gets back in time, and therefore, consider successful. We’ll see a sample right away when we look at our new implementation of the Manager, but before that, we can have a look at Figure 1 to get our heads around it.
Figure 1 Possible outcomes of the Manager asking the Worker
Here, we depict a manager asking a worker for a task inside some time boundaries before it stops waiting for a response. Depending on whether the answer comes before or after this timeout, the manager processes a Success or a Failure.
The last idea before we dive into the implementation is to bear in mind that we’re going to see two cases of this idea of asking: a simpler one, where the message sent as the question doesn’t contain anything inside, and a more complete one, where the object we sent contains some information the worker uses.
In both cases, we’ll have the same App and a guardian that passes the list of tasks the Manager gets. In Listing 1, we can see the initial input at the start of the program.
Listing 1 ManagerWorkerApp and Guardian
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import scala.concurrent.duration.SECONDS import akka.util.Timeout import scala.util.{ Failure, Success } object ManangerWorkerApp extends App { val system: ActorSystem[Guardian.Command] = ActorSystem(Guardian(), "example-ask-without-content") system ! Guardian.Start(List("task-a", "task-b", "task-c", "task-d")) #A } object Guardian { sealed trait Command case class Start(tasks: List[String]) extends Command def apply(): Behavior[Command] = Behaviors.setup { context => val manager: ActorRef[Manager.Command] = context.spawn(Manager(), "manager-1") Behaviors.receiveMessage { case Start(tasks) => manager ! Manager.Delegate(tasks) Behaviors.same } } }
#A input, list of tasks
#B passing down the input
The simpler question
If we run the App
in the previous Listing 1, we can see two types of results. The first type is here, in Listing 2.
Listing 2 Output when response gets back in time
[Worker$] - My name is 'worker-task-c'. And I've done my task [Manager$] - task-c has been finished by worker-task-c
Not much can be added here. In this case, a Worker
had finished the task in time, and the Manager
registers that fact. This is a Success
.
On the other hand, we can see the other sort of output we can get in Listing 3.
Listing 3 Output when response does NOT gets back in time
[Manager$] - task 'task-a' has failed with [Ask timed out on [Actor[akka://example-ask-without-content/user/manager-1/worker-task-a#-450068248]] after [3000 ms]. Message of type [ask.simple.Worker$Do]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. [Worker$] - My name is 'worker-task-a'. And I've done my task [DeadLetterActorRef] - Message [ask.simple.Worker$Done$] to Actor[akka://example-ask-without-content/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://example-ask-without-content/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
In this last case, three related things had happened. The manager stopped waiting for the answer and felt back, reporting that task hadn’t been completed. On the other side, the worker took longer than it required, and despite the fact that the Manager gave up waiting, the Worker completed the task nevertheless. This is why we see the second message. The third thing that happened involved two new actors. When the Worker finished its task, it tries to send back an answer, but this message never reaches the Manager. How come? This is because, when ask
ing, an ephemeral intermediating actor gets created for handling the waiting along the predefined Timeout
. Eventually this produces a Success
or a Failure
message in the Manager.
This flow may be a little convoluted at first glance; let’s get some aid from Figure 2. to go over the big picture.
Figure 2. Ephemeral intermediate Actor life span and duties after timeout
From the beginning. When the Manager asks, an ephemeral actor gets in charge of sending the original message to the Worker and sending back Success
or Failure
to the Manager. Whether it’s one or the other, this actor immediately gets into Behavior.stopped
. Once this actor is stopped, its reference stops being valid, and when our Worker
sends back the message, it’s already too late—the ActorSystem takes over, making sure that this message gets delivered to deadLetters.
Now when things go south, we’ve at least some information, but this has to be taken with a pinch of salt; sending messages guarantees best effort.
If, for any reason, something goes wrong in the Worker, and it throws an Exception, or the rack where it runs gets on fire, the Manager gets a Timeout
. The message from the non-existent Worker isn’t created nor sent to deadLetters, as any Exception in an Actor by default is dealt by its creator through supervision.
In Listing 4, we see the Manager we’ve been talking about. It defines the Timeout
and also the countermeasures to Success
and Failure
.
Listing 4 Manager that asks
object Manager { sealed trait Command final case class Delegate(tasks: List[String]) extends Command final case class Report(description: String) extends Command def apply(): Behavior[Command] = Behaviors.setup { context => implicit val timeout: Timeout = Timeout(3, SECONDS) #A Behaviors.receiveMessage { message => message match { case Delegate(tasks) => #B tasks.map { task => val worker: ActorRef[Worker.Command] = context.spawn(Worker(), s"worker-$task") context.ask(worker, Worker.Do) { #C case Success(Worker.Done) => #D Report(s"$task has been finished by ${worker}") case Failure(ex) => #D Report(s"task '$task' has failed with [${ex.getMessage()}") } } Behaviors.same case Report(description) => context.log.info(description) Behaviors.same } } } }
#A timeout implicitly passed to ask
#B input of tasks
#C ask the worker
#D handling possible outcome
How we use the ask
here is quite straightforward. The manager is delegating the task Worker.Do
to whom we’re asking, the worker
, waiting no more than the timeout
.
We can see in Listing 1, in the Worker itself, how Worker.Do(replyTo: ActorRef[Worker.Response])
has a reference to whom it needs to replay to and on what terms. Let’s see the worker definition in Listing 5.
Listing 5 Worker that answers
object Worker { sealed trait Command case class Do(replyTo: ActorRef[Worker.Response]) extends Command sealed trait Response case object Done extends Response def apply(): Behavior[Command] = Behaviors.receive { (context, message) => message match { case Do(replyTo) => doing(scala.util.Random.between(2000, 4000)) #A context.log.info( s"My name is '${context.self.path.name}'. And I've done my task") replyTo ! Worker.Done #B Behaviors.stopped } } def doing(duration: Int): Unit = { val endTime = System.currentTimeMillis + duration while (endTime > System.currentTimeMillis) {} } }
#A emulating the delegated task
#B replying back to manager
First, let’s draw our attention at Worker.Do(replyTo: ActorRef[Worker.Response])
. You may be asking yourself how this replyTo
gets introduced in the message we’re sending. After all, when we used context.ask(worker, Worker.Do)
, there’s no mention of replyTo
in here. How does it end in there?
To answer that, we’ll have to look a bit deeper in Listing 6, in the signature of the ask
itself.
Listing 6 Ask signature
// Scala API impl override def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)( mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
If it seems a bit daunting, it means that we need to go more slowly. It’s far from difficult if we go step-by-step.
When we wrote context.ask(worker, Worker.Do)
, looking at the ask’s signature worker
corresponds to the target
param. No mystery here, but Worker.Do
being the createRequest
might not be clear.
We may have thought we were passing an object there, but the signature createRequest: ActorRef[Res] => Req
, tells us that this is a function, not an object. The question then is, how are we getting a function out of this Worker.Do
object we are passing? If you’re unfamiliar with Scala, this is probably confusing. Objects in Scala can be treated as functions as long they have an apply method, and a case class
as we have here is a special class that provides, among other things, this apply method under the hood. In our case, the case class Worker.Do(replyTo: ActorRef[Worker.Response])
produces, thanks to the compiler, a method like def apply(replyTo: ActorRef[Worker.Response]): Worker.Do = new Worker.Do(replyTo)
.
Because this method gets produced, the compiler now understands that this is what we mean when we put Worker.Do
in the ask
method. The compiler infers that we’re referring to the apply
function of our case class
, because this is the only thing that makes sense in the scope of the signature of createRequest
. This is it—we’re passing Worker.Do.apply()
to that method thanks to the syntactic sugar and the cleverness of the compiler.
Now for the mapResponse
. Depending on if it receives the answer in time, the Success
or Failure
in our code is triggered. We already talked about this, but now let’s have a closer look. We see mapResponse: Try[Res] => T
, which reads like the following: mapResponse
is a function that has to define two functions—one with input Success[Res]
and output T
, and another with input Failure
and output T
. In our case, Res
is Worker.Done
, and T
is Manager.Command
.
Finally, the implicit responseTimeout: Timeout
is the amount of time we wait for a response. The fact that it’s an implicit argument means that if there’s an implicit value in the scope, it uses it without you needing to pass it explicitly. Many scopes are defined in Scala, but won’t cover that right now. Suffice to say, that in the scope of our method, we have an implicit ‘timeout’.
Regarding classTag
, we’ll not pay much attention. It’s there for historical reasons and binary compatibility.
This is all you need to know to use ask
, but there’s something else which is useful to pay some attention to. What happens when the Manager sends that request? What’s happening at runtime? We mentioned before that between an actor that asks and the one that answers, there’s an intermediary actor. This middleman actor is the one our Workers replying to and from the signature of the Worker.Do(replyTo: ActorRef[Worker.Response])
. We know its type is ActorRef[Worker.Response]
. When the Manager
asks the Worker
, an actor of that type gets created and is used as the input to the method with no syntactic sugar, Worker.Do.apply(replyTo: ActorRef[Worker.Response])
. Finally, this Worker.Do(intermediaryActor)
is what the Worker
receives.
Ask with payload
Sometimes we ask
something to an actor, and we need to include some information in that question. In the previous example, the only thing the Worker knew was whom to respond to. Let’s have a look at a case almost equal to the previous one, except for the fact that when ask
, we need to pass the worker a Task
that contains an id
and a description
. See Listing 7 for an example of this.
Listing 7 Task Object as payload to pass when asking
final case class Task(taskId: String, taskDescription: String) sealed trait Command final case class Do(task: Task, replyTo: ActorRef[Worker.Response]) extends Command
In this case, when we provide the createRequest
parameter to ask
, we won’t be able to rely on Worker.Do.apply()
as we did before. Remember that createRequest
was expecting a function with one parameter, but now, our Worker.Do.apply
has two parameters, task
and replyTo
. The framework provides the intermediary for us at runtime, the replyTo
, and we need to provide the task
. Now we need a method that can accept a Task
as input, and we can pass it through to the Worker. This gives us back a function that can pass to the ask
. In Listing 8, we have a function with such a signature.
Listing 8 Auxiliary method to create a function of the signature that ask expects
def auxCreateRequest (task: Worker.Task)(replyTo: ActorRef[Worker.Response]): Worker.Do = Worker.Do(task, replyTo)
Let’s have a look at the Manager in Listing 9 to see how this plays out.
Listing 9 Manager that uses the auxCreateRequest
object Manager { sealed trait Command final case class Delegate(tasks: List[String]) extends Command final case class Report(outline: String) extends Command def apply(): Behavior[Command] = Behaviors.setup { context => implicit val timeout: Timeout = Timeout(3, SECONDS) def auxCreateRequest(task: Worker.Task)( #A replyTo: ActorRef[Worker.Response]): Worker.Do = #A Worker.Do(task, replyTo) #A Behaviors.receiveMessage { message => message match { case Delegate(tasks) => tasks.map { task => val worker: ActorRef[Worker.Command] = context.spawn(Worker(), s"worker-$task") context.ask( worker, auxCreateRequest( #B Worker.Task(System.currentTimeMillis().toString(), task))) { #B case Success(Worker.Done(taskId)) => Report(s"$taskId has been finished by ${worker}") case Failure(ex) => Report(s"task has failed with [${ex.getMessage()}") } } Behaviors.same case Report(outline) => context.log.info(outline) Behaviors.same } } } }
#A aux function to pass a task to the worker
#B usage of task function
If you’re not used to functional programming, it might be difficult to understand straight away how this auxCreateRequest
can produce the signature that ask.createRequest
requires.
In our case, the Manager takes a Task
from the guardian, let’s name it taskX
, and passes it to our auxiliary method. This produces a new function we see in Listing 10.
Listing 10 Output after passing ‘taskX’ to auxCreateRequest
def irrelevantName(rsp:ActorRef[Worker.Response]):Worker.Do = Worker.Do(taskX,rsp)
This is called currying, and this functionality is provided in most functional languages. We can think of it as a partial application of the function, and it works like the following. When having a function like multiplication(x: Int,y: Int) = x * y
, you can curry that function by multipication(4)
, which means that you’re passing only x
, not y
. In doing this, you get back a new function multiplication Curried(z: Int) = 4 * z
—and now you can use it as multiplicationCurried(3) = 12
.
In Scala, we need to explicitly state when a function can be curried. This is done by separating those input variables we want to pass individually by their own parentheses. We would do that as def multipication(x: Int)(y: Int)
in this case, and we use it as multiplication(4)_.
Coming back to transforming auxCreateRequest
in Listing 11, we see how we can use the function createRequest
, passing only the task taskX
and the output function we’ll get.
Listing 11 Passing only one param to auxCreateRequest
auxCreateRequest(taskX)(replyTo: ActorRef[Worker.Response]) #A def irrelevantName(rsp:ActorRef[Worker.Response]): Worker.Do = auxCreateRequest(‘taskX`)(rsp:ActorRef[Worker.Response]) #B #A currying with taskX #B output
Now, irrelevantName
function is what we can pass into createRequest
and that taskX
gets included in the message that the worker receives. It’s a bit of a twist if one isn’t used to currying, but you may well need this often. Getting to know the signature of ask is a piece of useful knowledge.
That’s all for this article. If you want to learn more about the book, check it out on Manning’s liveBook platform here.