Project Reactor and Reactive Streams

Woman on ComputerReactive programming represents a change in how we think about an application's execution model. In a Reactive application, execution does not follow a linear model where one request is handled by one thread, instead multiple requests are handled in an event driven and non-blocking manner. A Reactive application keeps on handling new requests, instead of waiting for blocking operations. This type of application only reacts to the request when the response is ready. Reactive programming provides an event-based, asynchronous, and streaming programming model that can handle a large volume of concurrent requests coming from a single or multiple clients. Reactive applications usually require a small number of threads to scale vertically, rather than scaling horizontally. Designing and implementing applications using Reactive programming enables the application to maximize its use of the CPU, allowing the application to be more performant and efficient than traditional Java web applications.

Reactive Streams is a specification that enables the creation of libraries that implement the Reactive paradigm. Currently two of the most popular libraries that implement Reactive Streams are Project Reactor by Pivotal and RxJava (v2). The Reactive Streams Specification is composed of 4 simple Java interfaces, the definitions come straight from the Reactive Streams Javadocs:

  • Publisher - A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s)
  • Subscriber - Will receive call to Subscriber.onSubscribe(Subscription) once after passing an instance of Subscriber to Publisher.subscribe(Subscriber).
  • Subscription - Represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher
  • Processor - Represents the processing stage, and is both a Subscriber and a Publisher

Reactive Streams also includes a Technology Compatibility Kit that helps verify that implementations of Reactive Streams strictly adhere to the specification. Java 9 will also include the Reactive Streams interfaces under the new java.util.concurrent.Flow class. Reactive Streams doesn't focus on operators but instead it focuses on the stream's lifecycle and interaction with different reactive sources. Reactive Streams allows Projector Reactor and RxJava to have interoperability and flexibility. Project Reactor is Pivotal's implementation of Reactive Streams. Pivotal, which is the custodian for the Spring Framework and its various projects, is also the custodian for Project Reactor. Reactor 3 introduces a fluent and functional programming methodology for composing asynchronous and backpressure enabled event processing.

A key aspect of Project Reactor is its ability to provide backpressure to its asynchronous stream processing. Backpressure allows a consumer of an asynchronous event stream the ability to tell a producer how much data should be sent over to prevent the producer from emitting events to a consumer faster than the consumer can process them. Backpressure prevents the consumer from becoming overwhelmed with events, and then running low on system resources which could cause cascading failures in large systems. Events are pushed to a consumer only when the consumer can respond to the events as it sees fit. Project Reactor also provides other strategies for reducing the amount of data sent to a consumer, this includes using buffers and using techniques such as windowing[IE1]. Windowing is a technique that allows a program to analyze streaming data for the last n seconds every m seconds.

Project Reactor provides operators that allow transformations and terminal operations to an event stream. These operators give subscribers to a stream the ability to transform a stream to another stream by doing operations such as filtering, mapping, flat mapping, merging, concatenating, buffering, and windowing streams. These operators also allow the application of backpressure, throttling of streams, and implementing common patterns like scatter-gather for getting data from external services. Terminal operations pulls all the data in the stream and returns a result. Exceptions and errors in Project Reactor are considered first class citizens and are treated as a type of event. Treating exceptions as events avoids breaking the flow of the asynchronous event stream. Subscribers to a Stream are attached and removed on an ad hoc basis. Project Reactor follows a functional programming style which allows a coder to write declarative code that is easy to reason about, this style is like that of Java 8 Streams. As in Java 8 Streams, Project Reactor event streams are immutable, and, to operate on them, a consumer must subscribe to them. During the subscription, the consumer can provide custom thread pool executors to fine tune the concurrency of the event stream.

Handling Asynchronous Code, the traditional way

Project Reactor and Reactive Programming provide a different experience to what coders with a background in Java are used to. Project Reactor's programming paradigm is often compared to Java 8 Streams due to their similar syntax. Even though Project Reactor and Java 8 Streams share very similar API abstractions, underneath the difference is stark. Java 8 Streams deal with a collection of data, data that has a clear beginning and end, while Project Reactor Streams deal with an infinite asynchronous stream sequence that can come from external sources like another web service or database. Project Reactor also implements a "Push" event streaming model, while Java 8 Streams implement a "Pull" based model. Java 8 Streams lack operations that deal with time, such as windowing. Java 8 Streams does not support operations that deal with latency or ones that are I/O based. Both API(s) allow for the definition of a pipeline on which transformations can occur on the stream of data. This pipeline is coded using Functional-Style semantics and using Java's Lambda mechanism.

Before Java 8, implementing asynchronous code was not trivial because the API(s) available at the time were verbose, complex, and clunky. Before Java 8, constructs such as callbacks were used, but they required anonymous classes and were not easy to chain. The verbosity is limited if you use Java 8 Lambdas, but the other complexities of callbacks remain. Implementing operations like, "If this happens, then do that" is difficult because of the lack of proper chaining using callbacks. Often, the only way to chain with callbacks is to embed callbacks within each other. Embedding callbacks within each other usually leads to "Callback Hell", a situation in which it becomes impossible to make sense out of deeply nested callback code. Another reason that implementing asynchronous code was difficult before Java 8 was the use of the Future class. Future is asynchronous, and allows you to obtain results from a different thread, but calling its get() method to obtain the result blocks the calling thread. Both the Spring Framework and Google's Guava library tried to resolve this issue with the use of the ListenableFuture class. ListenableFuture added non-blocking callback based capabilities to the Future class.

Java 8 introduced Lambdas and the CompletableFuture class. Lambdas gave Java developers the ability to write concise callback code instead of using anonymous classes. CompletableFuture and the CompletionStage interface gives Java developers the ability to handle Futures in a non-blocking way and provides the capability to chain the processing of the deferred result.

Project Reactor includes the types Flux and Mono, their API is like the Java 8 Streams API, but their names differ to prevent confusion. The Flux and Mono types can easily be converted to a CompletableFuture and vice versa. Flux and Mono provide more capabilities than the Java 8 Stream API, and it can be directly compared to RXJava's Observable class.

Project Reactor - Reactive types

Project Reactor keeps its API lean and easy to use, because there are only two Reactive types. There are the Flux and Mono types. Reactive types allow applications to serve more requests concurrently. This ability allows Reactive applications to better handle use cases that involve high latency with better efficiency. Using Reactive types for potentially blocking operations minimize resource consumption because a Reactive type that waits costs nothing. Reactive types do not make data processing faster, it allows your application to handle more load.

Flux and Mono both implement the Publisher interface from the Reactive Streams specification. A Flux type represents an event stream that emits 0… N items, and a Mono type represents an event stream that emits 0..1 items. Mono is essentially the Reactive equivalent of CompletableFuture. A Mono type usually refers to a Mono that returns at most 0 items, while a Mono type returns at most 1 item of type T. Flux refers to a Flux that returns at most N items of type T. Both Flux and Mono can be passed around polymorphically since they both implement the Publisher interface from the Reactive Streams specification. Both Flux and Mono implement backpressure, in adherence to the Reactive Streams Specification. A Mono can be safely created using an Optional, and Flux support operations on time periods using Java 8's Duration class. Operations supported by Mono and Flux are provided in the links below, they point to the Flux and Mono Javadoc's.

Below are some common operations with Flux and Mono.

Returning an empty Flux:


Flux get() {
 return Flux.empty();
}

Returning a Flux with data:


Flux get() {
 return Flux.just("one", "two");
}

Returning an empty Mono:


Mono get() {
 return Mono.empty();
}

Returning a Mono with one item:


Mono get() {
 return Mono.just("one");
}

A Flux that emits an IllegalStateException:


Flux errorFlux() {
 return Flux.error(new IllegalStateException());
}

A Flux that emits increasing values from 0 to 9 each 100ms:


Flux counter() {
 return Flux.interval(Duration.ofMillis(100)).take(10);
}

A Mono that emits an IllegalStateException:


Mono errorMono() {
 return Mono.error(new IllegalStateException());
}

Using the map operator to transform a Flux:


Flux capitalizeMany(Flux flux) {
 return flux.map(user -> new User(user.getUsername().toUpperCase(),
 user.getFirstname().toUpperCase(),
 user.getLastname().toUpperCase()));
}

Create a Flux containing the value of mono1 then the value of mono2:


Flux createFluxFromMultipleMono(Mono mono1, Mono mono2) {
 return Flux.concat(mono1, mono2);
}

Merge flux1 and flux2 values with no interleave (flux1 values and then flux2 values):


Flux mergeFluxWithNoInterleave(Flux flux1, Flux flux2) {
 return Flux.concat(flux1, flux2);
}

Merge flux1 and flux2 values with interleave:


Flux mergeFluxWithInterleave(Flux flux1, Flux flux2) {
 return Flux.merge(flux1, flux2);
 }

Create a Mono ad-hoc if the Mono requested is not present:


Mono callSteveMono(Mono mono) {
 return mono.otherwise(e -> Mono.just(User.STEVE));
}

Create a Flux ad-hoc if the Flux requested is not present:


Flux callSteveAndCindyFlux(Flux flux) {
 return flux.onErrorResumeWith(e -> Flux.just(User.STEVE, User.CINDY));
 }

Return the Flux which returns faster the first value, Flux that emits first element wins:


Flux useFastestFlux(Flux flux1, Flux flux2) {
 return Flux.firstEmitting(flux1,flux2);
}

Create a Flux of user from Flux of username, firstname, and lastname. Streams could arrive at different times because of latency. This could represent 3 remote calls to an API:


Flux userFluxFromStringFlux(Flux usernameFlux, Flux firstnameFlux, Flux lastnameFlux) {
 return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux)
 .map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
}

Return the Mono which returns faster its data:


Mono useFastestMono(Mono mono1, Mono mono2) {
 return Mono.first(mono1,mono2);
}

Convert the input Flux to a Mono, this represents a complete signal, signaling that the data streaming is complete:


Mono fluxCompletion(Flux flux) {
 return flux.then();
}

Adapt Mono to Java 8 CompletableFuture:


CompletableFuture fromMonoToCompletableFuture(Mono mono) {
 return mono.toFuture();
}

Adapt Java 8 CompletableFuture to Mono:


Mono fromCompletableFutureToMono(CompletableFuture future) {
 return Mono.fromFuture(future);
}

Return the user contained in that Mono. Gotcha: Don't use this blocking operation as it will kill the Reactive pipeline:


User monoToValue(Mono mono) {
 return mono.block();
}

Return the users contained in that Flux. Gotcha: Don't use this blocking operation as it will kill the Reactive pipeline:


Iterable fluxToValues(Flux flux) {
 return flux.toIterable();
}

Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed and the data is received in a different thread. Schedulers.elastic() allows data processing to occur in a different thread:


Flux blockingRepositoryToFlux(BlockingRepository repository) {
 return Flux.defer(() -> Flux.fromIterable(repository.findAll())
 .subscribeOn(Schedulers.elastic()));
}

Insert users contained in the Flux parameter in the blocking repository using a parallel scheduler and return a Mono :


Mono fluxToBlockingRepository(Flux flux, BlockingRepository repository) {
 return flux.publishOn(Schedulers.parallel())
 .doOnNext(user -> repository.save(user)).then();
}

Summary

Spring Framework 5 completely embraces Reactive Streams and uses Projector Reactor internally. Reactor is the foundation for Spring Webflux and Spring Data Commons 2.0, both which are rewritten versions of Spring Web MVC and Spring Data Commons that adhere to the Reactive Streams Specification from the ground up. Pivotal found it difficult to modify existing API(s) to support Reactive programming, and instead decided to create separate projects for that purpose. Spring Webflux and Spring Data Commons 2.0 will be updated and maintained alongside Spring Web MVC and Spring Data Commons. In subsequent articles, we will explore the implementation of a simple Spring Boot API, using Spring MVC and then using Spring Webflux[IE2].

References:

  1. https://projectreactor.io/
  2. https://projectreactor.io/docs
  3. https://github.com/reactor/reactor
  4. https://github.com/reactor/lite-rx-api-hands-on
  5. http://www.reactive-streams.org/
  6. http://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html