Spring WebFlux

What is WebFlux?

WebFlux is the reactive web framework introduced in Spring Framework 5 for building web applications in a reactive programming style. It is built on Project Reactor, which implements the Reactive Streams specification. WebFlux is non-blocking and event-driven, so it can serve many concurrent requests with a small number of threads instead of dedicating one thread to each request. Handling requests asynchronously in this way is what lets it scale.

Key Features of WebFlux

  • Reactive Programming – WebFlux is designed for Reactive Programming using Mono and Flux types. It handles data streams asynchronously.
    • Mono – Represents a single value or no value.
    • Flux – Represents a stream of multiple values.
  • Non-blocking I/O – WebFlux uses Reactor Netty by default for non-blocking I/O operations. It can also run on Tomcat, Jetty, or other supported web servers.
  • Annotation-based or Functional Routing – Annotation-based routing uses the traditional @Controller and @RequestMapping annotations. Functional routing provides a more functional programming style using route functions.
  • Declarative Approach – You write declarative code that expresses how the data should flow, and the framework manages concurrency concerns such as backpressure.
  • High Scalability – WebFlux is asynchronous, so it allows you to handle numerous concurrent requests efficiently with fewer threads.
  • Backpressure – WebFlux supports backpressure, which helps prevent the system from overloading when dealing with fast data producers and slow consumers.
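
As a rough illustration of backpressure in Project Reactor, the sketch below subscribes to a Flux with a BaseSubscriber that asks for only two items. The class name BackpressureSketch and the method name requestOnlyTwo are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of WebFlux itself.
class BackpressureSketch {

    // Subscribes to a source of 100 values but signals demand for only 2.
    static List<Integer> requestOnlyTwo() {
        List<Integer> received = new ArrayList<>();

        Flux.range(1, 100).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Replace the default unbounded demand with a demand of 2.
                request(2);
            }

            @Override
            protected void hookOnNext(Integer value) {
                // No further request(...) here, so the producer stops after 2 items.
                received.add(value);
            }
        });

        return received;
    }
}
```

Although the source could produce 100 values, only the two requested items are delivered, because the subscriber never signals more demand.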

Components of a Reactive Programming Application

The functional style of WebFlux is structured differently from the traditional annotation-based style. In the traditional style we have a Controller annotated with either @Controller or @RestController, and inside that Controller we define handler methods mapped to HTTP methods with @GetMapping, @PostMapping, @PutMapping, @DeleteMapping, etc.

In the functional style we don't need a Controller. Instead, we use a Router, and in that Router we define route functions that pair an HTTP method and path with a handler, for example route(GET("/hello"), handler::getHello). The Router calls the matching Handler, where the request handling logic lives. The Handler then calls the Service interface (backed by its implementation class), which calls the Repository, which in turn works with the Entity class.
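
The Router and Handler roles described above can be sketched roughly as follows. GreetingHandler, GreetingRouter, and the /hello path are hypothetical names chosen for illustration, and the sketch assumes spring-webflux (5.2 or later, for bodyValue) is on the classpath.

```java
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

// Hypothetical handler: holds the request-handling logic for one endpoint.
@Component
class GreetingHandler {

    Mono<ServerResponse> getHello(ServerRequest request) {
        // A real handler would usually delegate to a service here.
        return ServerResponse.ok().bodyValue("Hello, WebFlux!");
    }
}

// Hypothetical router: maps GET /hello to the handler method above.
@Configuration
class GreetingRouter {

    @Bean
    RouterFunction<ServerResponse> helloRoute(GreetingHandler handler) {
        return route(GET("/hello"), handler::getHello);
    }
}
```

Keeping the route table in one place and the logic in the handler is a design choice rather than a requirement; the same endpoint could also be written as an annotated controller.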

Mono vs Flux

Feature | Mono | Flux
No. of items | 0 or 1 | 0 to N
Common use cases | Single response (fetching a single object, sending one HTTP response) | Multiple responses (querying a list, event streams)
Example | Mono<String> | Flux<String>
Creation Methods | Mono.just(value), Mono.empty(), Mono.error() | Flux.just(), Flux.fromIterable(), Flux.range()
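
To make the creation methods in the table concrete, here is a minimal sketch. The class MonoFluxCreation and its method names are hypothetical, reactor-core is assumed to be on the classpath, and block() is used only to pull values out for demonstration; inside a WebFlux request you would normally return the Mono or Flux instead of blocking.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class showing a few factory methods.
class MonoFluxCreation {

    // Mono with exactly one value.
    static Optional<String> singleValue() {
        return Mono.just("one").blockOptional();
    }

    // Mono that completes without emitting anything.
    static Optional<String> noValue() {
        return Mono.<String>empty().blockOptional();
    }

    // Flux built from an existing collection.
    static List<String> fromIterable() {
        return Flux.fromIterable(List.of("a", "b", "c")).collectList().block();
    }

    // Flux.range(start, count): four integers starting at 1.
    static List<Integer> range() {
        return Flux.range(1, 4).collectList().block();
    }
}
```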

Different Methods used in Reactive Programming and their purposes.

  • route() – This route function is a declarative way of defining routes. It contains the HTTP requests with the respective handlers. It allows you to define a series of handler functions that process incoming requests. It is more aligned with the principles of reactive programming and functional programming.

Example – route(POST("/flux/processTrackers"), springReactiveWebFluxHandler::createReportProcessTracker)
.andRoute(GET("/flux/processTrackers"), springReactiveWebFluxHandler::getAllReportProcessTrackers)

  • bodyToMono() – This method is used to read the body of an HTTP request in a non-blocking manner and convert it into a Mono object, which represents an asynchronous computation that either produces a single value (or no value) or an error.

Example – return serverRequest.bodyToMono(ReportProcessTrackerDTO.class)
.flatMap(reportProcessTrackerDTO -> springReactiveWebFluxService.saveReportProcessTracker(reportProcessTrackerDTO))
.flatMap(savedReportProcessTracker -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(fromValue(savedReportProcessTracker)));

In the above example, bodyToMono() reads the body of the HTTP request and returns it as a Mono of ReportProcessTrackerDTO.

  • Mono.just() – This method creates a Mono that immediately emits the provided value and then completes. It’s a convenient way to create a Mono from existing values or objects in a reactive programming context.
public static <T> Mono<T> just(T data)
  1. T – The type of the value to be emitted.
  2. data – The actual value to emit.
  3. Return – A Mono<T> that emits the value data and then completes.

Example

Mono<String> mono = Mono.just("Hello, World!");

In the above line of code, a Mono is created that will emit the string “Hello, World!” and then complete once it is subscribed to.

  • Flux.just() – This method creates a Flux that immediately emits the provided items and then completes. It is commonly used to create a Flux from a fixed set of items or values in a reactive programming context.
public static <T> Flux<T> just(T... data)
  1. T – The type of the values to be emitted.
  2. data – The values that the Flux will emit, in the order they are provided.
  3. Return – A Flux<T> that emits the values and then completes.

Example

Flux<Integer> fluxInteger = Flux.just(1, 2, 3, 4, 5);
Flux<String> fluxString = Flux.just("Apple", "Banana", "Cherry");

In the above code, the first line creates a Flux of Integer that emits 1, 2, 3, 4, 5 one after another.

The second line creates a Flux of String that emits the values Apple, Banana, Cherry.

  • map() – This method transforms each item emitted by the source Mono or Flux into a new value by applying a function to it.

map() in Mono

Mono<String> mono = Mono.just("Hello")
    .map(str -> str + ", World!");  // Transformation function

The above code works as follows:

  1. Mono.just("Hello") – Creates a Mono that emits the value Hello.
  2. map() – Takes the emitted value Hello, applies the transformation (str -> str + ", World!"), and returns the new value Hello, World!
  3. The resulting Mono emits the value Hello, World! when subscribed to.

map() in Flux

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5)
    .map(num -> num * 10);

The above code works as follows:

  1. First, it creates a Flux of Integer that emits the values 1, 2, 3, 4, and 5.
  2. Then, map() multiplies each value by 10, so the resulting Flux emits 10, 20, 30, 40, 50, and completes.
  3. Remember that map() applies its function synchronously: each item is transformed in place as it passes through, so the function should return quickly and must not block.

  • flatMap() – This method transforms each item emitted by a Mono or Flux into another Mono or Flux (a Publisher) and flattens the results into the output stream. Because the mapper returns a Publisher, the work it represents can run asynchronously, which is the big difference from map(). It is ideal for non-blocking, asynchronous operations such as calling external services, performing database queries, and making HTTP requests.

flatMap() in Mono

Mono<Integer> mono = Mono.just("3")
    .flatMap(str -> Mono.just(Integer.parseInt(str)));

The above code works as follows:

  1. First, the source Mono emits the value 3 as a String.
  2. Then flatMap() takes that String and converts it into another Mono of Integer by using Mono.just(Integer.parseInt(str)), which emits the value 3 as an Integer.

So, with flatMap(), the transformation function itself returns a Mono. The inner Mono is unwrapped, and the outer Mono emits the result of the inner Mono.

Example – Mono.flatMap()

Mono<ReportProcessTracker> reportProcessTrackerMono = springReactiveWebFluxProcessTrackerRepository.findByTraceId(traceId);
        return reportProcessTrackerMono.flatMap(reportProcessTracker -> convertToReportProcessTrackerDTO(reportProcessTracker));

The above code fetches a Mono of the ReportProcessTracker entity by traceId and then converts that entity into a Mono of ReportProcessTrackerDTO.
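
The same lookup-then-convert pattern can be tried without a database. The sketch below is a self-contained stand-in, not the article's actual repository: the Tracker and TrackerDTO records, the in-memory STORE map, and all method names are hypothetical, and it assumes Java 16+ (for records) with reactor-core on the classpath.

```java
import java.util.Map;
import java.util.Optional;

import reactor.core.publisher.Mono;

// Hypothetical stand-in for a repository lookup followed by a DTO conversion.
class MonoFlatMapSketch {

    record Tracker(String traceId, String status) {}

    record TrackerDTO(String traceId, String status) {}

    // In-memory data playing the role of the database.
    private static final Map<String, Tracker> STORE =
            Map.of("t-1", new Tracker("t-1", "DONE"));

    // Emits the entity if present, otherwise completes empty.
    static Mono<Tracker> findByTraceId(String traceId) {
        return Mono.justOrEmpty(STORE.get(traceId));
    }

    // The conversion itself returns a Mono, which is why flatMap() is used.
    static Mono<TrackerDTO> toDto(Tracker tracker) {
        return Mono.just(new TrackerDTO(tracker.traceId(), tracker.status()));
    }

    // block is used here only so the result can be inspected.
    static Optional<TrackerDTO> lookup(String traceId) {
        return findByTraceId(traceId)
                .flatMap(MonoFlatMapSketch::toDto)
                .blockOptional();
    }
}
```

When the trace ID is not found, the source Mono is empty, flatMap() is never invoked, and the result is an empty Mono rather than an error.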

flatMap() in Flux

Flux<Integer> flux = Flux.just("7", "8", "9")
                .flatMap(str -> Flux.just(Integer.parseInt(str) * 3));

The above code works as follows:

  1. First, the source Flux emits the strings 7, 8, and 9.
  2. Second, flatMap() transforms each string into a Flux containing the corresponding Integer multiplied by 3, giving 21, 24, and 27.
  3. Lastly, the resulting Flux<Integer> emits 21, 24, and 27.

Example – Flux.flatMap()

Flux<ReportProcessTracker> reportProcessTrackerFlux = springReactiveWebFluxProcessTrackerRepository.findAll();
Flux<ReportProcessTrackerDTO> reportProcessTrackerDTOFlux = reportProcessTrackerFlux
                .flatMap(reportProcessTracker -> convertToReportProcessTrackerDTO(reportProcessTracker));

The above code gets a Flux of ReportProcessTracker entities by using findAll(), converts each ReportProcessTracker entity into a ReportProcessTrackerDTO, and returns the resulting Flux of ReportProcessTrackerDTO.

Difference between map() and flatMap()

Feature | map() | flatMap()
Synchronous vs Asynchronous | Transforms the data synchronously within the same reactive type, i.e. in the same Mono or Flux | Transforms the data asynchronously by mapping each item into another Mono or Flux
Use Case | For simple, synchronous transformations (modifying data locally) | For non-blocking operations that involve another reactive source, like a database query, API call, or I/O operation
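
A small side-by-side sketch of the two operators follows. The class MapVsFlatMap and the slowTimesTen helper (a stand-in for a remote call) are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class contrasting map() and flatMap().
class MapVsFlatMap {

    // map(): plain function from value to value, applied synchronously.
    static List<Integer> withMap() {
        return Flux.just(1, 2, 3)
                .map(n -> n * 10)
                .collectList()
                .block();
    }

    // Stand-in for an asynchronous call: the result arrives after a short delay.
    static Mono<Integer> slowTimesTen(int n) {
        return Mono.just(n * 10).delayElement(Duration.ofMillis(20));
    }

    // flatMap(): the mapper returns a Publisher, which is subscribed to and flattened.
    static List<Integer> withFlatMap() {
        return Flux.just(1, 2, 3)
                .flatMap(MapVsFlatMap::slowTimesTen)
                .collectList()
                .block();
    }
}
```

Both methods produce the values 10, 20, and 30, but only the map() variant is guaranteed to keep them in source order; block() is used here purely to inspect the results.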
  • flatMapSequential() – This Flux method maps each element emitted by the source into a new reactive stream (a Publisher such as a Mono or Flux) and then merges the results so that they are emitted in the original order of the source elements.

Flux<Integer> flux = Flux.just("7", "8", "9")
                .flatMap(str -> Flux.just(Integer.parseInt(str) * 3));

In the above code, flatMap() gives no guarantee that the Flux<Integer> will emit the data in the order 21, 24, 27. flatMap() subscribes to the inner streams eagerly and emits their items as they arrive, so when the inner streams are asynchronous the results can come out in any order, depending on which inner stream emits first.

flatMapSequential(), in contrast, guarantees ordering. It emits the results in the same order in which the corresponding items arrived from the source stream.

Flux<Integer> flux = Flux.just("7", "8", "9")
                .flatMapSequential(str -> Flux.just(Integer.parseInt(str) * 3));

The above lines of code emit the results in source order, i.e. 21, 24, 27.
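
The ordering difference only becomes visible when the inner streams are asynchronous. The sketch below makes earlier source items answer later by adding artificial delays; the class OrderingSketch and the delayed helper are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class comparing flatMap() and flatMapSequential().
class OrderingSketch {

    // Larger values take longer, so the first source item (3) finishes last.
    static Mono<Integer> delayed(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 50L));
    }

    // Results arrive in completion order, which is not guaranteed to match the source.
    static List<Integer> withFlatMap() {
        return Flux.just(3, 2, 1)
                .flatMap(OrderingSketch::delayed)
                .collectList()
                .block();
    }

    // Results are buffered as needed and released in source order.
    static List<Integer> withFlatMapSequential() {
        return Flux.just(3, 2, 1)
                .flatMapSequential(OrderingSketch::delayed)
                .collectList()
                .block();
    }
}
```

withFlatMapSequential() yields the list in source order (3, 2, 1), while withFlatMap() yields the same three values in whatever order the delays happen to complete.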

Example of flatMapSequential()

getAllReportProcessTrackers()
                .flatMapSequential(reportProcessTrackerDTO ->
                        findReportProcessAndFileTrackerByProcessId(reportProcessTrackerDTO.getTraceId()))

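For each ReportProcessTrackerDTO emitted by getAllReportProcessTrackers(), it looks up the associated report file trackers by trace ID and emits the results in the same order as the source DTOs. So if the source emits the DTOs sorted by ID in ascending order, the combined results keep that order.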

Syntax

flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper);

mapper – A function that takes an item emitted by the source and returns a Mono or Flux (i.e. a Publisher).

An overloaded variant takes two additional tuning parameters:

flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch);

maxConcurrency – Limits the number of inner streams that are subscribed to and processed concurrently.

prefetch – Controls how many elements are requested in advance from each inner stream.
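
A minimal sketch of the three-argument overload is shown below. The class SequentialTuning, the method name, and the chosen values (2 concurrent inner streams, a prefetch of 4) are arbitrary illustrations, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class for the overload with concurrency and prefetch limits.
class SequentialTuning {

    static List<Integer> squaresInOrder() {
        return Flux.range(1, 6)
                .flatMapSequential(
                        // Stand-in for an asynchronous call per element.
                        n -> Mono.just(n * n).delayElement(Duration.ofMillis(10)),
                        2,   // at most 2 inner streams subscribed at once
                        4)   // up to 4 elements requested ahead per inner stream
                .collectList()
                .block();
    }
}
```

Lowering the concurrency limit reduces the load placed on whatever the mapper calls, at the cost of throughput; the output order stays the same either way.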

Use case of flatMapSequential()

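  1. It is very helpful when the source data is already in a meaningful order (for example, sorted by descending updatedTimestamp, ascending ID, or descending createdTimestamp) and the results must be shown in that same order. flatMapSequential() preserves the source order; it does not sort the data itself.
  2. When the results of inserting records into a database must come back in a specific sequence. Note that flatMapSequential() still subscribes to the inner streams eagerly, so it orders the emitted results rather than the execution; when each insert must finish before the next one starts, concatMap() is the stricter choice.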
