What is WebFlux?
Spring WebFlux is the reactive web framework introduced in Spring Framework 5. It is built on Project Reactor, which implements the Reactive Streams specification. WebFlux is non-blocking and event-driven, so a small number of threads can serve many concurrent requests instead of one thread being held per request. Because requests are handled asynchronously, applications scale better under load.
Key Features of WebFlux
- Reactive Programming – WebFlux is designed for Reactive Programming using Mono and Flux types. It handles data streams asynchronously.
- Mono – Represents a single value or no value.
- Flux – Represents a stream of multiple values.
- Non-blocking I/O – WebFlux uses Reactor Netty by default for non-blocking I/O operations. It can even work with Tomcat, Jetty, or other supported web servers.
- Annotation-based or Functional Routing – The annotation-based model uses the familiar @Controller or @RestController classes with @RequestMapping annotations. Functional routing defines routes in a functional programming style using router functions.
- Declarative Approach – You write declarative code that describes how the data should flow, and the framework manages concurrency concerns such as backpressure.
- High Scalability – Because WebFlux is asynchronous, it can handle many concurrent requests efficiently with fewer threads.
- Backpressure – WebFlux supports backpressure, which lets a slow consumer signal how much data it can handle so that a fast producer does not overwhelm it.
Components of a Reactive Programming Application
A reactive application written in the functional style is structured differently from a traditional annotation-based application. In the traditional style, we have a Controller class annotated with either @Controller or @RestController. Inside that controller, we define handler methods and map them to HTTP methods with @GetMapping, @PostMapping, @PutMapping, @DeleteMapping, etc.
In the functional style used in this article, we do not write a Controller. Instead, we use a Router, and in that Router we define route functions that pair an HTTP method and path with a handler, like route(GET("/hello"), handler::getHello). The Router calls the respective Handler, which deals with the request and response. The Handler calls the Service interface (and its implementation class), where the business logic lives, and the Service in turn calls the Repository, which is backed by the Entity class.
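The Router, Handler, and Service layering described above can be sketched roughly as follows. This is a minimal illustration, assuming spring-webflux and reactor-core are on the classpath; the names HelloRoutes, HelloHandler, HelloService, and the name query parameter are hypothetical, and the repository and entity layers are left out to keep it short.

```java
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

// Hypothetical names throughout; in a real application routes() would
// usually be exposed as a @Bean in a @Configuration class.
final class HelloRoutes {

    // Service layer: business logic that returns a reactive type.
    static final class HelloService {
        Mono<String> greeting(String name) {
            return Mono.just("Hello, " + name + "!");
        }
    }

    // Handler: translates between HTTP (ServerRequest/ServerResponse) and the service.
    static final class HelloHandler {
        private final HelloService service;

        HelloHandler(HelloService service) {
            this.service = service;
        }

        Mono<ServerResponse> getHello(ServerRequest request) {
            String name = request.queryParam("name").orElse("WebFlux");
            return service.greeting(name)
                    .flatMap(body -> ServerResponse.ok().bodyValue(body));
        }
    }

    // Router: maps an HTTP method and path to a handler function.
    static RouterFunction<ServerResponse> routes(HelloHandler handler) {
        return route(GET("/hello"), handler::getHello);
    }
}
```

Keeping the handler free of business logic makes the service testable on its own, without starting a server.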
Mono v/s Flux
Feature | Mono | Flux |
No. of items | 0 or 1 | 0 to N |
Common use cases | Single response (fetching a single object, sending one HTTP response) | Multiple responses (querying a list, event streams) |
Example | Mono<String> | Flux<String> |
Creation methods | Mono.just(value), Mono.empty(), Mono.error() | Flux.just(), Flux.fromIterable(), Flux.range() |
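The creation methods in the table can be tried out in isolation. The following is a small sketch, assuming reactor-core is on the classpath; the class name CreationDemo is illustrative, and block() is used only to inspect results in a demo (it should not be called inside a reactive request pipeline).

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class CreationDemo {

    // Mono.just: exactly one value.
    static String single() {
        return Mono.just("Hello").block();
    }

    // Mono.empty: completes without a value; blockOptional() makes that visible.
    static Optional<String> none() {
        return Mono.<String>empty().blockOptional();
    }

    // Flux.fromIterable: one emitted item per element of the collection.
    static List<String> fromList() {
        return Flux.fromIterable(List.of("Apple", "Banana", "Cherry"))
                .collectList()
                .block();
    }

    // Flux.range(start, count): here 5 integers starting at 1.
    static List<Integer> range() {
        return Flux.range(1, 5).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(single());   // Hello
        System.out.println(none());     // Optional.empty
        System.out.println(fromList()); // [Apple, Banana, Cherry]
        System.out.println(range());    // [1, 2, 3, 4, 5]
    }
}
```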
Different Methods used in Reactive Programming and their purposes.
- route() – The route() function defines routes declaratively: each call pairs a request predicate (HTTP method and path) with the handler function that processes matching requests. Calls can be chained with andRoute() to build up a series of routes. This style fits naturally with reactive and functional programming.
Example – route(POST("/flux/processTrackers"), springReactiveWebFluxHandler::createReportProcessTracker)
.andRoute(GET("/flux/processTrackers"), springReactiveWebFluxHandler::getAllReportProcessTrackers)
- bodyToMono() – This method is used to read the body of an HTTP request in a non-blocking manner and convert it into a Mono object, which represents an asynchronous computation that either produces a single value (or no value) or an error.
Example – return serverRequest.bodyToMono(ReportProcessTrackerDTO.class)
.flatMap(reportProcessTrackerDTO -> springReactiveWebFluxService.saveReportProcessTracker(reportProcessTrackerDTO))
.flatMap(savedReportProcessTracker -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(fromValue(savedReportProcessTracker)));
In the above example, bodyToMono() reads the body of the HTTP request and returns it as a Mono<ReportProcessTrackerDTO>.
- Mono.just() – It is a method that is used to create a Mono that immediately emits the provided value and then completes. It’s a convenient way to create a Mono from existing values or objects in a reactive programming context.
public static <T> Mono<T> just(T data)
- T – The type of the value to be emitted.
- data – The actual value to emit.
- Return – A Mono<T> that emits the value data and then completes.
Example
Mono<String> mono = Mono.just("Hello, World!");
In the above line of code, a Mono is created that will emit the string "Hello, World!" and then complete.
- Flux.just() – This method creates a Flux that emits the provided items in order and then completes. It is commonly used to create a Flux from a fixed set of values.
public static <T> Flux<T> just(T... data)
- T – The type of the values to be emitted.
- data – The values that the Flux will emit, in the order they are provided.
- Return – A Flux<T> that emits the values and then completes.
Example
Flux<Integer> fluxInteger = Flux.just(1, 2, 3, 4, 5);
Flux<String> fluxString = Flux.just("Apple", "Banana", "Cherry");
In the above code, 1st line creates a Flux of Integer and emits 1, 2, 3, 4, 5 one after another.
Whereas, 2nd line creates a Flux of String and emits the value Apple, Banana, Cherry.
- map() – This method transforms each item emitted by the source Mono or Flux into a new value.
map() in Mono
Mono<String> mono = Mono.just("Hello")
.map(str -> str + ", World!"); // Transformation function
In the above code:
- Mono.just("Hello") – Creates a Mono that emits the value Hello.
- map() – Takes the emitted value Hello, applies the transformation str -> str + ", World!", and returns the new value Hello, World!
- The resulting Mono emits the value Hello, World! when subscribed to.
map() in Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5)
.map(num -> num * 10);
In the above code, the explanations are provided below:
- First, it creates a Flux of Integer and emits the values 1, 2, 3, 4, and 5.
- Then, it transforms the above values multiplied by 10 and emits the values 10, 20, 30, 40, 50, and completes.
- Remember that map() applies its function synchronously: each value is transformed inline, before the next value is processed. The function therefore returns a plain value (not a Mono or Flux) and should not block.
- flatMap() – This method transforms each emitted item into another Mono or Flux and flattens the results into the output. This is the big difference compared with map(), whose function returns a plain value. flatMap() is ideal for non-blocking, asynchronous operations such as calling external services, performing database queries, and making HTTP requests.
flatMap() in Mono
Mono<Integer> mono = Mono.just("3")
.flatMap(str -> Mono.just(Integer.parseInt(str)));
In the above code:
- First, it emits the value 3 as a String.
- Then flatMap() takes that String and converts it into another Mono, of Integer, by using Mono.just(Integer.parseInt(str)), which emits the value 3 as an Integer.
So the function passed to flatMap() itself returns a Mono; the inner Mono is unwrapped, and the outer Mono emits the result of the inner Mono.
Example – Mono.flatMap()
Mono<ReportProcessTracker> reportProcessTrackerMono = springReactiveWebFluxProcessTrackerRepository.findByTraceId(traceId);
return reportProcessTrackerMono.flatMap(reportProcessTracker -> convertToReportProcessTrackerDTO(reportProcessTracker));
The above code looks up the ReportProcessTracker entity by traceId as a Mono and then converts that entity into a Mono of ReportProcessTrackerDTO. Since flatMap() is used, convertToReportProcessTrackerDTO() must itself return a Mono.
flatMap() in Flux
Flux<Integer> flux = Flux.just("7", "8", "9")
.flatMap(str -> Flux.just(Integer.parseInt(str) * 3));
In the above code, the explanations are provided below:
- First, it emits the strings as 7, 8, and 9.
- Second, the flatMap() function transforms each string into the corresponding Integer multiplied by 3 as 21, 24, and 27.
- Lastly, the Flux<Integer> emits 21, 24, and 27 one after another.
Example – Flux.flatMap()
Flux<ReportProcessTracker> reportProcessTrackerFlux = springReactiveWebFluxProcessTrackerRepository.findAll();
Flux<ReportProcessTrackerDTO> reportProcessTrackerDTOFlux = reportProcessTrackerFlux
.flatMap(reportProcessTracker -> convertToReportProcessTrackerDTO(reportProcessTracker));
The above code retrieves a Flux of ReportProcessTracker entities with findAll(), converts each entity into a ReportProcessTrackerDTO, and returns the resulting Flux of ReportProcessTrackerDTO.
Difference between map() and flatMap()
Feature | map() | flatMap() |
Synchronous vs Asynchronous | Transforms each item synchronously into a plain value; the result stays in the same reactive type (Mono or Flux) | Maps each item to another Mono or Flux, which may complete asynchronously, and flattens the results |
Use Case | For simple, synchronous transformations (modifying data locally) | For non-blocking operations that involve another reactive source, like a database query, API call, or I/O operation |
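The difference in the table is easiest to see in the resulting types. The sketch below assumes reactor-core on the classpath; lengthLater() is a hypothetical stand-in for an asynchronous call such as a database query, and block() is used only for demonstration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MapVsFlatMapDemo {

    // Hypothetical asynchronous lookup: the result arrives after a short delay.
    static Mono<Integer> lengthLater(String word) {
        return Mono.just(word.length()).delayElement(Duration.ofMillis(20));
    }

    // map: the function returns a plain value, so the result is a Flux<Integer>.
    static List<Integer> withMap() {
        return Flux.just("a", "bb", "ccc")
                .map(String::length)
                .collectList()
                .block();
    }

    // map with a Mono-returning function only nests the publisher:
    // the result is a Flux<Mono<Integer>> and no inner Mono is subscribed to.
    static Flux<Mono<Integer>> nested() {
        return Flux.just("a", "bb", "ccc").map(MapVsFlatMapDemo::lengthLater);
    }

    // flatMap subscribes to each inner Mono and flattens the results into a
    // Flux<Integer>. The order of the results is not guaranteed.
    static List<Integer> withFlatMap() {
        return Flux.just("a", "bb", "ccc")
                .flatMap(MapVsFlatMapDemo::lengthLater)
                .collectList()
                .block();
    }
}
```

withMap() always yields [1, 2, 3]; withFlatMap() yields the same three numbers, but possibly in a different order.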
- flatMapSequential() – This Flux operator maps each element to an inner Publisher (a Mono or Flux), subscribes to the inner publishers eagerly just like flatMap(), and merges their results in the order of the source elements, so the original sequence is preserved.
Flux<Integer> flux = Flux.just("7", "8", "9")
.flatMap(str -> Flux.just(Integer.parseInt(str) * 3));
flatMap() makes no ordering guarantee: results are emitted as the inner streams produce them. With the synchronous Flux.just() inner streams shown here the output will still be 21, 24, 27, but as soon as the inner streams are asynchronous (for example, a database query or an HTTP call), the results can arrive in any order.
flatMapSequential(), in contrast, guarantees ordering. It emits the results in the same order in which the elements were received from the source stream.
Flux<Integer> flux = Flux.just("7", "8", "9")
.flatMapSequential(str -> Flux.just(Integer.parseInt(str) * 3));
The above lines of code always return the result in source order, i.e. 21, 24, 27.
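The ordering difference only shows up when the inner publishers are asynchronous. The sketch below makes that visible with artificial delays; it assumes reactor-core on the classpath, and tripleLater() is a hypothetical stand-in for a slow call in which earlier elements take longer than later ones.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class OrderingDemo {

    // Hypothetical slow call: "7" takes 300 ms, "8" 200 ms, "9" 100 ms,
    // so the inner publishers finish in reverse source order.
    static Mono<Integer> tripleLater(String s) {
        int n = Integer.parseInt(s);
        return Mono.just(n * 3).delayElement(Duration.ofMillis((10 - n) * 100L));
    }

    // flatMap emits results as they arrive: typically [27, 24, 21] here.
    static List<Integer> withFlatMap() {
        return Flux.just("7", "8", "9")
                .flatMap(OrderingDemo::tripleLater)
                .collectList()
                .block();
    }

    // flatMapSequential also runs the inner calls concurrently, but buffers
    // early results so the output follows source order: [21, 24, 27].
    static List<Integer> withFlatMapSequential() {
        return Flux.just("7", "8", "9")
                .flatMapSequential(OrderingDemo::tripleLater)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withFlatMap());
        System.out.println(withFlatMapSequential());
    }
}
```

Both variants take about as long as the slowest inner call, because the inner publishers run concurrently in both cases.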
Example of flatMapSequential()
getAllReportProcessTrackers()
.flatMapSequential(reportProcessTrackerDTO ->
findReportProcessAndFileTrackerByProcessId(reportProcessTrackerDTO.getTraceId()))
For each ReportProcessTrackerDTO, this looks up the associated ReportFileTracker DTOs and emits the combined results in the same order in which getAllReportProcessTrackers() emitted the process trackers (for example, ascending by ID if the source is sorted that way).
Syntax
flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper);
mapper – A function that takes an item emitted by the source and returns a Mono or Flux (i.e. a Publisher).
There is also an overload with two optional tuning parameters:
flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch);
maxConcurrency – Limits the number of inner streams that are processed concurrently.
prefetch – Controls how many elements are requested in advance from each inner stream.
Use case of flatMapSequential()
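- It is helpful when the source is already in a meaningful order (for example, descending updatedTimestamp, ascending ID, or descending createdTimestamp) and that order must survive an asynchronous per-item lookup. The operator does not sort; it only preserves the source order.
- It also helps when results have to be consumed in a specific sequence, such as records collected for insertion into a database. Note that the inner publishers still run concurrently; if each operation must finish before the next one starts, concatMap() is the operator to use.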
- then() – This Project Reactor method defines an action that runs after a Mono or Flux completes, without passing along the original data. Essentially, it ignores the data emitted by the source and continues with another Mono.
How then() works
- The source Mono or Flux emits its values
- The then() method is called.
- It discards the original emitted data but waits for the source reactive stream to complete.
- Once the original stream completes, it proceeds with the next reactive stream which is usually a Mono.
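The steps above can be illustrated with a short sketch. It assumes reactor-core on the classpath; the class name ThenDemo is illustrative and block() is used only for demonstration. It also shows a detail the list does not spell out: then() continues only on completion, so an error from the source skips the next Mono.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ThenDemo {

    // The source values are still produced (doOnNext sees them), but then()
    // drops them and emits the value of the next Mono after completion.
    static String run(List<String> seen) {
        return Flux.just("A", "B", "C")
                .doOnNext(seen::add)
                .then(Mono.just("done"))
                .block();
    }

    // If the source fails, the next Mono is never subscribed to and the
    // error is propagated; onErrorReturn makes that visible here.
    static String errorSkipsNext() {
        return Flux.<String>error(new IllegalStateException("boom"))
                .then(Mono.just("done"))
                .onErrorReturn("failed")
                .block();
    }
}
```

Calling run() with an empty list returns "done" and leaves A, B, and C in the list; errorSkipsNext() returns "failed".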
then() in Mono
public final <V> Mono<V> then(Mono<V> other)
Mono<User> savedUser = userRepository.save(user); // Returns Mono<User>
Mono<Void> result = savedUser
.then(Mono.fromRunnable(() -> System.out.println("User saved successfully")));
In the above code:
- savedUser emits a User object after saving.
- then() method ignores the savedUser object and instead waits for the Mono to complete.
- After savedUser completes, the Mono<Void> created by then() executes the print statement.
Example of Mono.then()
BigInteger id = BigInteger.valueOf(Long.parseLong(serverRequest.pathVariable("id")));
LOG.info("SpringReactiveWebFluxHandler -> deleteReportFileTrackerByID() -> ID : {}", id);
return springReactiveWebFluxService.deleteReportFileTracker(id)
.then(ServerResponse.noContent().build());
The above code deletes the ReportFileTracker identified by the path variable id and, once the delete completes, responds with 204 No Content.
then() in Flux
public final <V> Mono<V> then(Mono<V> other)
Flux<String> flux = Flux.just("A", "B", "C");
Mono<Void> result = flux
.then(Mono.fromRunnable(() -> System.out.println("Flux completed")));
- Here the Flux emits A, B, and C.
- The then() method discards those values and waits for the Flux to complete. Once it completes, the next Mono is subscribed to, which executes the print statement.
Example of Flux.then()
Flux<String> flux = Flux.just("Process1", "Process2", "Process3");
Mono<Void> finalOperation = flux
.then(Mono.fromRunnable(() -> LOG.info("All items processed")));
In the above code, the Flux emits Process1, Process2, and Process3. The then() method ignores those values, waits for the Flux to complete, and then logs All items processed.
Good Use Case of Asynchronous Handling of then()
Mono<Void> processFile = readFile("data.txt") // Returns Mono<String>
.then(writeToFile("output.txt")); // Returns Mono<Void>
The above code reads the file data.txt, discards the content that the read emits, and, only after the read completes, subscribes to the Mono that writes output.txt (readFile() and writeToFile() are assumed helper methods).
- switchIfEmpty() – This method provides an alternative reactive sequence when the original Mono or Flux completes without emitting any data (i.e. if it is empty). It acts as a fallback mechanism that switches to another Mono or Flux when the source doesn't emit any elements.
switchIfEmpty() in Mono
public final Mono<T> switchIfEmpty(Mono<? extends T> alternate)
Mono<String> source = Mono.empty(); // Original Mono is empty
Mono<String> result = source
.switchIfEmpty(Mono.just("Default Value"))
.doOnNext(System.out::println);
result.subscribe();
Explanation of the above code:
- source is empty, so it completes without emitting a value.
- switchIfEmpty() comes into play and switches to Mono.just("Default Value"), which is then printed.
Example of Mono.switchIfEmpty()
springReactiveWebFluxService.findReportProcessAndFileTrackerByProcessId(serverRequest.pathVariable("traceId"))
.flatMap(reportProcessFileTrackerDTO -> ServerResponse.ok().bodyValue(reportProcessFileTrackerDTO))
.switchIfEmpty(ServerResponse.notFound().build());
The above code explanation:
- First, the service call returns a Mono<ReportProcessFileTrackerDTO> for the given traceId, and flatMap() wraps the DTO in a 200 OK response.
- If nothing is found and the Mono is empty, switchIfEmpty() returns ServerResponse.notFound().build() instead, which is a 404.
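One detail worth knowing about the fallback: the argument to switchIfEmpty() is an ordinary Java expression, so a method call placed there runs when the pipeline is assembled, even if the source turns out not to be empty. Wrapping it in Mono.defer() postpones the call until the fallback is actually needed. The sketch below assumes reactor-core on the classpath; STORE, find(), and fallback() are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class SwitchIfEmptyDemo {

    // Hypothetical in-memory store standing in for a repository.
    static final Map<String, String> STORE = Map.of("t-1", "report-1");

    // Counts how often the fallback method is invoked.
    static final AtomicInteger fallbackCalls = new AtomicInteger();

    // Emits the stored value, or completes empty when the id is unknown.
    static Mono<String> find(String id) {
        return Mono.justOrEmpty(STORE.get(id));
    }

    static Mono<String> fallback() {
        fallbackCalls.incrementAndGet();
        return Mono.just("not-found");
    }

    // fallback() is called while building the pipeline, found or not.
    static String eager(String id) {
        return find(id).switchIfEmpty(fallback()).block();
    }

    // fallback() is called only if find(id) completes empty.
    static String lazy(String id) {
        return find(id).switchIfEmpty(Mono.defer(SwitchIfEmptyDemo::fallback)).block();
    }
}
```

For a cheap fallback such as ServerResponse.notFound().build() the eager form is harmless; the deferred form matters when building the fallback has side effects or is expensive.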
switchIfEmpty() in Flux
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
Flux<String> source = Flux.empty(); // Original Flux is empty
Flux<String> result = source
.switchIfEmpty(Flux.just("Fallback 1", "Fallback 2"))
.doOnNext(System.out::println);
result.subscribe();
PFB the above code explanation
- source Flux is empty
- As it is empty so the switchIfEmpty fallback is called
- And it prints Fallback 1 & Fallback 2.
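Example of switchIfEmpty() with a Flux source
springReactiveWebFluxService.getHistoricalRecords()
.collectList()
.filter(records -> !records.isEmpty())
.flatMap(records -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(records))
.switchIfEmpty(ServerResponse.notFound().build());
The above code explanation:
- getHistoricalRecords() returns a Flux<ReportProcessFileTrackerDTO>, which collectList() gathers into a Mono of a List.
- filter() drops an empty list, so the Mono completes empty when there are no records.
- In that case switchIfEmpty() is called as the fallback and returns ServerResponse.notFound().build(), which results in the HTTP status code 404.
- The emptiness check has to happen on the data. ServerResponse.ok().body(flux, type) always emits a response, even for an empty Flux, so a switchIfEmpty() attached directly to it would never fire.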
- empty() – This method creates an empty Mono object that completes without emitting any value.
empty() in Mono
Mono<String> mono = Mono.empty();
mono.subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));
Description of the above code:
- The 1st line creates a Mono that completes without emitting any value; it represents an empty reactive stream.
- The subscribe() method is used to consume the signals emitted by the Mono.
- The subscribe() method receives 3 parameters here:
- 1st parameter, System.out::println. It runs if the Mono emits a value. As the Mono is empty, it does not run.
- 2nd parameter, System.err::println. It runs if the Mono emits an error. As the Mono is empty, it does not run either.
- 3rd parameter, () -> System.out.println("Completed"). This Runnable runs when the Mono completes successfully, whether or not it emitted a value. So this is the statement that is executed.
- error(Throwable) – This method creates a Mono that terminates with an error.
error(Throwable) in Mono
Mono.error(new RuntimeException("Something fishy!"))
.subscribe(System.out::println, System.err::println);
Explanation of the above code:
- This creates a Mono that terminates with an error. The error is a RuntimeException with the message "Something fishy!".
- The subscribe() method receives 2 parameters: System.out::println for values and System.err::println for errors.
- As the Mono emits an error, the 2nd handler runs and prints the exception, java.lang.RuntimeException: Something fishy!, to the standard error stream.
- subscribe() – This method is a core concept in Project Reactor: it starts the processing of a reactive pipeline. Mono and Flux pipelines are lazy by default; nothing executes until they are subscribed to.
There are 5 variants of subscribe()
- subscribe() – Triggers the execution without handlers.
Flux.just("A", "B", "C").subscribe(); // Pipeline executes, but no output is handled
- subscribe(Consumer<? super T>) – Handles each emitted item (onNext).
Flux.just("A", "B", "C").subscribe(System.out::println); // Prints A, B, and C on separate lines
- subscribe(Consumer<? super T>, Consumer<? super Throwable>) – Handles both emitted items and errors.
Flux.just(1, 2, 0).map(i -> 10 / i).subscribe(System.out::println, // onNext
error -> System.err.println("Error: " + error.getMessage())); // onError
// Prints 10, 5, and Error: / by zero on separate lines
- subscribe(Consumer<? super T>, Consumer<? super Throwable>, Runnable) – Handles emitted items, errors, and the completion signal.
Flux.just("A", "B", "C").subscribe(System.out::println, // onNext
error -> System.err.println("Error: " + error.getMessage()), // onError
() -> System.out.println("Completed!")); // onComplete
// Prints A, B, C, and Completed! on separate lines
- subscribe(Consumer<? super T>, Consumer<? super Throwable>, Runnable, Consumer<? super Subscription>) – Handles emitted items, errors, completion, and the subscription itself (for example, to control demand for backpressure). This variant is marked as deprecated in newer Reactor versions, where subscribeWith() with a BaseSubscriber is suggested instead.
Flux.range(1, 10).subscribe(System.out::println, // onNext
error -> System.err.println("Error: " + error.getMessage()), // onError
() -> System.out.println("Completed!"), // onComplete
subscription -> subscription.request(5)); // onSubscribe (requesting 5 items)
// Prints 1 to 5 on separate lines. Completed! is not printed: only 5 of the 10 items were requested, so the Flux never reaches completion.
subscribe() in Mono
Mono.just("Hello, Mono!")
.subscribe(System.out::println);
Explanation of the above code:
- A Mono emits at most one item.
- Hence, for the success case, subscribe(Consumer<? super T>) is enough.
- It prints "Hello, Mono!"
subscribe() in Mono for Error Scenario
Mono.error(new RuntimeException("Mono Error!"))
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
Explanation of the above code:
- The above Mono emits an error: a RuntimeException with the message Mono Error!
- The value handler is skipped, and the error handler prints Error: Mono Error! to the standard error stream (many IDE consoles show this in red).
subscribe() in Mono for Completion Scenario
Mono.empty()
.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!")
);
Explanation of the above code:
- The above Mono is empty: it emits neither a value nor an error, only the completion signal.
- Hence only the completion handler runs, and it prints Completed! in the console.
subscribe() in Flux
Flux.just("A", "B", "C")
.subscribe(System.out::println);
Explanation of the above code:
- The above code creates a Flux of String.
- Calling subscribe() starts the emission, and each value is passed to System.out::println, which prints A, B, and C on separate lines.
subscribe() in Flux for Error scenario
Flux.range(1, 5)
.map(i -> {
if (i == 3) throw new RuntimeException("Error on 3!");
return i;
})
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
Explanation of the above code:
- The above code creates a Flux from the range 1 to 5. When the value is 3, the map() function throws a RuntimeException with the message Error on 3!
- The Flux emits 1 and 2, and both values are printed on separate lines in the console.
- At 3 the exception turns into an error signal, which terminates the Flux. The error handler prints Error: Error on 3! and the values 4 and 5 are never emitted.
subscribe() in Flux for Completion scenario
Flux.range(1, 5)
.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Stream Completed!")
);
Explanation of the above code:
- The code creates a Flux from the range 1 to 5.
- As the Flux does not emit an error, subscribing prints 1 to 5 on separate lines in the console.
- Finally, it prints Stream Completed! on a new line.
- The last argument of this 3-parameter subscribe() is a Runnable that runs on successful completion, whether or not the Flux emitted any value. It does not run if the Flux terminates with an error.
subscribe() in Flux for Backpressure scenario
Flux.range(1, 10)
.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!"),
subscription -> subscription.request(3)
);
Explanation of the above code:
- The code creates a Flux that can emit the values 1 to 10.
- No error occurs, so the error handler is never called.
- The subscription requests only 3 items, so the Flux emits 1, 2, and 3, and each one is printed.
- After that the subscriber has no outstanding demand, so the Flux emits nothing more. This is backpressure at work: the producer never sends more than was requested.
- Completed! is not printed. The Flux still has 7 items to emit, so it never reaches completion; it simply waits for a further request() that never comes.
- To receive the remaining items and the completion signal, the subscriber has to call request() again (or request Long.MAX_VALUE up front).
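Demand can also be controlled with a custom subscriber, which makes the request/emit cycle explicit. The following sketch assumes reactor-core on the classpath and uses Reactor's BaseSubscriber; the class name BackpressureDemo and the log format are illustrative. Because Flux.range() emits synchronously, the log is complete when subscribe() returns.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class BackpressureDemo {

    // Subscribes to Flux.range(1, total) and requests `batch` items at a time.
    // With refill = false only the first batch is ever requested.
    static List<String> run(int total, int batch, boolean refill) {
        List<String> log = new ArrayList<>();
        Flux.range(1, total).subscribe(new BaseSubscriber<Integer>() {
            private int receivedInBatch;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(batch); // initial demand
            }

            @Override
            protected void hookOnNext(Integer value) {
                log.add("item " + value);
                if (refill && ++receivedInBatch == batch) {
                    receivedInBatch = 0;
                    log.add("request " + batch + " more");
                    request(batch); // ask for the next batch
                }
            }

            @Override
            protected void hookOnComplete() {
                log.add("completed");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 3, false)); // [item 1, item 2, item 3]
        System.out.println(run(5, 2, true));
    }
}
```

The first call never logs "completed" because 7 items remain unrequested; the second call keeps requesting and therefore sees all 5 items followed by "completed".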
- fromCallable(Callable<T>) – This method creates a Mono from a Callable, typically one that wraps a synchronous, blocking call.
fromCallable() in Mono
Mono<String> mono = Mono.fromCallable(() -> "Blocking data");
mono.subscribe(System.out::println);
Explanation of the above code:
- The 1st line creates a Mono from a Callable. In real code the Callable would typically wrap a blocking operation; here it just returns a constant.
- The lambda () -> "Blocking data" acts as the Callable, and it is executed only when the Mono is subscribed to.
- The Callable returns the string Blocking data.
- On calling subscribe(), the Mono emits the result of the Callable ("Blocking data") to the subscriber.
- System.out::println consumes the emitted value and prints it to the console.
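fromCallable() on its own only defers the call; it still runs on whichever thread subscribes. A common pairing is subscribeOn(Schedulers.boundedElastic()), which moves the blocking work to a scheduler intended for it. The sketch below assumes reactor-core on the classpath; blockingLookup() is a hypothetical stand-in for something like a legacy JDBC query or a file read.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class FromCallableDemo {

    // Counts how often the blocking method has actually run.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical blocking call; reports the thread it ran on.
    static String blockingLookup() {
        calls.incrementAndGet();
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Blocking data from " + Thread.currentThread().getName();
    }

    // Nothing runs here: the Callable is invoked once per subscription,
    // on a boundedElastic worker thread.
    static Mono<String> lookup() {
        return Mono.fromCallable(FromCallableDemo::blockingLookup)
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        Mono<String> mono = lookup();
        System.out.println("calls before subscribe: " + calls.get()); // 0
        System.out.println(mono.block());
    }
}
```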
- fromIterable(Iterable<T>) – This Flux method creates a Flux that emits the elements of an Iterable.
Flux<Integer> flux = Flux.fromIterable(Arrays.asList(1, 2, 3));
flux.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed!") // onComplete
);
Explanation of the above code:
- In the 1st line, Flux.fromIterable(Arrays.asList(1, 2, 3)) creates a Flux from an Iterable, here a list of the Integers 1, 2, 3.
- Each element of the list is emitted as a separate item in the Flux's data stream.
- The data stream emits the numbers 1, 2, and 3 sequentially, and then completes.
- The 2nd statement subscribes to the Flux.
- onSubscribe: A subscription is created between the publisher (Flux) and the subscriber.
- onNext: Each item from the Flux is passed to the onNext handler. In this case:
- First, 1 is passed to the handler, which prints Received: 1
- Second, 2 is passed to the handler, which prints Received: 2
- Third, 3 is passed to the handler, which prints Received: 3
- onComplete: After all elements are emitted, the Flux sends a completion signal.
- onError: If an error occurs, it sends an error signal instead of completion.
Output:
Received: 1
Received: 2
Received: 3
Completed!
- concatWith(Publisher<T>) – This method is also used by Flux. It concatenates another Publisher to the current sequence.
Flux.just("A", "B")
.concatWith(Flux.just("C", "D"))
.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream Completed!") // onComplete
);
PFB the code explanation
- The 1st line creates a Flux by using Flux.just("A", "B"), which emits the elements A and B sequentially.
- The 2nd line calls concatWith(Flux.just("C", "D")). The concatWith() method concatenates another Publisher, in this case another Flux, to the current Flux.
- After the current Flux ("A", "B") completes, the second Flux ("C", "D") is subscribed to, and its elements are emitted. With concatWith() the order is preserved.
- In detailed explanation:
- onNext:
- The 1st Flux (“A“, “B“) emits its elements sequentially
- After the 1st Flux completes, the 2nd Flux (“C“, “D“) emits its elements sequentially.
- onComplete:
- After all elements from both Flux instances have been emitted, the stream sends a completion signal.
- onError:
- If any error occurs, it would terminate the stream and invoke an error handler.
Output
Received: A
Received: B
Received: C
Received: D
Stream Completed!
- mergeWith(Publisher<T>) – This Flux method merges another Publisher into the current sequence.
Flux.just("A", "B")
.mergeWith(Flux.just("C", "D"))
.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream Completed!") // onComplete
);
Explanation of the above code:
- The 1st line creates a Flux by using Flux.just("A", "B"), which emits the elements A and B sequentially.
- The 2nd line calls mergeWith(Flux.just("C", "D")). The mergeWith() method merges another Publisher, in this case another Flux, into the current Flux.
- With mergeWith() no ordering between the two sources is guaranteed. It subscribes to both sources eagerly and does not wait for the first Flux to complete before emitting items from the second, so items from asynchronous sources can interleave.
- In detailed explanation:
- onNext:
- Both Flux sources emit their items concurrently.
- Items are emitted as soon as they are ready, and the subscriber processes them.
- onComplete:
- After both Flux sources are complete, the merged Flux sends a completion signal.
- onError:
- If any of the sources emit an error, the merged Flux propagates the error and terminates.
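Output
Received: A
Received: B
Received: C
Received: D
Stream Completed!
With the synchronous Flux.just() sources used here, the first Flux emits both of its items before the second one gets a turn, so the output matches concatWith(). Interleaved output such as A, C, B, D appears only when the sources emit asynchronously over time (for example, with delayElements()).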
Difference between concatWith() and mergeWith()
Aspect | concatWith | mergeWith |
Order | Strict sequential order is enforced | Items may interleave. |
Concurrency | The second source starts after the first source is completed. | Sources emit concurrently. |
Use Case | Sequential processing of multiple streams | Parallel processing of multiple streams |
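Interleaving only becomes visible when the sources emit over time. The sketch below spreads the emissions out with artificial delays so the two operators can be compared; it assumes reactor-core on the classpath, and the timings and the class name ConcatVsMergeDemo are illustrative.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class ConcatVsMergeDemo {

    // Emits "A" after about 200 ms and "B" after about 400 ms.
    static Flux<String> first() {
        return Flux.just("A", "B").delayElements(Duration.ofMillis(200));
    }

    // Starts 100 ms late, then emits "C" and "D" 200 ms apart,
    // i.e. roughly 300 ms and 500 ms after subscription.
    static Flux<String> second() {
        return Flux.just("C", "D")
                .delayElements(Duration.ofMillis(200))
                .delaySubscription(Duration.ofMillis(100));
    }

    // concatWith subscribes to second() only after first() completes:
    // always [A, B, C, D].
    static List<String> concatenated() {
        return first().concatWith(second()).collectList().block();
    }

    // mergeWith subscribes to both at once and forwards items as they
    // arrive: typically [A, C, B, D] with the timings above.
    static List<String> merged() {
        return first().mergeWith(second()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated());
        System.out.println(merged());
    }
}
```

The merged variant also finishes sooner, because both sources run at the same time instead of one after the other.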
- zipWith(Publisher<T>) – This method is also associated with Flux. It combines two streams element by element.
Flux.just("A", "B", "C") // Extra element "C"
.zipWith(Flux.just("1", "2"))
.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream Completed!") // onComplete
);
PFB the code explanation
- 1st line will create a Flux that emits the elements A, B & C sequentially. Here, C is the extra element.
- In the 2nd line, it is using the zipWith() with another Flux which is emitting elements 1 and 2. This method will combine A with 1 and B with 2.
- It pairs the elements from both streams into a Tuple2 object.
- Zipping stops when one of the streams completes. As in the above example:
- 1st A from the 1st Flux is paired with 1 of the 2nd Flux.
- 2nd B from the 1st Flux is paired with 2 of the 2nd Flux.
- Extra elements are ignored like in this case C from the 1st Flux.
- In detailed explanation:
- onNext:
- Emits a combined element for each pair, for example [A,1].
- onComplete:
- Signals completion after the shorter source stream completes.
- onError:
- Emits an error if any source stream encounters one.
Output
Received: [A,1]
Received: [B,2]
Stream Completed!
Comparison with other methods
Method | Description | Use Case |
zipWith | Combines two streams element-by-element into pairs. | Paired output from two streams. |
concatWith | Appends elements from the second stream after the first completes. | Sequential processing of two streams. |
mergeWith | Interleaves emissions from both streams concurrently. | Parallel processing of two streams. |
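When a Tuple2 is not the desired result type, zipWith() also accepts a combinator function that builds the paired value directly. A small sketch, assuming reactor-core on the classpath (the class name ZipDemo is illustrative):

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class ZipDemo {

    // Combinator variant: each pair is turned into a String right away.
    // "C" has no partner, so zipping stops after two pairs.
    static List<String> combined() {
        return Flux.just("A", "B", "C")
                .zipWith(Flux.just(1, 2), (letter, number) -> letter + number)
                .collectList()
                .block();
    }

    // Tuple variant: the pair arrives as a Tuple2 and is unpacked with
    // getT1() and getT2().
    static List<String> viaTuple() {
        return Flux.just("A", "B", "C")
                .zipWith(Flux.just(1, 2))
                .map(pair -> pair.getT1() + "-" + pair.getT2())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(combined()); // [A1, B2]
        System.out.println(viaTuple()); // [A-1, B-2]
    }
}
```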
- filter(Predicate<T>) – Filters the element based on the condition from the source Flux.
Flux.range(1, 10)
.filter(i -> i % 2 != 0)
.map(i -> i * i)
.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed!") // onComplete
);
PFB the code explanation
- 1st line creates the Flux from range 1 to 10.
- After that, it applies a filter to get the odd numbers from the range 1 to 10 which are 1, 3, 5, 7, 9.
- After filtering, the map() function squares each odd number, and the subscriber prints the squares.
- In detailed explanation:
- onNext:
- Flux.range(1, 10) emits numbers from 1 to 10.
- The filter operator lets only the odd numbers (1, 3, 5, 7, 9) pass through.
- The map() function squares each of them.
- The squares (1, 9, 25, 49, 81) are passed to the subscriber.
- onComplete:
- After the last number in the range (10) is processed, the Flux sends a completion signal to indicate that there are no more items to emit.
- onError:
- If an error occurs then the Flux sends an error signal.
Output
Received: 1
Received: 9
Received: 25
Received: 49
Received: 81
Completed!
- take(long n) – This method is used to limit the no. of elements emitted.
Flux.range(1, 20)
.filter(i -> i % 2 == 0) // Filter even numbers
.take(5) // Take the first 5
.subscribe(System.out::println);
PFB the code explanation
- 1st line defines a Flux of range 1 to 20.
- 2nd line uses the filter operator to emit the even numbers (2, 4, 6, 8, 10, 12, 14, 16, 18, 20) from the range 1 to 20.
- The 3rd line calls take(5), which passes on only the first 5 elements emitted by the filter operator. The subscribe() call then prints those first 5 even numbers: 2, 4, 6, 8, 10.
- In detailed description
- onNext:
- The Flux emits the integers sequentially.
- The take(5) operator ensures that only the first 5 elements are passed downstream to the subscriber.
- onComplete:
- After emitting the 5th item, take(5) operator signals completion to the subscriber.
- No further items are emitted, even though the filtered source could have produced 10 even numbers; take(5) cancels the upstream once it has its 5 items.
- onError:
- If any error occurs during the emission or processing, an error signal is sent, and the Flux terminates early.
Output
2
4
6
8
10
Comparison with other operators
Operator | Description | Example Output (1 to 10) |
take(n) | Emits the first n items and then completes | 1, 2, 3, 4, 5 |
skip(n) | Skips the first n items and emits the rest | 6, 7, 8, 9, 10 |
takeLast(n) | Emits the last n items of the stream | 6, 7, 8, 9, 10 |
takeWhile(p) | Emits items while the predicate holds and completes at the first item that fails it | 1, 2, 3, 4, 5 for p = (i -> i <= 5) |
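The rows of the table can be reproduced with a few one-liners. The sketch below assumes reactor-core on the classpath; the class name LimitDemo is illustrative. The last two methods show how takeWhile() differs from filter() on unsorted input.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class LimitDemo {

    static List<Integer> take() {
        return Flux.range(1, 10).take(5).collectList().block();
    }

    static List<Integer> skip() {
        return Flux.range(1, 10).skip(5).collectList().block();
    }

    static List<Integer> takeLast() {
        return Flux.range(1, 10).takeLast(5).collectList().block();
    }

    static List<Integer> takeWhile() {
        return Flux.range(1, 10).takeWhile(i -> i <= 5).collectList().block();
    }

    // takeWhile stops at the first failing item (9), so 3 is never seen.
    static List<Integer> takeWhileUnsorted() {
        return Flux.just(1, 2, 9, 3).takeWhile(i -> i < 5).collectList().block();
    }

    // filter checks every item, so 3 is kept.
    static List<Integer> filterUnsorted() {
        return Flux.just(1, 2, 9, 3).filter(i -> i < 5).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(take());              // [1, 2, 3, 4, 5]
        System.out.println(skip());              // [6, 7, 8, 9, 10]
        System.out.println(takeLast());          // [6, 7, 8, 9, 10]
        System.out.println(takeWhile());         // [1, 2, 3, 4, 5]
        System.out.println(takeWhileUnsorted()); // [1, 2]
        System.out.println(filterUnsorted());    // [1, 2, 3]
    }
}
```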
- onErrorResume(Function<Throwable, Publisher<? extends T>>) – This method is used to provide a fallback stream in case of errors.
Flux.error(new IllegalStateException("Illegal State"))
    .onErrorResume(e -> {
        if (e instanceof IllegalStateException) {
            return Flux.just("Recovered from IllegalStateException");
        } else {
            return Flux.error(e); // Rethrow the error for unknown types
        }
    })
    .subscribe(System.out::println);
Code explanation
- Flux.error(new IllegalStateException("Illegal State")) – Creates a Flux that immediately emits an error signal (onError) carrying the specified exception, IllegalStateException("Illegal State").
- onErrorResume(e -> { ... }) – Checks whether the error emitted by the 1st line is an IllegalStateException. If it is, it returns Flux.just("Recovered from IllegalStateException") to replace the errored stream; otherwise it re-emits the original error with Flux.error(e).
- The function receives the emitted error (e) and, instead of propagating it downstream, switches to the fallback Flux that emits "Recovered from IllegalStateException".
- Detailed description
- Error handling:
- The Flux immediately emits the error IllegalStateException("Illegal State"), which invokes the error-handling logic in onErrorResume.
- Fallback Stream:
- onErrorResume subscribes to the fallback Flux.just("Recovered from IllegalStateException"), which emits the value "Recovered from IllegalStateException".
- Completion:
- After emitting its value, the fallback Flux sends a completion signal (onComplete).
Output
Recovered from IllegalStateException
Key features of onErrorResume()
- Error Recovery:
- Provides a way to recover from errors by replacing the failed stream with an alternative.
- Dynamic Handling:
- Error handling can be based on the type of error or its message.
- Stream Continuity:
- Ensures that the stream does not terminate abruptly and can provide meaningful output or alternative data.
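The "Dynamic Handling" point above can be sketched with the typed and predicate-based overloads of onErrorResume, which avoid the manual instanceof check. This is a minimal sketch assuming reactor-core is on the classpath; the class ErrorResumeDemo, the recover() helper, the "timeout" message, and the fallback strings are all hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ErrorResumeDemo {

    // Hypothetical helper: a stream that fails with the given error and then tries to recover.
    static Flux<String> recover(Throwable failure) {
        return Flux.<String>error(failure)
                // Type-based: only an IllegalStateException takes this branch
                .onErrorResume(IllegalStateException.class,
                        e -> Flux.just("fallback for illegal state"))
                // Message-based: a predicate inspects the error before recovering
                .onErrorResume(e -> "timeout".equals(e.getMessage()),
                        e -> Flux.just("fallback for timeout"));
    }

    public static void main(String[] args) {
        List<String> byType = recover(new IllegalStateException("Illegal State"))
                .collectList().block();
        List<String> byMessage = recover(new RuntimeException("timeout"))
                .collectList().block();
        System.out.println(byType);    // [fallback for illegal state]
        System.out.println(byMessage); // [fallback for timeout]
    }
}
```

An error that matches neither branch is propagated unchanged, so a subscriber would still need an error callback for it.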
Comparison with other Error Handling Operators
Operator | Description | Use Case |
onErrorResume | Replaces the error stream with a fallback Publisher. | Provide default data when an error occurs. |
onErrorReturn | Emits a single fallback value when an error occurs. | Return a single fallback value for a known error. |
onErrorContinue | Allows the stream to continue processing despite errors. | Skip faulty items in a stream without stopping. |
doOnError | Executes side effects when an error occurs (but does not recover). | Log the error or perform cleanup. |
Using onErrorReturn for Simple Fallback – Use this method when a single fallback value is enough.
Flux.error(new RuntimeException("Error!"))
.onErrorReturn("Default Value")
.subscribe(System.out::println);
Output
Default Value
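Logging Errors with doOnError – Use this method to log an error without altering the stream. The error still propagates after doOnError, so the onErrorReturn call in this example supplies the fallback value.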
Flux.error(new RuntimeException("Critical Error"))
.doOnError(e -> System.err.println("Error occurred: " + e.getMessage()))
.onErrorReturn("Fallback Value")
.subscribe(System.out::println);
Output
Error occurred: Critical Error
Fallback Value
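Skipping Faulty Items with onErrorContinue – The comparison table also lists onErrorContinue. A minimal sketch of it follows, assuming reactor-core is on the classpath; the class ErrorContinueDemo, the parseAll() helper, and the sample input are hypothetical. onErrorContinue only affects upstream operators that support it (map does), so treat this as an illustration rather than a general recovery recipe.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ErrorContinueDemo {

    // Hypothetical helper: parses each string and drops the ones that are not integers.
    static List<Integer> parseAll(String... raw) {
        return Flux.fromArray(raw)
                .map(s -> Integer.parseInt(s)) // throws NumberFormatException for "x"
                .onErrorContinue((error, item) ->
                        // Side effect only: report the dropped item, then keep going
                        System.err.println("Skipping " + item + ": " + error.getMessage()))
                .collectList()
                .block(); // blocking is for the demo only
    }

    public static void main(String[] args) {
        System.out.println(parseAll("1", "2", "x", "4")); // [1, 2, 4]
    }
}
```

Unlike onErrorResume and onErrorReturn, the original stream is not replaced here; the failing item is dropped and the remaining items are still processed.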
- collectList() – This method collects all elements into a List.
Flux.range(1, 5)
.collectList()
.subscribe(System.out::println);
Code explanation
- The 1st line creates a Flux over the range 1 to 5. The emitted values are 1, 2, 3, 4, 5.
- The collectList() method aggregates all the items emitted by the Flux into a List. Once the Flux completes, the resulting List is emitted as a single item. This converts the Flux<T> into a Mono<List<T>>, where T is the type of the elements emitted by the original Flux.
- Detailed description
- onNext (for Flux.range):
- The Flux emits the numbers 1, 2, 3, 4, 5.
- collectList():
- Aggregates all the emitted items (1, 2, 3, 4, 5) into a single List object [1, 2, 3, 4, 5].
- Mono Emission:
- After the aggregation, collectList() emits a single List object downstream.
- onComplete:
- The Mono created by collectList() sends a completion signal after emitting the List.
- Subscriber:
- The subscribe() call processes the collected List and prints it.
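The type change from Flux<T> to Mono<List<T>> described above can be made explicit in a short sketch, assuming reactor-core is on the classpath. The class CollectListDemo and its methods are hypothetical names; the second method shows that an empty Flux yields an empty List rather than an empty Mono.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CollectListDemo {

    // The Flux<Integer> becomes a Mono that emits exactly one List<Integer>.
    static Mono<List<Integer>> collected() {
        return Flux.range(1, 5).collectList();
    }

    // An empty source still produces one item: an empty List.
    static Mono<List<Integer>> collectedEmpty() {
        return Flux.<Integer>empty().collectList();
    }

    public static void main(String[] args) {
        collected().subscribe(System.out::println);      // [1, 2, 3, 4, 5]
        collectedEmpty().subscribe(System.out::println); // []
    }
}
```

Because collectList() buffers every element in memory until the source completes, it is best kept for finite, reasonably small streams.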