Streaming data with Spring and EventSource

In Spring 5, Pivotal added the WebFlux framework, which supports reactive programming. In this post I would like to take a closer look and see how we can build a non-blocking chat application, from database to web browser. But first, let’s start with a bit of background.

Reactive programming

According to this blog reactive programming can be described as:

“Reactive Programming is a style of micro-architecture involving intelligent routing and consumption of events, all combining to change behaviour”

What does this say? Let’s focus on the main part:

In reactive programming we route and consume events intelligently in order to change behaviour. We can therefore look at our data as a flow of events that changes over time. We program against this flow: operators transform the incoming events and in turn produce a new flow of events that carries the output we want.

To get a grasp of this, compare the concept with a spreadsheet that contains cells with calculated values. If any of the cells used in a calculation changes, the calculated cell changes as well. If we change the value of that input cell again later, the calculated cell will again show the correct, updated output.
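The spreadsheet analogy can be sketched in a few lines of plain JavaScript. This is only an illustration: the names Cell and computed are made up for this example and do not come from any library.

```javascript
// A cell holds a value and notifies its listeners whenever the value changes.
class Cell {
  constructor(value) {
    this.value = value;
    this.listeners = [];
  }
  set(value) {
    this.value = value;
    this.listeners.forEach(listener => listener());
  }
  onChange(listener) {
    this.listeners.push(listener);
  }
}

// A computed cell recalculates itself when any of its input cells changes.
function computed(inputs, formula) {
  const currentValue = () => formula(...inputs.map(cell => cell.value));
  const result = new Cell(currentValue());
  inputs.forEach(input => input.onChange(() => result.set(currentValue())));
  return result;
}

const a1 = new Cell(2);
const b1 = new Cell(3);
const sum = computed([a1, b1], (a, b) => a + b); // like "=A1+B1"

a1.set(10); // sum is recalculated
b1.set(5);  // and recalculated again
```

The point is that sum is never updated by hand; it follows its inputs over time, which is the same idea a reactive stream expresses with operators.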

A useful tool in Reactive Programming is the visual representation of data flows in so-called marble diagrams. The example below (credits to @rishantagarwal) shows a graphical representation of a stream of user-generated mouse-clicks, and a flow of operations that lead to an outcome.

As you can see, this combination of operations is set up to filter out everything but double (or more) clicks within 250 ms, giving us a new stream of events.

Up until now reactive programming has had good momentum in the frontend, where frameworks such as Angular incorporate it into their core setup. Lately support in backend frameworks has been increasing as well, which makes it worth checking out.
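To make the chain of operations concrete without pulling in a library, here is a rough sketch on plain arrays of click timestamps. It approximates the buffering step by starting a new group whenever the gap between two clicks exceeds the window; the function names are hypothetical and the grouping rule is an assumption, not necessarily what the diagram’s operators do exactly.

```javascript
// Groups click timestamps (in ms) into bursts: a new burst starts when the
// gap to the previous click is larger than windowMs.
function groupIntoBursts(timestamps, windowMs) {
  const bursts = [];
  for (const t of timestamps) {
    const current = bursts[bursts.length - 1];
    if (current && t - current[current.length - 1] <= windowMs) {
      current.push(t);
    } else {
      bursts.push([t]);
    }
  }
  return bursts;
}

// buffer -> map to length -> filter: keep only bursts of two or more clicks.
function multiClickCounts(timestamps, windowMs = 250) {
  return groupIntoBursts(timestamps, windowMs)
    .map(burst => burst.length)
    .filter(count => count >= 2);
}

// A double click, a single click and a triple click.
const clicks = [0, 100, 1000, 2000, 2100, 2200];
const counts = multiClickCounts(clicks);
```

The single click at 1000 ms is dropped, while the double and triple clicks survive as counts.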

Spring WebFlux

For the backend, a certain level of standardisation of reactive programming is desirable. Reactive Streams is regarded as the main contract for implementing reactive programming (adopted in Java 9 as java.util.concurrent.Flow). Spring implements this standard through Reactor, but other implementations, such as RxJava, can be used within Spring too. Reactor offers the reactive type Flux for flows of 0 to N events and Mono for flows of 0 to 1 events.
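The core of the Reactive Streams contract is small: a publisher hands a subscription to its subscriber, and the subscriber asks for items with request(n). The sketch below mimics that shape in JavaScript, purely as an illustration. The method names follow the Reactive Streams interfaces, but fromArray is a made-up helper, and error handling and re-entrancy are ignored.

```javascript
// A publisher over a fixed array that only emits what has been requested.
function fromArray(items) {
  return {
    subscribe(subscriber) {
      let index = 0;
      let done = false;
      subscriber.onSubscribe({
        request(n) {
          let remaining = n;
          while (!done && remaining > 0 && index < items.length) {
            subscriber.onNext(items[index++]);
            remaining--;
          }
          if (!done && index >= items.length) {
            done = true;
            subscriber.onComplete();
          }
        },
        cancel() {
          done = true;
        }
      });
    }
  };
}

const received = [];
let subscription;
let completed = false;

fromArray(['a', 'b', 'c']).subscribe({
  onSubscribe(s) { subscription = s; },
  onNext(item) { received.push(item); },
  onComplete() { completed = true; }
});

subscription.request(2);                    // emits a and b, nothing more
const afterFirstRequest = received.slice();
subscription.request(10);                   // emits c, then completes
```

Because nothing is pushed beyond what was requested, a slow consumer is never flooded. A Flux or Mono plays the publisher role in this picture.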

These reactive types can easily be used and converted through basically the same operators used in frontend programming. Project Reactor conveniently lists an overview of those operators here.

The case

The case is the creation of a small chat application backed by a MongoDB database. The main idea is that users can open the chat application in a web browser and join the chat. Upon opening the application the chat history is shown; from then on, all new chat messages are added at the top of the list.

For this application we need to create two reactive endpoints: one to retrieve the chat messages and another to post new messages. To get the chat going, we seed the database with some initial data.

The backend

So, the database. For this demo we use MongoDB (there are also drivers for working reactively with relational databases, such as the Vert.x SQL driver, which support non-blocking calls to create truly reactive applications). Let’s use flapdoodle’s embedded MongoDB, as we just want to try some things out and this is a nice and easy way to do so.

We need to configure the use of WebFlux, and we will use Spring Data to talk to our MongoDB in a reactive way. I use Gradle, so that would look like this:

implementation('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
implementation('org.springframework.boot:spring-boot-starter-webflux')
implementation('de.flapdoodle.embed:de.flapdoodle.embed.mongo')

Now let’s create our ChatMessage entity (with Kotlin for the backend application):
class ChatMessage(
    @Id
    var id: String?, // nullable, so a message can be created before it has an id
    val user: String,
    val text: String
)

As said, we expose two endpoints: one for getting the full list of chat messages and one for posting a new one. First we need to create a repository to get access to the database:
interface ChatMessageRepository : ReactiveCrudRepository<ChatMessage, String>

We extend ReactiveCrudRepository from Spring Data to make use of the reactive paradigm. Now we can use the repository in our controller, implementing the endpoints:
@GetMapping("/chats")
fun allChats(): Flux<ChatMessage> {
    return chatMessageRepository.findWithTailableCursorBy()
}

@PostMapping("/chat")
fun addChatMessage(@RequestBody message: ChatMessage): Mono<ChatMessage> {
    return chatMessageRepository.save(message)
}

As you can see, we already return the reactive types Flux and Mono. Another thing stands out: we do not simply use the findAll() method to get all chats, but findWithTailableCursorBy(). This is a query method that has to be declared on the repository and annotated with Spring Data’s @Tailable; it is not part of ReactiveCrudRepository. We need it because we want to keep the connection open after all results have been fetched, so that new chats also flow through our endpoint.

In MongoDB this is done using a tailable cursor, which puts an extra requirement on the type of collection we use: it has to be a capped collection. We create it when the application starts, using a CommandLineRunner bean:

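// Connect to the MongoDB instance (defaults to localhost) and use the "local" database
val mongo = MongoClient()
val template = MongoTemplate(mongo, "local")
val chatMessageCollectionName = template.getCollectionName(ChatMessage::class.java)
// Create the collection as capped; size is the maximum size in bytes
template.createCollection(chatMessageCollectionName, CollectionOptions.empty().capped().size(9999999L))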

Here we connect to our MongoDB instance, look up the collection name for ChatMessage and explicitly create that collection as a capped collection. We literally cap it as well, by giving it a fixed size in bytes. Documents are kept in insertion order, so when the limit is reached the oldest documents are removed.

Let’s also insert some setup data during application startup: our chats as an ordered list, using our ReactiveMongoTemplate. Note that because reactive streams are lazy, nothing is executed until we subscribe to the returned stream.
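The eviction behaviour of a capped collection can be pictured with a toy model. Note the simplification: MongoDB caps by size in bytes (with an optional maximum document count), whereas this sketch caps by document count only, and CappedList is a made-up name.

```javascript
// Toy model of a capped collection: insertion order is preserved and the
// oldest entries are dropped once the cap is exceeded.
class CappedList {
  constructor(maxDocuments) {
    this.maxDocuments = maxDocuments;
    this.documents = [];
  }
  insert(doc) {
    this.documents.push(doc);
    while (this.documents.length > this.maxDocuments) {
      this.documents.shift(); // evict the oldest document
    }
  }
}

const capped = new CappedList(3);
['a', 'b', 'c', 'd', 'e'].forEach(text => capped.insert({ text }));
const remaining = capped.documents.map(doc => doc.text);
```

After five inserts into a list capped at three, only the three most recent documents are left, still in insertion order.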

val chats = Mono.just(
       listOf(
               ChatMessage("1", "Paul", "Hi there!"),
               ChatMessage("2", "Ringo", "Hi yourself!"),
               ...
       )
)
template.insertAll(chats).subscribe()
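Why the subscribe() call matters can be shown with a small stand-in for a lazy stream. This is a sketch only: lazySource and insertAll below are hypothetical helpers written for this example, not the Reactor or Spring Data API, and an array takes the place of the database.

```javascript
// A lazy ("cold") source: the producer function runs only on subscribe.
function lazySource(producer) {
  return {
    subscribe(onNext) {
      producer(onNext);
    }
  };
}

const inserted = []; // stands in for the database

// Describes the inserts, but performs none of them yet.
const insertAll = docs => lazySource(emit => {
  docs.forEach(doc => {
    inserted.push(doc); // the side effect, standing in for a database write
    emit(doc);
  });
});

const pending = insertAll(['Hi there!', 'Hi yourself!']);
const countBeforeSubscribe = inserted.length; // nothing has run yet

pending.subscribe(() => {});
const countAfterSubscribe = inserted.length;  // now the inserts have happened
```

Building the stream only describes the work; subscribing is what triggers it. Leaving out subscribe() in the startup code would therefore leave the collection empty.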

OK, that should do it for the backend part of our application. Let’s take a look at the frontend.

The frontend

For the frontend we create a simple application that connects to the backend through an EventSource. This keeps the connection open for a flow of events, which is exactly what we want. We use RxJS to create an observable data stream on the client side:

const { Observable } = rxjs;
const chatObservable = Observable.create(observer => {
    const evtSource = new EventSource('http://localhost:8080/api/chats');
    evtSource.onmessage = e => {
        const chatMessage = JSON.parse(e.data);
        observer.next(chatMessage);
    }
    evtSource.onerror = e => observer.error(e);
});

Using the Observable.create method we can control when we emit data on our new observable. Every time we receive a message from the EventSource we do an emit using the next() method, after parsing the data.

When we receive an error we simply emit this in the error channel of our observable. We do not complete our observable stream because the data will keep on flowing as long as the application is open.
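It can help to picture what arrives over the wire. Assuming the endpoint answers with text/event-stream (which is what an EventSource asks for), every chat message is sent as a data: line followed by a blank line. The sketch below parses such a payload by hand; parseSseChunk is a made-up helper, the sample payload is invented for illustration, and only the data field is handled.

```javascript
// Splits a text/event-stream payload into events and parses the JSON
// carried in each event's "data:" field. Other fields ("event", "id",
// "retry") and comment lines are ignored in this sketch.
function parseSseChunk(chunk) {
  return chunk
    .split(/\r?\n\r?\n/) // a blank line ends an event
    .map(block => block
      .split(/\r?\n/)
      .filter(line => line.startsWith('data:'))
      .map(line => line.slice(5).replace(/^ /, '')) // drop one optional leading space
      .join('\n'))
    .filter(data => data.length > 0)
    .map(data => JSON.parse(data));
}

const sample =
  'data:{"id":"1","user":"Paul","text":"Hi there!"}\n\n' +
  'data:{"id":"2","user":"Ringo","text":"Hi yourself!"}\n\n';

const messages = parseSseChunk(sample);
```

In the browser the EventSource does this splitting for us and hands each data payload to onmessage as e.data, which is why the handler only needs JSON.parse.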

Using this observable we can then easily subscribe and update our webpage:

chatObservable.subscribe(
    chatMessage => addChatMessage(chatMessage.text, chatMessage.user),
    error => console.log('Gotten error in observable stream ' + error),
    () => console.log('Observable stream got closed.')
);

The addChatMessage function we call here uses jQuery to find the chat list and prepend a new list item containing the user who posted the chat and the chat message. When an error is emitted we log it.

We can also log errors directly on the EventSource:

evtSource.onerror = function() {
    console.log("EventSource failed.");
}

The onerror handler fires when an error occurs or when the connection is closed, as would happen if we did not use a tailable cursor. So to see what happens, let’s change the controller call from:
chatMessageRepository.findWithTailableCursorBy()

to:
chatMessageRepository.findAll()

When we run again we see two things happening. First, onerror is called. Second, the EventSource attempts to reconnect, which results in getting the same 5 records over and over again.

EventSource has no event that signals a normal close. So if we want a regular (non-capped) collection to flow its data to the frontend, we have to implement an ‘end-of-stream’ event ourselves, for example by concatenating an event at the end of the stream in the backend to indicate that the flow has ended:
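One way to see which situation we are in inside an onerror handler is to look at the readyState of the EventSource. The numeric values below are the ones defined for EventSource in the HTML standard; the helper function and its wording are just an illustration.

```javascript
// readyState values defined for EventSource by the HTML standard.
const CONNECTING = 0;
const OPEN = 1;
const CLOSED = 2;

// Intended to be called from an onerror handler with evtSource.readyState.
function describeErrorState(readyState) {
  switch (readyState) {
    case CONNECTING:
      return 'connection lost, browser is reconnecting';
    case CLOSED:
      return 'connection closed, no further reconnect attempts';
    case OPEN:
      return 'error reported while the connection is still open';
    default:
      return 'unknown state';
  }
}
```

A possible use is evtSource.onerror = () => console.log(describeErrorState(evtSource.readyState)). When the server simply finishes the response, as happens with findAll(), the browser treats this as a lost connection, so the handler would report the reconnecting state.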

Flux.concat(
    chatMessageRepository.findAll(),
    Mono.just(ChatMessage(id = null, user = "test", text = "THIS_IS_THE_END"))
)

Of course we would use a different data model in a real situation.

Anyway, the above indication can then be used in the frontend to close the connection. Let’s extend our onmessage function a little bit and close the EventSource when we receive our closing event. We also complete our observable stream:

    evtSource.onmessage = e => {
        const chatMessage = JSON.parse(e.data);
        if (chatMessage.text === 'THIS_IS_THE_END') {
            evtSource.close();
            observer.complete();
            return;
        }
        observer.next(chatMessage);
    }

This prevents our EventSource from trying to reconnect and it closes our client side observable stream, preventing any errors.
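To check this behaviour outside the browser, the handler logic can be pulled into a plain function and fed with stand-ins for the EventSource and the observer. The function handleMessage and the fake objects are hypothetical and exist only for this sketch; the logic itself mirrors the onmessage handler.

```javascript
const END_MARKER = 'THIS_IS_THE_END';

// Same logic as the onmessage handler, extracted so it can run without a browser.
function handleMessage(rawData, source, observer) {
  const chatMessage = JSON.parse(rawData);
  if (chatMessage.text === END_MARKER) {
    source.close();      // stops the EventSource from reconnecting
    observer.complete(); // ends the client-side stream
    return;
  }
  observer.next(chatMessage);
}

// Hypothetical stand-ins that simply record what happened.
const log = [];
const fakeSource = { close: () => log.push('closed') };
const fakeObserver = {
  next: message => log.push('next:' + message.text),
  complete: () => log.push('complete')
};

handleMessage('{"user":"Paul","text":"Hi there!"}', fakeSource, fakeObserver);
handleMessage('{"user":"test","text":"THIS_IS_THE_END"}', fakeSource, fakeObserver);
```

The regular message is passed on through next(), while the marker message closes the source and completes the stream without ever being emitted as a chat.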

So, as I have hopefully demonstrated, combining Spring WebFlux and EventSource opens up a lot of possibilities for streaming data from database to web browser. Advantages of the reactive approach are better scalability and (on the backend) a smaller number of threads needed to handle concurrency.

One thing to keep in mind, though: when you want to use a reactive setup, you need a truly reactive stack from bottom to top. That means using a database and drivers that support this, as well as a server that supports non-blocking I/O, such as Netty or a Servlet 3.1+ container.

For this demo we used MongoDB with its Reactive Streams driver through Spring Data, combined with an embedded Netty server.

Happy coding!

Please find the code for the backend in https://gitlab.com/rweekers/webflux-demo and the client in https://gitlab.com/rweekers/webflux-demo-client.
