Skip to content

Akka:Http:SSE

Example

Akka-http example of SSE using an Actor as the source of the events Stream.

package com.klarna.risk.linking.variables

import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream._
import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SseApp extends App {

  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  implicit val actorSystem = ActorSystem()
  implicit val mat = ActorMaterializer()

  val (sourceQueue, eventsSource) = Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
    .delay(1.seconds, DelayOverflowStrategy.backpressure)
    .map(message => ServerSentEvent(message))
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
    .run()

  val streamingActor = actorSystem.actorOf(Props(classOf[StreamingActor], sourceQueue))

  def route: Route = {

    path("events") {
      get {
        complete {
          eventsSource
        }
      } ~ put {
        entity(as[String]) { event =>
          complete {
            streamingActor ! event
            StatusCodes.OK
          }
        }
      }
    }
  }

  Http().bindAndHandle(route, "0.0.0.0", 9999)

  class StreamingActor(source: SourceQueueWithComplete[String]) extends Actor {
    override def receive: Receive = {
      case msg: String => source.offer(msg)
    }
  }

}

See also

Favorite site