net2020

Akka actor calls inside asynchronous code

8/27/2018

Introduction

Consider this code fragment:

class MyActor extends Actor with ActorLogging {
  def receive = {
    case MyRequest(input) =>
      val f = Future {
        //computation taking a long time goes here and returns output
        sender ! MyResponse(output) //problem: sender is read later, on another thread
      }
    case unknown => log.warning("received unknown message: " + unknown)
  }
}

Chances are that the actor that sent MyRequest to MyActor will never receive an answer. Instead, the log will show a message saying that the actor system failed to deliver MyResponse, possibly mentioning dead letters.

What is going on?

An actor is a stateful object. A single actor instance processes many requests during its lifetime, so its internal state changes from request to request. That includes 'sender', which always refers to the sender of the message currently being processed. Consider this scenario:

actor A sends request to MyActor, MyActor.sender = actor A ref
actor B sends request to MyActor, MyActor.sender = actor B ref
actor C sends request to MyActor, MyActor.sender = actor C ref
...

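If MyActor.receive runs code asynchronously, the time at which that code actually executes is unknown. By the time the Future block runs, MyActor may already be processing another message, or no message at all, so the block may see a different value of MyActor.sender than the one that was current when the request arrived (as described above). When no message is being processed, sender refers to the dead letters actor, which explains the log message.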
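The effect can be reproduced without Akka. The following is a minimal sketch in plain Scala, not Akka code: 'SenderRace', 'currentSender', 'handle' and 'run' are hypothetical names, and the mutable field only imitates the way MyActor.sender is valid while a message is being handled.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for an actor: a mutable field that is only
// meaningful while a message is being handled.
object SenderRace {
  @volatile private var currentSender: String = "deadLetters"

  // Handles one message. The reply is computed once `gate` completes and
  // reports which sender the asynchronous code observed.
  def handle(from: String, gate: Future[Unit], capture: Boolean): Future[String] = {
    currentSender = from
    val captured = currentSender // read now, on the handling thread
    val reply = gate.map { _ =>
      // With capture = false the field is read late, like `sender` in a Future.
      if (capture) captured else currentSender
    }
    currentSender = "deadLetters" // handling is over, the field is reset
    reply
  }

  def run(capture: Boolean): String = {
    val gate = Promise[Unit]()
    val reply = handle("actorA", gate.future, capture)
    handle("actorB", Promise[Unit]().future, capture) // a later, unrelated message
    gate.success(()) // the slow computation for actorA finishes only now
    Await.result(reply, 5.seconds)
  }
}
```

With capture = false the asynchronous code sees whatever the field holds when it finally runs; with capture = true it sees the value that was current when the message arrived.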

How to deal with this problem

Make sure that your code is looking at the right sender value:

class MyActor extends Actor with ActorLogging {
  def receive = {
    case MyRequest(input) =>
      val mySender = sender //capture sender value here and use it later
      val f = Future {
        //computation taking a long time goes here and returns output
        mySender ! MyResponse(output)
      }
    case unknown => log.warning("received unknown message: " + unknown)
  }
}

Akka's pipeTo pattern solves the same problem. This is how it is implemented in the Akka code:

//import akka.pattern.pipe

def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
  future andThen {
    case Success(r) ⇒ recipient ! r
    case Failure(f) ⇒ recipient ! Status.Failure(f)
  }
}

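When MyActor calls pipeTo with its current sender as the 'recipient' argument, that argument is evaluated immediately, on the actor's own thread, while the request is still being processed. It therefore captures the original sender actor reference. After the future is done (andThen), its output is sent to 'recipient', and the mutable MyActor.sender variable is never read again. The implicit 'sender' argument is a different thing: it tells the recipient who the reply comes from, and inside an actor it resolves to the actor's own 'self'.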
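For illustration, here is how MyActor might look when it uses pipeTo instead of capturing the sender by hand. This is a sketch under assumptions: the original post does not define MyRequest and MyResponse, so the String fields are hypothetical, and the upper-casing merely stands in for the long computation.

```scala
import akka.actor.{Actor, ActorLogging}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical message types; the String payloads are an assumption.
final case class MyRequest(input: String)
final case class MyResponse(output: String)

class MyActor extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext used by Future and pipeTo

  def receive = {
    case MyRequest(input) =>
      // Stand-in for the computation taking a long time.
      val result: Future[MyResponse] = Future(MyResponse(input.toUpperCase))
      // sender() is evaluated here, on the actor's thread, before the
      // future completes, so the reply goes to the original requester.
      result.pipeTo(sender())
    case unknown => log.warning("received unknown message: " + unknown)
  }
}
```

Note that the Future block no longer touches any actor state. If the computation fails, the requester receives a Status.Failure message instead of MyResponse, as the pipeTo code above shows.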