Akka:Http:SSE
Example
Akka-http example of SSE using an Actor as the source of the events Stream.
package com.klarna.risk.linking.variables
import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream._
import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object SseApp extends App {
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
val (sourceQueue, eventsSource) = Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
.delay(1.seconds, DelayOverflowStrategy.backpressure)
.map(message => ServerSentEvent(message))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
.run()
val streamingActor = actorSystem.actorOf(Props(classOf[StreamingActor], sourceQueue))
def route: Route = {
path("events") {
get {
complete {
eventsSource
}
} ~ put {
entity(as[String]) { event =>
complete {
streamingActor ! event
StatusCodes.OK
}
}
}
}
}
Http().bindAndHandle(route, "0.0.0.0", 9999)
class StreamingActor(source: SourceQueueWithComplete[String]) extends Actor {
override def receive: Receive = {
case msg: String => source.offer(msg)
}
}
}