Declarative way of programming modern services; deals with data streams and the propagation of change (event-driven).
Not necessarily async, but async is a core feature; reacting to a change doesn’t require asynchronous processing, but the way modern reactive systems achieve performance is by processing asynchronously.
An outline of reactive systems is given in the Reactive Manifesto.
Modern languages use Reactive Data Streams, which are async and are specified here.
We can always use Future or CompletableFuture to make existing methods async, but that makes the code long and redundant. Moreover, it leads to blocking anyway if we perform a .join() to combine the output of two futures.
CompletableFuture<String> one = CompletableFuture.supplyAsync(() -> foo()); // make foo() async
CompletableFuture<String> two = CompletableFuture.supplyAsync(() -> bar()); // make bar() async
CompletableFuture<Void> combined = CompletableFuture.allOf(one, two); // combine the two
combined.join(); // wait (block) till the results from both arrive
String a = one.join(); // get value
String b = two.join(); // get value
// join() is just like the get() method; used to take the value out of a future
Since Java 9 we have the Flow API (java.util.concurrent.Flow) that standardizes the operations across reactive libraries, just like JPA does for persistence tools.
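For example, here is a minimal sketch of the Flow API using the built-in SubmissionPublisher as a data source (the subscriber class and the items below are made up for illustration):
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// a hypothetical subscriber that prints each item it receives
class PrintSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    public void onSubscribe(Flow.Subscription s) {
        this.subscription = s;
        s.request(1); // ask for the first item
    }
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1); // ask for the next item only after handling this one
    }
    public void onError(Throwable t) { t.printStackTrace(); }
    public void onComplete() { System.out.println("Done."); }
}

// usage: SubmissionPublisher delivers submitted items to subscribers asynchronously
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new PrintSubscriber());
List.of("A", "B", "C").forEach(publisher::submit);
publisher.close(); // signals onComplete() to subscribers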
In short, Reactive Streams offer a more powerful, declarative, yet flexible model for handling continuous data streams and complex asynchronous processing scenarios compared to CompletableFuture (where we have to do everything manually).
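For comparison, combining two async results with Project Reactor stays declarative and non-blocking; a rough sketch assuming the same foo() and bar() methods from the CompletableFuture example above:
Mono<String> one = Mono.fromCallable(() -> foo()).subscribeOn(Schedulers.boundedElastic()); // run foo() off the caller thread
Mono<String> two = Mono.fromCallable(() -> bar()).subscribeOn(Schedulers.boundedElastic()); // run bar() off the caller thread

// zip waits for both results and combines them; nothing blocks the caller
Mono.zip(one, two)
    .map(tuple -> tuple.getT1() + " " + tuple.getT2())
    .subscribe(System.out::println);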
Iterator and Observer are similar design patterns; the only difference is who controls the data flow.
In iterator, the calling method “pulls” the data from the source collection.
stream.forEach(System.out::println); // forEach pulls data from stream
In observer, we delegate an observer to watch the data source for changes and “react” to them. The data source is the one “pushing” (controlling) the data.
stream.myObserver(System.out::println); // myObserver merely reacts to the changes in stream
Notice that the way we write both is exactly the same, but they are opposites w.r.t. the side that controls the data flow.
Publisher - a reactive data source; its subscribe() method is called, passing it a subscriber
Subscriber - onSubscribe(), onNext(), onError(), onComplete()
Subscription - request(long n) and cancel()
Processor - implements both Publisher and Subscriber; can act as both
providerObj.subscribe(new Subscriber<String>() {
    public void onSubscribe(Subscription s) {
        s.request(3); // ask the publisher for 3 items
    }
    // other methods - onNext(), onError(), onComplete()
});
As shown above, we first subscribe to the data source (explicit) and request n items from within onSubscribe() (implicit, from the caller's point of view); only then does it send (emit) us the data. The Observer pattern then comes into play and we consume the data until a terminal signal (error or complete) is received.
Reactive sources/streams:
Flux: can emit 0 to n elements (i.e. a sequence of elements)
Mono: can emit 0 or 1 elements (i.e. a single element)
// Flux
Flux<String> fStr = Flux.just("A", "B", "C");
Flux<Integer> fNum = Flux.range(1, 10);
// Mono
Mono<Integer> mono = Mono.just(9);
// delay of 1 sec between emission of each element
Flux<Integer> fDelayed = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));
// unresponsive stream: never emits; the observer keeps waiting indefinitely
Flux.never();
Mono.never();
// we can have Collections inside Rx streams, nesting is possible too, etc...
Mono<Integer>
Mono<User> // custom POJO
Mono<List<Integer>>
Mono<Mono<Integer>>
onNext() - an item is emitted; a Mono terminates after it, a Flux doesn't
onComplete() - complete event; both Mono and Flux terminate
onError() - failure event; both Mono and Flux terminate
We can perform operations on reactive streams - just like Java Streams, we have intermediate and terminal operations.
// Terminal operations
flux.subscribe(System.out::println); // equivalent to .forEach()
flux.subscribe(System.out::println,
err -> System.out.println("Error occurred: " + err.getMessage()), // if error happens; do this
() -> System.out.println("Finished.") // on complete event, do this
);
// converting a reactive source to a Stream and then to a List
flux.toStream().toList();
// it is blocking since we wait for all the elements to be emitted before forming the list; so it's bad!
Integer i = mono.block(); // subscribe and block indefinitely until element is received; upon receive, return it
mono.block(Duration.ofSeconds(5)); // waits at most 5s for the element, a complete, or a failure; throws if nothing arrives in time
mono.blockOptional(); // returns the emitted value (if any) wrapped in an Optional
// Intermediate operations
.filter()
.distinct()
.map()
.flatMap() // same as in streams; here each element is mapped to a reactive stream which gets flattened
.count() // returns Mono<Long>; subscribe to it in order to take out the element
.take(n) // similar to limit(); sends a cancel() upstream to indicate a stop
.log() // logs every signal (onSubscribe, request, onNext, onComplete, ...)
.defaultIfEmpty(-1) // outputs a flux containing -1 if the input stream is empty (no elements and we receive a complete)
// Error handling
// remember, one way is to use the second param of subscribe() to handle errors
.doOnError(Consumer) // do something on error; the error still terminates the stream
.onErrorContinue((err, item) -> ...) // skip the failing element and continue with the next one
.doFinally(Consumer) // takes a SignalType, not elements; runs on complete, error, or cancel
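A small pipeline chaining a few of these operators together (the numbers are arbitrary):
Flux.range(1, 10)                  // 1..10
    .filter(n -> n % 2 == 0)       // keep only even numbers
    .map(n -> n * n)               // square them
    .take(3)                       // first 3 results only; cancels the rest upstream
    .defaultIfEmpty(-1)            // fallback if nothing made it through
    .log()                         // log each signal (onSubscribe, request, onNext, ...)
    .subscribe(System.out::println,
        err -> System.out.println("Error occurred: " + err.getMessage()),
        () -> System.out.println("Finished."));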
Several operations, like count() on a flux, return a mono. We then in turn perform operations on that mono.
Blocking and combining them is not a good way. Instead, we combine them using the provided methods so that there is no blocking and we get back a Flux to subscribe to (keeping things async):
// static method from Flux class (waits for mono to finish before flux)
Flux<T> merged = Flux.concat(mono, flux);
// methods from flux/mono instance
// concat waits for mono to finish before emitting flux
Flux<T> merged = mono.concatWith(flux);
// merge emits messages as soon as they are available from either source (recommended)
Flux<T> merged = mono.mergeWith(flux);
We can tell the data source to slow down (backpressure) in case we are taking too long to process the data it emits.
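A rough sketch of this using Reactor's BaseSubscriber, which lets us request one element at a time instead of letting the source emit at full speed (the "processing" here is just a print):
Flux.range(1, 10)
    .log()
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1); // start by asking for a single element
        }
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("Processed: " + value); // simulate slow processing
            request(1); // ask for the next element only once this one is done
        }
    });

// alternatively, cap how many elements are requested upstream at a time
Flux.range(1, 100).limitRate(10).subscribe(System.out::println);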
In web apps, the Netty server controls the reactive aspects; we just use Flux or Mono everywhere in the app code and perform operations only on them.
Use the Spring Reactive Web dependency in Spring Initializr to use reactive features in Spring Boot. It adds the Project Reactor dependency by default.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
This WebFlux dependency also makes Netty, rather than Tomcat, the default embedded HTTP server.
For reactive data sources, we can use any conventional database provider (H2, Postgres, etc.), but instead of JPA we use R2DBC (Reactive Relational Database Connectivity).
Do note that R2DBC doesn't support entity relationships (@OneToOne, etc.) as of yet, so it isn't a full replacement for JPA.
The data-access and driver dependencies for the database provider will be slightly different from the normal ones:
<!-- for ReactiveCrudRepository<> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- normal Postgres driver -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- added automatically along with the Postgres driver once Spring Initializr detects the R2DBC and Postgres dependencies -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
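The connection itself is configured with spring.r2dbc.* properties instead of the usual spring.datasource.* ones; a minimal application.properties sketch, assuming a local Postgres database named demo:
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/demo
spring.r2dbc.username=postgres
spring.r2dbc.password=secret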
// repo
public interface ProductRepository extends ReactiveCrudRepository<Product, Long>{ }
// controller
@GetMapping("/all")
public Flux<Product> listAllProducts() {
    return productRepo.findAll();
}
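The Product entity referenced above is a plain Spring Data class; a rough sketch with made-up fields, noting that with R2DBC the @Id annotation comes from org.springframework.data.annotation rather than jakarta.persistence:
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

@Table("product")        // maps to the "product" table
public class Product {
    @Id
    private Long id;     // primary key
    private String name;
    private double price;
    // constructors, getters and setters omitted
}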
Reference: Spring Reactive CRUD Project - YouTube
RestTemplate, now in maintenance mode, followed a thread-per-request model, which was severely limiting; it also blocked the flow until the result of the external service call was available.
Spring Web has a RestClient for synchronous web API calls, which is the recommended replacement for RestTemplate.
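A quick sketch of the synchronous style with RestClient (the URL and response type are placeholders):
RestClient client = RestClient.builder()
        .baseUrl("http://localhost:8081/slowservice")
        .build();

// blocks the calling thread until the response arrives
String body = client.get()
        .retrieve()
        .body(String.class);
System.out.println(body);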
Additionally, Spring WebFlux introduces a non-blocking way using WebClient. It returns a Publisher which the caller subscribes to, without blocking the code flow. As and when the data is available from the “slowservice”, the subscriber receives it asynchronously.
// create client instance
WebClient client = WebClient.builder().baseUrl("http://localhost:8081/slowservice").build();
// call service
Flux<Tweet> tweets = client.get().retrieve().bodyToFlux(Tweet.class);
// subscribe to service output
tweets.subscribe(System.out::println);
// method exits asap without blocking
log.info("Exiting NON-BLOCKING Controller ASAP!");
// results are received and printed later