Q-1). How to grasp the concepts of Mono, Flux & Backpressure?
A-1). Mono – It represents a single asynchronous value or no value
Flux – It represents a stream of 0 to N asynchronous values
Backpressure – A mechanism by which the consumer signals how much data it can handle, so that a producer that generates data faster than the consumer can process it does not overwhelm the consumer.
Concept | Purpose | Emits | Use Case |
Mono | Asynchronous single value | 0 or 1 | Fetch a single record, send a single response |
Flux | Asynchronous stream of data | 0 to N | Batch Processing, Streaming data |
Backpressure | Control the rate of data flow | Not applicable (the consumer signals demand) | Prevent a fast producer from overwhelming a slow consumer |
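The demand signalling behind backpressure can be sketched with the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow), so that it runs without Reactor on the classpath. This is a minimal illustration, not Reactor code: the class and method names are made up for the example, and SubmissionPublisher stands in for a Flux. The subscriber asks for one element at a time through request(1), which is the same protocol Reactor uses underneath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

    /** Subscriber that signals demand for exactly one element at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next element only once this one is handled
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks when the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items have been delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Calling BackpressureSketch.run(5) returns the five values in order. The point of the sketch is that the publisher never pushes more than the subscriber has requested.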
Q-2). Why do developers get confused when moving between imperative and reactive programming styles?
A-2). The transition from imperative to reactive programming styles can be challenging for developers accustomed to traditional, blocking paradigms.
Concept | Imperative Style | Reactive Style |
Data Processing | Operations occur sequentially, one statement after another | Declarative operators like map, filter, and flatMap process data streams |
Flow Control | You control the flow explicitly with loops, conditions, and return statements | The framework handles the flow and timing of data emissions |
Blocking vs. Non-Blocking Behavior | An operation blocks the current thread until it completes; execution is sequential | Operations are non-blocking; instead of waiting, you define a pipeline that runs when the data is available |
Error Handling | Errors are handled using try-catch blocks in a straightforward manner | Errors propagate through the pipeline and are handled using operators like onErrorResume or onErrorMap |
Thread Management | Single-threaded by default unless explicitly creating new threads or using executors | Schedulers control the threading model (e.g. – parallel(), boundedElastic()) |
State and Mutability | State changes are managed explicitly with mutable variables | Avoids shared mutable state, uses transformations and immutability instead |
Debugging & Stack Traces | Errors include a clear stack trace pointing to the exact line where the issue occurred. | Errors in a reactive flow might not show a traditional stack trace because operations are asynchronous. |
Combining multiple async sources | Combine results using blocking calls or threads | Use operators like zip, merge, flatMap to combine asynchronous sources. |
Thinking in Streams | Works with values and collections directly | Focuses on stream of data, even for single items. Everything is treated as a sequence. |
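The "Combining multiple async sources" row can be illustrated with the JDK's CompletableFuture, used here only as a dependency-free stand-in for Mono.zip. The two fetch methods are hypothetical placeholders for independent asynchronous calls. One difference to keep in mind: a CompletableFuture starts running as soon as it is created, whereas a Mono does nothing until it is subscribed to.

```java
import java.util.concurrent.CompletableFuture;

final class CombineSketch {

    // Hypothetical lookups standing in for two independent asynchronous calls.
    static CompletableFuture<String> fetchName() {
        return CompletableFuture.supplyAsync(() -> "Ada");
    }

    static CompletableFuture<Integer> fetchScore() {
        return CompletableFuture.supplyAsync(() -> 42);
    }

    /** Imperative style: block for each result in turn. */
    static String blockingStyle() {
        String name = fetchName().join();  // the calling thread waits here
        int score = fetchScore().join();   // the second call is only started after the first finished
        return name + ":" + score;
    }

    /** Pipeline style: describe how to combine the results; the caller decides whether to wait. */
    static CompletableFuture<String> pipelineStyle() {
        return fetchName().thenCombine(fetchScore(), (name, score) -> name + ":" + score);
    }
}
```

Both variants produce the same value, but only the second lets the two lookups run concurrently without holding a thread while waiting.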
Q-3). How does mismanaging thread contexts and the Schedulers API lead to unexpected behavior?
A-3). Mismanaging thread contexts and the Schedulers API in reactive programming can lead to unexpected behaviors, such as performance bottlenecks, thread starvation, or incorrect application behavior.
- Thread Context Mismanagement – The two main problems are thread-bound data (for example, values held in a ThreadLocal) and inconsistent context after the flow switches threads.
Examples of Thread Context issues:
If a reactive flow switches to another thread and the context has not been propagated, thread-bound data such as the security context can be lost, and authentication or authorization can fail.
Mono.just("Data")
    .publishOn(Schedulers.parallel()) // Downstream operators now run on a different thread
    .map(value -> {
        // The thread-bound security context may not be available here, because this lambda runs on a different thread
        String user = SecurityContextHolder.getContext().getAuthentication().getName();
        return value + " for " + user;
    })
    .subscribe(System.out::println);
Solution – Use Reactor's Context, which travels with the subscription rather than with a thread, to propagate such data explicitly.
Mono.just("Data")
    .flatMap(value -> {
        return Mono.deferContextual(ctx -> {
            // Read the value from the Reactor Context explicitly
            String user = (String) ctx.get("user");
            return Mono.just(value + " for " + user);
        });
    })
    .contextWrite(Context.of("user", "john_doe")) // Propagate user context
    .subscribe(System.out::println);
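The underlying problem, thread-bound data that disappears after a thread hop, can be reproduced with the JDK alone. The sketch below is an analogy rather than Reactor or Spring Security code: a plain ThreadLocal plays the role of the thread-bound holder (SecurityContextHolder uses a ThreadLocal strategy by default), a single-threaded executor plays the role of a scheduler, and all names are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextSketch {

    // Thread-bound storage: each thread sees only the value it set itself.
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Reads the ThreadLocal from a worker thread: the value set by the caller is not visible there. */
    static String readFromWorkerThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT_USER.set("john_doe");
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Data for " + CURRENT_USER.get(), worker)
                    .join();
        } finally {
            CURRENT_USER.remove();
            worker.shutdown();
        }
    }

    /** Passes the value along explicitly, so it no longer matters which thread runs the step. */
    static String passExplicitly() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        String user = "john_doe";
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Data for " + user, worker)
                    .join();
        } finally {
            worker.shutdown();
        }
    }
}
```

readFromWorkerThread() yields "Data for null", while passExplicitly() yields "Data for john_doe". Reactor's Context follows the second approach: the data is attached to the flow, not to a thread.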
- Misusing Schedulers and Thread Management – In reactive programming, you can control which threads are used for certain operations using Schedulers. Mismanaging this control can lead to issues like blocking operations running on non-blocking threads, thread starvation, or unnecessary thread switching.
The problems are as follows:
- Blocking Calls on Non-blocking Threads – A blocking call is made inside a Mono or Flux pipeline.
Mono.fromCallable(() -> {
        // Simulate a blocking operation
        Thread.sleep(1000);
        return "Result";
    })
    .subscribeOn(Schedulers.boundedElastic()) // Run the blocking call on a pool intended for blocking work
    .subscribe(System.out::println);
In this case, we’re making a blocking call (Thread.sleep) inside the reactive flow. Schedulers.boundedElastic() allows the blocking operation to run on a thread from a pool designed for blocking tasks.
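If we mistakenly use Schedulers.parallel(), or no scheduler at all, the blocking call ties up a thread that is meant for non-blocking work (a parallel worker, or the subscribing thread, which in a server is often an event-loop thread). That can lead to thread starvation.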
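The starvation effect described above can be reproduced with plain JDK executors. This is an analogy under stated assumptions, not Reactor code: a single-threaded executor stands in for an event-loop or parallel worker, a cached thread pool stands in for Schedulers.boundedElastic(), and the 500 ms timeout is an arbitrary value chosen for the demonstration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class StarvationSketch {

    /**
     * Submits a blocking task, then a quick task, and reports whether the quick task
     * got to run while the blocking task was still waiting.
     */
    static boolean quickTaskRanInTime(ExecutorService quickPool, ExecutorService blockingPool) {
        CountDownLatch quickTaskRan = new CountDownLatch(1);
        // Blocking task: waits up to 500 ms for the quick task to have run.
        Callable<Boolean> blockingTask = () -> quickTaskRan.await(500, TimeUnit.MILLISECONDS);
        Future<Boolean> outcome = blockingPool.submit(blockingTask);
        quickPool.execute(quickTaskRan::countDown);
        try {
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** One single-threaded pool does everything, like blocking on an event-loop thread. */
    static boolean sharedPool() {
        ExecutorService only = Executors.newSingleThreadExecutor();
        try {
            return quickTaskRanInTime(only, only);
        } finally {
            only.shutdown();
        }
    }

    /** The blocking task is moved to its own pool, leaving the quick pool free. */
    static boolean dedicatedBlockingPool() {
        ExecutorService quick = Executors.newSingleThreadExecutor();
        ExecutorService blocking = Executors.newCachedThreadPool();
        try {
            return quickTaskRanInTime(quick, blocking);
        } finally {
            quick.shutdown();
            blocking.shutdown();
        }
    }
}
```

With a shared pool the quick task is stuck in the queue behind the blocking one, so sharedPool() returns false. With a dedicated pool for blocking work, dedicatedBlockingPool() returns true.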
- Excessive Thread Switching – Switching between too many threads could increase latency unnecessarily.
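Mono.just("Hello")
    .subscribeOn(Schedulers.parallel()) // The source emits on a parallel thread
    .map(value -> value + " World")
    .publishOn(Schedulers.single()) // Downstream operators switch to the single thread
    .map(value -> value + "!")
    .subscribeOn(Schedulers.boundedElastic()) // Adds another hop, but only at subscription time
    .subscribe(System.out::println);
In this example, three schedulers are involved in a trivial pipeline. The second subscribeOn() does not move the data, because the subscribeOn() closest to the source decides where the source emits; it only adds a thread hop when subscribing. Each unnecessary switch adds latency and can degrade performance.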
Best practices to avoid Thread Context Mismanagement and Misusing Schedulers
- Choose the Right Scheduler
  - For blocking I/O or other blocking tasks, use Schedulers.boundedElastic().
  - For CPU-bound tasks, use Schedulers.parallel().
  - For work that must run on one dedicated thread, use Schedulers.single().
- Context Propagation
  - Use Context to explicitly manage request-scoped data, such as the security context.
  - Remember that a reactive flow may execute on multiple threads, so context management must be handled explicitly.
- Avoid Blocking Operations
  - Avoid blocking calls, such as Thread.sleep() or blocking I/O, in reactive flows. When a blocking task is unavoidable, run it on an appropriate scheduler, e.g. Schedulers.boundedElastic().
- Limit Thread Switching
  - Minimize unnecessary switching between threads in your reactive pipeline to avoid performance overhead.
- Use publishOn() & subscribeOn() Correctly
  - publishOn() changes the thread for downstream operators (those after it in the chain).
  - subscribeOn() changes the thread on which the subscription, and therefore the source emission, happens. It affects the chain from the source down to the first publishOn(), wherever it is placed.
  - Only use them when necessary to control where operations occur.
Q-4). What are the consequences of misusing onErrorResume(), onErrorContinue(), or onErrorMap()?
A-4). Misusing the error-handling operators such as onErrorResume(), onErrorContinue(), and onErrorMap() in reactive programming can lead to unintended flow control, which might result in silent errors, inconsistent program behavior, or improper error propagation.
- onErrorResume() – This operator allows providing a fallback value or alternative flow when an error occurs in the reactive stream. The error is caught, and the operator provides an alternative publisher that will continue the sequence, thus preventing the flow from terminating.
Example of Misuse
Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new RuntimeException("Something went wrong");
        }
        return data;
    })
    .onErrorResume(e -> Mono.just("Fallback")) // Silently consume the error
    .subscribe(System.out::println); // Prints: Fallback (but error was critical)
Solution
Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new RuntimeException("Something went wrong");
        }
        return data;
    })
    .onErrorResume(e -> {
        // Log the error
        System.err.println("Error: " + e.getMessage());
        return Mono.just("Fallback");
    })
    .subscribe(System.out::println); // Still prints: Fallback, but logs the error
- onErrorContinue() – This operator allows the stream to continue processing subsequent elements after an error occurs. It swallows the error for the current item but continues with the next items in the sequence. It only affects upstream operators that support this mode, so its behavior depends on where it sits in the chain.
Example of Misuse
Flux.just("Item1", "Item2", "Item3")
    .map(item -> {
        if (item.equals("Item2")) {
            throw new RuntimeException("Error processing Item2");
        }
        return item.toUpperCase();
    })
    .onErrorContinue((e, item) -> {
        // Silently skip errors, continue with next items
        System.out.println("Skipping error for " + item);
    })
    .subscribe(System.out::println);
Solution
Flux.just("Item1", "Item2", "Item3")
    .map(item -> {
        if (item.equals("Item2")) {
            throw new RuntimeException("Error processing Item2");
        }
        return item.toUpperCase();
    })
    .onErrorContinue((e, item) -> {
        // Log the error properly
        System.err.println("Error with " + item + ": " + e.getMessage());
    })
    .subscribe(System.out::println);
- onErrorMap() – This operator allows you to transform an error into a different error type. It is useful for standardizing error handling or re-throwing errors with additional context.
Example of Misuse
Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new IllegalArgumentException("Invalid argument");
        }
        return data;
    })
    // Maps every error type to a generic RuntimeException and drops the original exception as the cause
    .onErrorMap(e -> new RuntimeException("Transformed error: " + e.getMessage()))
    .subscribe(System.out::println, error -> System.out.println(error.getMessage()));
Solution
Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new IllegalArgumentException("Invalid argument");
        }
        return data;
    })
    .onErrorMap(IllegalArgumentException.class, e -> new CustomException("Custom error: " + e.getMessage()))
    .subscribe(System.out::println, error -> System.out.println(error.getMessage()));
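The habits this answer recommends (log before falling back, and translate errors deliberately) can be tried out without Reactor. The sketch below uses the JDK's CompletableFuture as a stand-in: exceptionally() plays roughly the role of onErrorResume(), and rethrowing a wrapped exception plays the role of onErrorMap(). Keeping the original exception as the cause is one reasonable convention, not something the operators enforce. LookupFailedException and the method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class ErrorHandlingSketch {

    /** Hypothetical domain exception that keeps the original failure as its cause. */
    static final class LookupFailedException extends RuntimeException {
        LookupFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Strips the CompletionException wrapper that CompletableFuture may add. */
    private static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
    }

    /** Runs the source asynchronously; recovers only from the one failure type considered recoverable. */
    static String fetchWithFallback(Supplier<String> source, List<String> log) {
        return CompletableFuture.supplyAsync(source)
                .exceptionally(error -> {
                    Throwable cause = unwrap(error);
                    if (cause instanceof IllegalStateException) {
                        // Expected, recoverable failure: record it, then fall back.
                        log.add("recovered: " + cause.getMessage());
                        return "Fallback";
                    }
                    // Anything else is translated, with the original kept as the cause.
                    throw new LookupFailedException("lookup failed", cause);
                })
                .join();
    }
}
```

A source that throws IllegalStateException yields "Fallback" and leaves a log entry. Any other exception surfaces from join() as a CompletionException whose cause is the LookupFailedException, which in turn carries the original error.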
Q-5). What problems will one face if the resources used in reactive programming fail to release properly?
A-5). Resources used in reactive flows, such as database connections, file streams, or network connections, lead to resource leaks, performance degradation, and application instability if they are not released properly.
Proper Resource Management Strategies
- The using() Operator – It is helpful when you acquire a resource such as a database connection or file handle and need it released once the reactive flow finishes.
Mono.using(
        // Resource acquisition
        () -> new FileInputStream("example.txt"),
        // Resource usage (readFile is an application-defined helper)
        inputStream -> Mono.fromCallable(() -> readFile(inputStream)),
        // Resource cleanup: close() throws a checked IOException, so it must be wrapped
        inputStream -> {
            try {
                inputStream.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    )
    .subscribe(
        result -> System.out.println("File read successfully: " + result),
        error -> System.err.println("Error: " + error)
    );
The Mono.using() operator ensures that the FileInputStream is closed once the flow terminates, whether it completes successfully, fails with an error, or is cancelled.
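For readers coming from imperative code, the three arguments of Mono.using() (acquire, use, release) map onto a try-with-resources block. The sketch below shows that imperative counterpart with a hypothetical TrackedResource that records whether it was closed. It is an analogy for the shape of the operator, not a replacement for it, since try-with-resources releases the resource when the block exits rather than when an asynchronous flow terminates.

```java
import java.util.function.Function;
import java.util.function.Supplier;

final class UsingSketch {

    /** Hypothetical resource that records whether it has been released. */
    static final class TrackedResource implements AutoCloseable {
        private boolean closed;

        String read() {
            return "contents";
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Acquire, use, release: the same three roles as the arguments of Mono.using(). */
    static <T> T using(Supplier<TrackedResource> acquire, Function<TrackedResource, T> use) {
        try (TrackedResource resource = acquire.get()) {
            return use.apply(resource);
        } // close() runs on normal return and also when use throws
    }
}
```

In both the success path and the failure path the resource ends up closed, which is the guarantee the operator extends to asynchronous termination and cancellation.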
- Managing Database Connections – Spring's R2DBC DatabaseClient acquires a connection for each statement and releases it automatically; when a connection pool is configured, the connection is returned to the pool.
Mono.just("SELECT * FROM users")
    .flatMapMany(query -> {
        return r2dbcEntityTemplate.getDatabaseClient()
            .sql(query)
            .map((row, metadata) -> row.get("name", String.class))
            .all() // Returns a Flux, hence flatMapMany rather than flatMap
            .doFinally(signalType -> {
                // DatabaseClient releases the connection itself; use this hook for extra cleanup or logging
                System.out.println("Query finished, cleaning up resources.");
            });
    })
    .subscribe();
Here DatabaseClient releases the connection itself. doFinally() is a hook for any additional cleanup or logging, and it runs whether the flow completes successfully, fails with an error, or is cancelled.
- doOnTerminate() or doFinally() for Cleanup – doFinally() executes a block of code after the stream terminates for any reason: successful completion, error, or cancellation. doOnTerminate() executes a block of code just before the completion or error signal is propagated downstream (it does not run on cancellation), which is useful for logging or last-minute operations.
Mono.fromCallable(() -> {
        // Perform some work like database or file operation
        return "Result";
    })
    .doFinally(signalType -> {
        // Perform cleanup, like closing a database connection
        System.out.println("Resource cleanup");
    })
    .subscribe(result -> {
        System.out.println("Work complete: " + result);
    });
doFinally() ensures the cleanup happens regardless of how the flow terminates (normal completion, error, or cancellation).
- Handling Subscription Cancellation – In some cases, the reactive flow might be canceled before completion. You must ensure that resources are cleaned up even if the subscription is canceled. The doOnCancel() operator lets you clean up resources when the subscription is canceled.
Mono.just("Start operation")
    .doOnCancel(() -> {
        // Perform resource cleanup on cancellation
        System.out.println("Resource cleanup on cancellation");
    })
    .subscribe();
doOnCancel() and doFinally() are particularly useful when resources must be cleaned up even if the subscriber cancels the subscription. doOnTerminate() does not run on cancellation.
- Using Connection Pools – For database connections, always prefer connection pools (like HikariCP for blocking JDBC, or r2dbc-pool for R2DBC), which handle the opening, reuse, and release of database connections automatically.
If you’re using a reactive connection pool, it is important to:
- Configure the pool size to match the expected load.
- Set proper timeout and retry policies.
- Ensure connections are released back to the pool after use.
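ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:pool:h2://localhost/test");
Mono.usingWhen(
        connectionFactory.create(),
        // Consume the Result inside this function, while the connection is still open
        connection -> Mono.from(connection.createStatement("SELECT * FROM users").execute()),
        // close() returns a Publisher that must be subscribed to; usingWhen does that
        Connection::close
    )
    .subscribe();
usingWhen() subscribes to connection.close() when the flow completes, fails, or is cancelled, so the connection is returned to the pool once it is no longer in use. Calling connection.close() without subscribing to the returned Publisher does nothing.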
Q-6). What are the consequences of mishandling shared mutable state due to a misunderstanding of reactive paradigms?
A-6). Mishandling shared mutable state in reactive programming is a common issue that can lead to race conditions, inconsistent state, and unpredictable behavior.
Why Shared Mutable State Is a Problem in Reactive Programming
- Concurrency – In reactive systems, operations can be executed concurrently (e.g., multiple threads or tasks), and mutable state shared between them can lead to race conditions where different threads update or read the state at the same time, leading to inconsistent or unpredictable results.
- Thread-Safety – Reactive programming encourages the use of asynchronous, non-blocking operations. If mutable state is accessed from multiple threads without proper synchronization, it can cause issues like data corruption, crashes, or unexpected results.
- Backpressure and Flow Control – Reactive systems use backpressure mechanisms to manage the flow of data. Mishandling state can lead to dropped or delayed events, incorrect processing, or failure to respond to changes in the data stream.
- Reactive Streams’ Non-Blocking Nature – Reactive programming is based on non-blocking flows, meaning that state management must not block or slow down the event loop. Mishandling shared state can create bottlenecks, blocking threads, or inefficient resource usage.
Common Mistakes with Shared Mutable State in Reactive Programming
- Unprotected Access to Shared State – Directly modifying shared state (like a Map or a List) from different threads without any synchronization mechanism.
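AtomicInteger counter = new AtomicInteger(0);
Flux.just(1, 2, 3)
    .map(i -> {
        counter.addAndGet(i); // Modifies state shared by every subscription
        return counter.get(); // Separate read: another subscriber may have updated counter in between
    })
    .subscribe(System.out::println);
In this example, each AtomicInteger call is atomic on its own, but the addAndGet()/get() pair is not. If the pipeline is subscribed to more than once, or concurrently, the value returned by get() may include other subscribers' updates, and every subscription keeps adding to the same counter, so the emitted values are inconsistent.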
- State not being properly Isolated – Reactive flows need to be isolated to prevent shared mutable state from leaking between different flows or threads. Using global state or unprotected shared resources (like static variables or thread-local storage) can lead to unintended interactions.
public static int sharedState = 0;
Mono.just("data")
    .map(data -> {
        sharedState++; // Not atomic: a read, an add, and a write on shared state
        return sharedState;
    })
    .subscribe(System.out::println);
The static variable sharedState can be read and modified concurrently by multiple subscribers, and sharedState++ is not an atomic operation, so updates can be lost and the emitted values are unpredictable.
- Non-Thread-Safe Collections – When using collections like ArrayList, HashMap, or LinkedList that are not thread-safe, concurrent updates can result in data corruption.
List<String> sharedList = new ArrayList<>();
Flux.just("one", "two", "three")
    .doOnNext(sharedList::add) // Modifying shared state without synchronization
    .subscribe();
Here, if the pipeline is subscribed to from several threads, elements are added to sharedList concurrently. That can lead to lost elements, to exceptions such as ArrayIndexOutOfBoundsException, or to ConcurrentModificationException if the list is iterated at the same time.
- Improper Use of subscribe and Side Effects – Since subscribe is the terminal operation that triggers a reactive flow, side effects such as modifying shared state run once per subscription. If each subscription modifies the same shared mutable state, you may inadvertently have multiple uncoordinated writes to that state, leading to issues.
Strategies for Properly Managing Shared Mutable State in Reactive Programming
- Use Immutable Data Structures – Immutable objects are inherently thread-safe. By using immutable data structures (e.g., List.of(), Map.of(), or List.copyOf()), you prevent multiple threads from modifying the same object concurrently. Note that Collections.unmodifiableList() only provides a read-only view; the list underneath can still change.
List<String> originalList = List.of("apple", "banana");
Flux.just("cherry")
    .map(item -> {
        List<String> newList = new ArrayList<>(originalList); // Create a new instance
        newList.add(item);
        return newList;
    })
    .subscribe(System.out::println);
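The copy-then-append idea above can be checked in isolation with plain collections. This is a small sketch with made-up helper names; it returns an unmodifiable snapshot via List.copyOf() (Java 10+), which goes one step further than the snippet above by making the result immutable as well.

```java
import java.util.ArrayList;
import java.util.List;

final class ImmutableUpdateSketch {

    /** Returns a new immutable list with item appended; the input list is left untouched. */
    static List<String> append(List<String> original, String item) {
        List<String> copy = new ArrayList<>(original);
        copy.add(item);
        return List.copyOf(copy); // unmodifiable snapshot of the copy
    }

    /** Reports whether the list rejects in-place modification. */
    static boolean rejectsMutation(List<String> list) {
        try {
            list.add("x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

Because neither the original nor the result can be modified in place, both can be shared freely between threads or subscribers.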
- Use Thread-Safe Data Structures – If you must use mutable state, ensure the data structures are thread-safe. Java’s ConcurrentMap, CopyOnWriteArrayList and other classes in java.util.concurrent are designed to handle concurrent updates without issues.
Map<String, Integer> threadSafeMap = new ConcurrentHashMap<>();
Flux.just("apple", "banana", "cherry")
    // merge() performs the read-modify-write atomically; a getOrDefault() followed by put() would not
    .doOnNext(item -> threadSafeMap.merge(item, 1, Integer::sum))
    .subscribe();
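A thread-safe collection only helps if each update is a single atomic call. The sketch below, which uses plain JDK threads instead of Reactor schedulers and invented names, counts words from several threads with ConcurrentHashMap.merge(), whose whole invocation is performed atomically.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConcurrentCountSketch {

    /** Counts every word `repeats` times from `threads` threads at once. */
    static Map<String, Integer> countConcurrently(List<String> words, int threads, int repeats) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int r = 0; r < repeats; r++) {
                    for (String word : words) {
                        counts.merge(word, 1, Integer::sum); // atomic read-modify-write per key
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts;
    }
}
```

With 4 threads and 1000 repeats, every word ends up with a count of exactly 4000. A get-then-put sequence on the same map could lose updates under the same load.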
- Isolate State to Individual Subscribers or Flows – Each subscriber or flow should work with its own isolated copy of data to avoid interference between concurrent operations. Avoid global state or mutable shared resources that multiple consumers can access.
Mono.just("data")
    .map(data -> {
        // Isolated, per-subscriber state
        int isolatedState = 0;
        return isolatedState + 1;
    })
    .subscribe(System.out::println);
- Use Atomic Operations for Shared State – If you absolutely need to manage mutable shared state, use atomic classes (e.g., AtomicInteger, AtomicReference) to perform thread-safe updates.
AtomicInteger sharedCounter = new AtomicInteger(0);
Flux.just(1, 2, 3)
    .doOnNext(i -> sharedCounter.addAndGet(i)) // Atomic update to shared state
    .subscribe();
AtomicInteger ensures that each addition is thread-safe and that no update is lost to a race condition.
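When the updated value is also needed, it is safer to use the value returned by the atomic call than to read the counter again afterwards. The following sketch, with hypothetical names and plain JDK threads in place of Reactor schedulers, hands out ticket numbers from several threads and collects the distinct values that were issued.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AtomicTicketSketch {

    /** Issues tickets from several threads and returns the distinct ticket numbers observed. */
    static Set<Integer> issueTickets(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Increment and read in one atomic step; no separate get() afterwards
                    seen.add(counter.incrementAndGet());
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Every ticket number is unique, so the set contains threads × perThread entries. Incrementing and then calling get() separately could hand the same number to two callers.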
- Avoid Shared Mutable State in Non-Blocking Contexts – The core benefit of reactive programming is that operations are non-blocking and asynchronous. If shared mutable state is involved, avoid blocking operations that can lead to thread contention and slow down the flow.
Mono.just("data")
    .map(data -> {
        // Avoid blocking or waiting for I/O within reactive flows
        // State management should not be a bottleneck
        return data.toUpperCase();
    })
    .subscribe(System.out::println);
- Leverage Contexts or State Management Mechanisms – If you need to manage user-specific or session-specific state, consider using Reactor's Context (which Spring WebFlux builds on). It lets you pass state along with the reactive stream, bound to the subscription rather than to a thread.
Mono.deferContextual(ctx -> {
        String user = ctx.get("user");
        return Mono.just("User: " + user);
    })
    .contextWrite(Context.of("user", "JohnDoe"))
    .subscribe(System.out::println);
Reactor's Context provides a way to maintain per-request state without relying on shared mutable state. It is immutable and tied to a subscription, and it propagates from downstream to upstream, so contextWrite() must appear after (below) the operators that read it.