Creating a WebSocket subscription server with Akka

The Akka documentation contains a basic example of a WebSocket server, but it only echoes back the messages it receives. Extending it so that an Actor handles user messages and pushes data back to the user is not obvious.

Let’s build a simple WebSocket server with a /clock endpoint. After connecting, a client can subscribe to receive the current time in the requested time zones. It is not a very useful application, but the concept is the same if you would like to build a subscription for chat, a real-time game, or stock price updates.

We will be using Akka Streams with Akka HTTP. I will assume that, at the very least, you have read an introduction to each of these.

First, we need an Actor that holds the state of an individual connection: the list of time zones the client has subscribed to. Additionally, it will have a timer set to send an update to the user every second. We will call it Clock.

Possible commands that we expect are:

import akka.actor.typed.ActorRef
import akka.http.scaladsl.model.ws.Message

object Clock {

  sealed trait Command
  object Command {

    case class UserRequest(msg: Message) extends Command

    case class Connection(actorRef: ActorRef[Clock.Outgoing]) extends Command

    case class ConnectionFailure(ex: Throwable) extends Command

    case object Complete extends Command

    case object Tick extends Command

  }
}
  • UserRequest - Akka HTTP represents a raw message from the client as akka.http.scaladsl.model.ws.Message. We wrap it in this case class before passing it to the Clock Actor.

  • Connection - Two Actors will be sending messages to each other. The first is the Clock Actor; the second sits on the other end and represents the user connection. This Command carries the ActorRef of that connection Actor.

  • ConnectionFailure - If there is a problem in the Flow and it fails with an exception, we will receive the exception wrapped in this Command.

  • Complete - We will receive this Command when the client closes the connection.

  • Tick - This is a Command sent by our timer every second. We will send a message to the user on each Tick.

The connection Actor will handle the following outgoing messages:

import akka.http.scaladsl.model.ws.Message

object Clock {

  sealed trait Command
  /* ... */

  sealed trait Outgoing
  object Outgoing {

    case class MessageToClient(msg: Message) extends Outgoing

    case object Completed extends Outgoing

    case class Failure(ex: Exception) extends Outgoing

  }
}
  • MessageToClient - A message that we send to the client.
  • Completed - We can send this message to terminate the stream.
  • Failure - This message will close the stream with an exception.

Technically speaking, we will not define the Behavior of the connection Actor in a typical way. Instead, we will create it using ActorSource.actorRef. That is why I keep the Outgoing type inside the Clock Actor.

We have the incoming and outgoing message types defined. Let’s define the behaviour.

We will start our Clock in a pending state, waiting for a Connection message. In this state, we may also receive Complete and ConnectionFailure messages, for which we will stop the Actor. For any other message, we do nothing. When we receive the Connection command, we will start a timer that sends a Tick every second and switch the Actor’s behaviour to the connected state.

import java.time.{ ZoneId, ZonedDateTime }
import java.util.concurrent.TimeUnit

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, Behavior }
import akka.http.scaladsl.model.ws.{ Message, TextMessage }

import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success, Try }

object Clock {

  sealed trait Command
  /* ... */

  sealed trait Outgoing
  /* ... */

  def pending(): Behavior[Command] = Behaviors.receive { (ctx, msg) =>
    msg match {
      case Command.Connection(ref) =>
        Behaviors.withTimers { timers =>
          timers.startTimerAtFixedRate(Command.Tick, Duration.apply(1, TimeUnit.SECONDS))
          connected(ref, Seq.empty)
        }
      case Command.ConnectionFailure(ex) =>
        ctx.log.warn("WebSocket failed", ex)
        Behaviors.stopped
      case Command.Complete =>
        ctx.log.info("User closed connection")
        Behaviors.stopped
      case _ => Behaviors.same
    }
  }
  def connected(actorRef: ActorRef[Clock.Outgoing], timeZones: Seq[ZoneId]): Behavior[Command] =
    Behaviors.receive { (ctx, msg) =>
      msg match {
        case Command.Connection(_) => // shouldn't happen at this point
          Behaviors.same
        case Command.UserRequest(TextMessage.Strict(txt)) =>
          Try {
            ZoneId.of(txt)
          } match {
            case Success(tz) =>
              actorRef ! Outgoing.MessageToClient(TextMessage(s"Subscribed to $tz"))
              connected(actorRef, timeZones :+ tz)
            case Failure(ex) =>
              actorRef ! Outgoing.MessageToClient(TextMessage(ex.getMessage))
              Behaviors.same
          }

        case Command.UserRequest(uk) =>
          actorRef ! Outgoing.MessageToClient(TextMessage(s"Received unknown: $uk"))
          Behaviors.same
        case Command.Tick =>
          actorRef ! Outgoing.MessageToClient(
            TextMessage(timeZones.map(tz => tz.toString -> ZonedDateTime.now(tz)).toString())
          )
          Behaviors.same
        case Command.ConnectionFailure(ex) =>
          ctx.log.warn("WebSocket failed", ex)
          Behaviors.stopped
        case Command.Complete =>
          ctx.log.info("User closed connection")
          Behaviors.stopped
      }
    }
}

In the connected state, we need to handle all Commands. I think most of the code is self-explanatory. Notice the actorRef ! Outgoing.MessageToClient calls. This is how we send a message back to the client.
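The time-zone handling in the connected state can be tried in isolation, without any Actors. The following is a minimal sketch using only the standard library; TimeZoneSketch, subscribe and tickPayload are hypothetical names introduced for illustration and are not part of the Clock Actor.

```scala
import java.time.{ ZoneId, ZonedDateTime }
import scala.util.{ Failure, Success, Try }

object TimeZoneSketch {

  // Mirrors the UserRequest branch: a valid zone ID extends the subscription list,
  // an invalid one leaves the list untouched and reports the exception message.
  def subscribe(current: Seq[ZoneId], txt: String): (String, Seq[ZoneId]) =
    Try(ZoneId.of(txt)) match {
      case Success(tz) => (s"Subscribed to $tz", current :+ tz)
      case Failure(ex) => (ex.getMessage, current)
    }

  // Mirrors the Tick branch: pairs each zone ID with the current time in that zone
  // and renders the whole collection with toString.
  def tickPayload(zones: Seq[ZoneId]): String =
    zones.map(tz => tz.toString -> ZonedDateTime.now(tz)).toString()

  def main(args: Array[String]): Unit = {
    val (reply1, zones1) = subscribe(Seq.empty, "Europe/London")
    println(reply1) // Subscribed to Europe/London

    // "Mars/Olympus" is well-formed but not a known region, so ZoneId.of throws.
    val (reply2, zones2) = subscribe(zones1, "Mars/Olympus")
    println(reply2)

    println(tickPayload(zones2))
  }
}
```

The tick payload is simply the toString of a collection of pairs, which is fine for a demo; a real service would more likely serialize it to JSON.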

With the Clock Actor defined, let’s build the Flow. The Akka HTTP WebSocket handler requires a Flow[Message, Message, Any]. That means we need a Sink[Message, NotUsed] and a Source[Message, Unit].

Here is the incoming Sink. It takes a Message from the client, converts it to Clock.Command.UserRequest and forwards it to the Clock Actor.

import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.typed.scaladsl.{ ActorSink, ActorSource }

object ClockWebSocketFlow {
  def apply(clockActor: ActorRef[Clock.Command]): Flow[Message, Message, NotUsed] = {
    val incoming: Sink[Message, NotUsed] = Flow[Message]
      .map(Clock.Command.UserRequest)
      .to(
        ActorSink.actorRef[Clock.Command](
          clockActor,
          onCompleteMessage = Clock.Command.Complete,
          onFailureMessage = { case ex => Clock.Command.ConnectionFailure(ex) }
        )
      )

    // Placeholder so the excerpt type-checks; the Source and the final Flow follow below.
    ???
  }
}

ActorSink.actorRef requires:
  • ref - ActorRef of the Actor to be used.
  • onCompleteMessage - a message sent to the Actor when the stream completes.
  • onFailureMessage - a function from a Throwable to a type that the target Actor can accept. Its result is sent to the Actor when the stream fails with an exception.

object ClockWebSocketFlow {
  def apply(clockActor: ActorRef[Clock.Command]): Flow[Message, Message, NotUsed] = {
    val incoming: Sink[Message, NotUsed] = ???

    val outgoing: Source[Message, Unit] = ActorSource
      .actorRef[Clock.Outgoing](
        completionMatcher = { case Clock.Outgoing.Completed => },
        failureMatcher = { case Clock.Outgoing.Failure(ex)  => ex },
        bufferSize = 10,
        OverflowStrategy.dropHead
      )
      .mapMaterializedValue(client => clockActor ! Clock.Command.Connection(client))
      .map {
        case Clock.Outgoing.MessageToClient(msg) => msg
        // These are already handled by completionMatcher and failureMatcher so should never happen
        // added them just to silence exhaustiveness warning
        case Clock.Outgoing.Completed | Clock.Outgoing.Failure(_) => TextMessage.Strict("")
      }

    // Placeholder so the excerpt type-checks; the final Flow is built in the next step.
    ???
  }
}

The block above defines the outgoing Source. It materializes the connection Actor to which we will be sending Clock.Outgoing messages. The most important part is the mapMaterializedValue call. This is how we pass the ActorRef of the connection Actor to the Clock Actor. Because the ! operator returns Unit, the materialized value of the Source becomes Unit as well.

The next step is to build the Flow:

object ClockWebSocketFlow {
  def apply(clockActor: ActorRef[Clock.Command]): Flow[Message, Message, NotUsed] = {
    val incoming: Sink[Message, NotUsed] = ???
    val outgoing: Source[Message, Unit] = ???

    Flow.fromSinkAndSource(incoming, outgoing)
  }
}

All that is left is creating the HTTP server that serves our Flow, which we will take care of in RootActor. The real root (top-level) actor of the system will handle SpawnProtocol.Command, allowing us to spawn actors from outside of an actor. We need this because we can’t access an actor context within the Akka HTTP route definition and use it to spawn actors.

import java.util.concurrent.TimeUnit

import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props, SpawnProtocol }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{ handleWebSocketMessages, path }
import akka.http.scaladsl.server.Route
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }

object RootActor {
  def apply(system: ActorSystem[SpawnProtocol.Command]): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
    implicit val timeout   = Timeout(3, TimeUnit.SECONDS)
    implicit val scheduler = system.scheduler

    val websocketRoute =
      path("clock") {
        val clientF = system.ask[ActorRef[Clock.Command]] { ref =>
          Spawn[Clock.Command](
            Clock.pending(),
            "client",
            props = Props.empty,
            replyTo = ref
          )
        }
        val client = Await.result(clientF, Duration.Inf)
        handleWebSocketMessages(
          ClockWebSocketFlow(client)
        )
      }

    startHttpServer(websocketRoute)(system)

    Behaviors.empty
  }

  private def startHttpServer(routes: Route)(implicit system: ActorSystem[_]): Unit = {
    import system.executionContext

    val futureBinding = Http().newServerAt("localhost", 8080).bind(routes)
    futureBinding.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        system.log.info("Server online at http://{}:{}/", address.getHostString, address.getPort)
      case Failure(ex) =>
        system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
        system.terminate()
    }
  }
}

The important part here is the implementation of path("clock"). We use the Ask pattern with SpawnProtocol to create an instance of the Clock Actor. This example blocks with Await until the Actor has been spawned and then uses the resulting ActorRef to create the Flow that will handle the WebSocket connection.
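If blocking the route with Await is a concern, one possible alternative is Akka HTTP's onSuccess directive, which continues the route once a Future completes. The following is only a sketch under that assumption: NonBlockingClockRoute and its spawnFlow parameter are hypothetical names, and spawnFlow stands for any function that spawns a Clock Actor and returns the Flow wired to it.

```scala
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directives.{ handleWebSocketMessages, onSuccess, path }
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

import scala.concurrent.Future

object NonBlockingClockRoute {

  // spawnFlow is a hypothetical hook: it is expected to spawn a fresh Clock Actor
  // and return the WebSocket Flow connected to it.
  def apply(spawnFlow: () => Future[Flow[Message, Message, Any]]): Route =
    path("clock") {
      // onSuccess waits for the Future without blocking a thread and
      // fails the request if the Future fails.
      onSuccess(spawnFlow()) { flow =>
        handleWebSocketMessages(flow)
      }
    }
}
```

With the code from this article, spawnFlow could be built by taking the Future returned by system.ask and mapping it with ClockWebSocketFlow(_), which requires an ExecutionContext in scope.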

The last step is the Main class.

import java.util.concurrent.TimeUnit

import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed._
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout

object Main extends App {
  val system             = ActorSystem[SpawnProtocol.Command](SpawnProtocol(), "WebSocketServer")
  implicit val timeout   = Timeout(3, TimeUnit.SECONDS)
  implicit val scheduler = system.scheduler
  system.ask[ActorRef[Nothing]](ref => Spawn[Nothing](RootActor(system), "root", Props.empty, ref))
}

We create the ActorSystem with SpawnProtocol as its top-level behaviour and use it to spawn the RootActor.

That is it. You can now run it and use a WebSocket client to test it.
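Any WebSocket client will do. As one option, here is a minimal sketch of a test client that uses the WebSocket support in the JDK's java.net.http package (JDK 11 or newer is assumed). It expects the server to be running at ws://localhost:8080/clock; CollectingListener and ClockClient are hypothetical names introduced for this example.

```scala
import java.net.URI
import java.net.http.{ HttpClient, WebSocket }
import java.util.concurrent.{ CompletionStage, ConcurrentLinkedQueue, TimeUnit }

// Collects every complete text message pushed by the server.
class CollectingListener extends WebSocket.Listener {
  val messages = new ConcurrentLinkedQueue[String]()
  private val partial = new StringBuilder

  override def onText(ws: WebSocket, data: CharSequence, last: Boolean): CompletionStage[_] = {
    partial.append(data.toString)
    if (last) { // a message may arrive in several parts
      messages.add(partial.toString)
      partial.clear()
    }
    ws.request(1) // ask for the next message; without this no more are delivered
    null          // null signals that this part has been fully processed
  }
}

object ClockClient {
  def main(args: Array[String]): Unit = {
    val listener = new CollectingListener
    val ws = HttpClient
      .newHttpClient()
      .newWebSocketBuilder()
      .buildAsync(URI.create("ws://localhost:8080/clock"), listener)
      .get(5, TimeUnit.SECONDS)

    // Subscribe to one time zone, then let a few ticks arrive.
    ws.sendText("Europe/London", true).get(5, TimeUnit.SECONDS)
    Thread.sleep(3000)

    listener.messages.forEach(m => println(m))
    ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS)
  }
}
```

Run against the server from this article, the client should print the subscription confirmation followed by roughly one update per second.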
