Q-1). How to grasp the concepts of Mono, Flux & Backpressure?

A-1). Mono – It represents a single asynchronous value or no value

Flux – It represents a stream of 0 to N asynchronous values

Backpressure – A mechanism that lets the consumer signal how much data it can handle, so that a producer emitting faster than the consumer can process does not overwhelm it.

| Concept | Purpose | Emits | Use Case |
|---|---|---|---|
| Mono | Asynchronous single value | 0 or 1 | Fetch a single record, send a single response |
| Flux | Asynchronous stream of data | 0 to N | Batch processing, streaming data |
| Backpressure | Control data flow | Consumer controls | Prevent overwhelming the consumer |
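
To make "consumer controls" concrete, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces (the same Reactive Streams contract that Flux and Mono implement) rather than Reactor itself. The class BackpressureSketch and its methods range() and run() are hypothetical names chosen for illustration. The publisher emits only as many items as the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    /** Synchronous publisher of 1..count (count >= 1) that emits only what was requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            long demand = 0;
            boolean draining = false;
            boolean done = false;

            @Override
            public void request(long n) {
                demand += n;
                if (draining) {
                    return; // re-entrant call from onNext: the loop below picks up the new demand
                }
                draining = true;
                while (demand > 0 && !done) {
                    demand--;
                    subscriber.onNext(next++);
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
                draining = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscriber that pulls `batch` items at a time and records every signal. */
    static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;
            int receivedInBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batch) {
                    receivedInBatch = 0;
                    log.add("request(" + batch + ")");
                    subscription.request(batch); // ask for more only when ready
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3, 2).forEach(System.out::println);
    }
}
```

The log shows the pull model: nothing is emitted until request(n) is called, and the second batch arrives only after the subscriber asks for it.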

Q-2). What is the confusion regarding imperative and reactive programming styles?

A-2). The transition from imperative to reactive programming styles can be challenging for developers accustomed to traditional, blocking paradigms.

| Concept | Imperative Style | Reactive Style |
|---|---|---|
| Flow Control | Operations occur sequentially | Declarative operators like map, filter, and flatMap process data streams |
| Data Processing | You control the flow explicitly with loops, conditions, and return statements | The framework handles the flow and timing of data emissions |
| Blocking vs. Non-Blocking Behavior | An operation blocks the current thread until it completes; execution is sequential | Operations are non-blocking; instead of waiting, you define a pipeline that runs when the data is available |
| Error Handling | Errors are handled using try-catch blocks in a straightforward manner | Errors propagate through the pipeline and are handled using operators like onErrorResume or onErrorMap |
| Thread Management | Single-threaded by default unless you explicitly create new threads or use executors | Schedulers control the threading model (e.g., parallel(), boundedElastic()) |
| State and Mutability | State changes are managed explicitly with mutable variables | Avoids shared mutable state; uses transformations and immutability instead |
| Debugging & Stack Traces | Errors include a clear stack trace pointing to the exact line where the issue occurred | Errors in a reactive flow might not show a traditional stack trace because operations are asynchronous |
| Combining Multiple Async Sources | Combine results using blocking calls or threads | Use operators like zip, merge, and flatMap to combine asynchronous sources |
| Thinking in Streams | Works with values and collections directly | Focuses on streams of data, even for single items; everything is treated as a sequence |
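
The contrast in the error-handling and blocking rows can be sketched with plain JDK types. This is only an analogy: CompletableFuture stands in for Mono, exceptionally() plays the role that onErrorResume plays in Reactor, and the names ImperativeVsPipeline and lookup() are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ImperativeVsPipeline {

    /** Hypothetical lookup that fails for an empty id. */
    static String lookup(String id) {
        if (id.isEmpty()) {
            throw new IllegalArgumentException("empty id");
        }
        return "user-" + id;
    }

    /** Imperative: call, then handle the failure with try/catch. */
    static String imperative(String id) {
        try {
            return lookup(id).toUpperCase();
        } catch (IllegalArgumentException e) {
            return "FALLBACK";
        }
    }

    /** Pipeline: describe the steps; an error skips thenApply and reaches the fallback stage. */
    static CompletableFuture<String> pipeline(String id) {
        return CompletableFuture.supplyAsync(() -> lookup(id))
                .thenApply(String::toUpperCase)
                .exceptionally(e -> "FALLBACK");
    }

    public static void main(String[] args) {
        System.out.println(imperative("42"));
        System.out.println(pipeline("42").join());
        System.out.println(pipeline("").join());
    }
}
```

Both styles produce the same results; the difference is that the pipeline version declares what should happen when the value (or the error) arrives instead of waiting for it.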

Q-3). How can mismanaging thread contexts and the Schedulers API lead to unexpected behavior?

A-3). Mismanaging thread contexts and the Schedulers API in reactive programming can lead to unexpected behaviors, such as performance bottlenecks, thread starvation, or incorrect application behavior.

  • Thread Context Mismanagement – The main problems are thread-bound data (for example, values held in a ThreadLocal) and an inconsistent context after the flow switches threads.

Examples of Thread Context issues:

In a reactive flow, if the context switches to another thread, and you haven’t properly propagated the context, the security context might be lost, and authentication/authorization could fail.

Mono.just("Data")
    .map(value -> {
        // This code might not have access to the security context because it's running on a different thread
        String user = SecurityContextHolder.getContext().getAuthentication().getName();
        return value + " for " + user;
    })
    .subscribe(System.out::println);

Solution – Use Context in reactive streams to explicitly manage and propagate context across threads.

Mono.just("Data")
    .flatMap(value -> {
        return Mono.deferContextual(ctx -> {
            // Accessing context explicitly
            String user = (String) ctx.get("user");
            return Mono.just(value + " for " + user);
        });
    })
    .contextWrite(Context.of("user", "john_doe"))  // Propagate user context
    .subscribe(System.out::println);
  • Missing Schedulers and Thread Management – In reactive programming, you can control which threads are used for certain operations using Schedulers. Mismanaging this control can lead to issues like blocking operations running on non-blocking threads, thread starvation, or unnecessary thread switching.
    The problems are as follows:
    • Blocking Calls on Non-blocking threads – You have used either Mono or Flux with a blocking call in a reactive pipeline.
Mono.fromCallable(() -> {
    // Simulate a blocking operation
    Thread.sleep(1000);
    return "Result";
})
.subscribeOn(Schedulers.boundedElastic()) // Proper usage of a non-reactive thread pool
.subscribe(System.out::println);

In this case, we’re making a blocking call (Thread.sleep) inside the reactive flow. Schedulers.boundedElastic() allows the blocking operation to run on a thread from a pool designed for blocking tasks.

If we mistakenly use Schedulers.parallel() or no Schedulers at all, the blocking call will tie up a non-blocking thread, which could lead to thread starvation.

  • Excessive Thread Switching – Switching between too many threads could increase latency unnecessarily.
Mono.just("Hello")
    .subscribeOn(Schedulers.parallel())        // Source emits on a parallel thread
    .map(value -> value + " World")
    .publishOn(Schedulers.single())            // Downstream switches to the single thread
    .map(value -> value + "!")
    .subscribeOn(Schedulers.boundedElastic())  // Only moves the subscribe call; Schedulers.elastic() is deprecated
    .subscribe(System.out::println);

In this example, the subscription hops from a boundedElastic thread to a parallel thread, and the data is then handed over to the single scheduler. None of these switches is needed for such a cheap pipeline, and each one adds latency.
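
The thread-bound data problem described above can be reproduced with plain JDK classes. The sketch below is an illustration, not Reactor code: ThreadLocalLoss, CURRENT_USER, readOnWorker(), and passExplicitly() are hypothetical names, and a single-thread executor stands in for a scheduler switch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLoss {

    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Reads the ThreadLocal on a worker thread: the caller's value is not visible there. */
    static String readOnWorker() {
        CURRENT_USER.set("john_doe");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Data for " + CURRENT_USER.get(), worker)
                    .join();
        } finally {
            worker.shutdown();
            CURRENT_USER.remove();
        }
    }

    /** Captures the value on the calling thread and hands it over explicitly. */
    static String passExplicitly() {
        CURRENT_USER.set("john_doe");
        String user = CURRENT_USER.get(); // read before the thread switch
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Data for " + user, worker)
                    .join();
        } finally {
            worker.shutdown();
            CURRENT_USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnWorker());   // Data for null
        System.out.println(passExplicitly()); // Data for john_doe
    }
}
```

Passing the value explicitly is the same idea that Reactor's Context implements: the data travels with the pipeline instead of with a thread.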

Best practices to avoid Thread Context Mismanagement and Misusing Schedulers

  • Choose the Right scheduler
    • For blocking I/O-bound tasks, use Schedulers.boundedElastic().
    • For CPU-bound tasks, use Schedulers.parallel().
    • For single-threaded operations, use Schedulers.single().
  • Context Propagation
    • Use Context to explicitly manage shared state or context, such as security context.
    • Remember that reactive flow may execute on multiple threads, so context management must be handled explicitly.
  • Avoid Blocking Operations
    • Avoid blocking calls, such as Thread.sleep() or blocking I/O, in reactive flows. When a blocking task is unavoidable, run it on an appropriate scheduler, e.g. Schedulers.boundedElastic().
  • Limit Thread Switching
    • Minimize unnecessary switching between threads in your reactive pipeline to avoid performance overhead.
  • Use publishOn() & subscribeOn() correctly
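    • publishOn() changes the thread for downstream operations (everything after the operator).
    • subscribeOn() changes the thread on which the subscription, and therefore the source emission, happens; it affects the chain from the source down to the first publishOn(), wherever it is placed.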
    • Only use them when necessary to control where operations occur.
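
The "blocking work on a dedicated pool, everything else on the non-blocking thread" rule can be sketched with JDK executors. This is an analogy under stated assumptions: the pool named "blocking" stands in for Schedulers.boundedElastic(), the single "event-loop" thread stands in for a non-blocking thread, and OffloadBlocking, run(), and pause() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadBlocking {

    /** Simulates a blocking call. */
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns "<thread that ran the blocking step> -> <thread that ran the follow-up step>". */
    static String run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService blockingPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "blocking"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> {
                        pause(50); // blocking work stays off the event loop
                        return Thread.currentThread().getName();
                    }, blockingPool)
                    .thenApplyAsync(
                            worker -> worker + " -> " + Thread.currentThread().getName(),
                            eventLoop)
                    .join();
        } finally {
            eventLoop.shutdown();
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The event-loop thread is free while the blocking step sleeps, which is the property that subscribeOn(Schedulers.boundedElastic()) is meant to preserve in a Reactor pipeline.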

Q-4). What are the implications of misusing onErrorResume(), onErrorContinue(), or onErrorMap()?

A-4). Misusing the error-handling operators such as onErrorResume(), onErrorContinue(), and onErrorMap() in reactive programming can lead to unintended flow control, which might result in silent errors, inconsistent program behavior, or improper error propagation.

  • onErrorResume() – This operator allows providing a fallback value or alternative flow when an error occurs in the reactive stream. The error is caught, and the operator provides an alternative publisher that will continue the sequence, thus preventing the flow from terminating.

Example of Misuse

Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new RuntimeException("Something went wrong");
        }
        return data;
    })
    .onErrorResume(e -> Mono.just("Fallback"))  // Silently consume the error
    .subscribe(System.out::println);  // Prints: Fallback (but error was critical)

Solution

Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new RuntimeException("Something went wrong");
        }
        return data;
    })
    .onErrorResume(e -> {
        // Log the error
        System.err.println("Error: " + e.getMessage());
        return Mono.just("Fallback");
    })
    .subscribe(System.out::println);  // Still prints: Fallback, but logs the error
  • onErrorContinue() – This operator allows the stream to continue processing subsequent elements after an error occurs. This operator swallows the error for the current item but continues with the next items in the sequence.

Example of Misuse

Flux.just("Item1", "Item2", "Item3")
    .map(item -> {
        if (item.equals("Item2")) {
            throw new RuntimeException("Error processing Item2");
        }
        return item.toUpperCase();
    })
    .onErrorContinue((e, item) -> {
        // Silently skip errors, continue with next items
        System.out.println("Skipping error for " + item);
    })
    .subscribe(System.out::println);

Solution

Flux.just("Item1", "Item2", "Item3")
    .map(item -> {
        if (item.equals("Item2")) {
            throw new RuntimeException("Error processing Item2");
        }
        return item.toUpperCase();
    })
    .onErrorContinue((e, item) -> {
        // Log the error properly
        System.err.println("Error with " + item + ": " + e.getMessage());
    })
    .subscribe(System.out::println);
  • onErrorMap() – This operator allows you to transform an error into a different error type. It is useful for standardizing error handling or re-throwing errors with additional context.

Example of Misuse

Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new IllegalArgumentException("Invalid argument");
        }
        return data;
    })
    .onErrorMap(e -> new RuntimeException("Transformed error: " + e.getMessage()))
    .subscribe(System.out::println, error -> System.out.println(error.getMessage()));

Solution

Mono.just("ValidData")
    .map(data -> {
        if (data.equals("ValidData")) {
            throw new IllegalArgumentException("Invalid argument");
        }
        return data;
    })
    .onErrorMap(IllegalArgumentException.class, e -> new CustomException("Custom error: " + e.getMessage()))
    .subscribe(System.out::println, error -> System.out.println(error.getMessage()));
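
One detail the onErrorMap() examples leave open is what happens to the original exception. A minimal sketch, assuming a hypothetical CustomException with a (message, cause) constructor: mapping only the expected type and keeping the original as the cause preserves the stack trace for debugging. The mapper is written as a plain java.util.function.Function so that it runs without Reactor; the same lambda shape is what onErrorMap() accepts.

```java
import java.util.function.Function;

class ErrorMappingSketch {

    /** Hypothetical domain exception that keeps the original failure as its cause. */
    static class CustomException extends RuntimeException {
        CustomException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Maps only IllegalArgumentException; every other error passes through untouched. */
    static final Function<Throwable, Throwable> MAPPER = e ->
            e instanceof IllegalArgumentException
                    ? new CustomException("Custom error: " + e.getMessage(), e)
                    : e;

    public static void main(String[] args) {
        Throwable mapped = MAPPER.apply(new IllegalArgumentException("Invalid argument"));
        System.out.println(mapped.getMessage() + " (cause: " + mapped.getCause() + ")");
    }
}
```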

Q-5). What problems will one face if the resources used in reactive programming fail to release properly?

A-5). Resources used in reactive programming, such as database connections, file streams, or network connections, lead to resource leaks, performance degradation, and application instability if they are not released properly.

Proper Resource Management Strategies

  • The using() operator – It is helpful for scenarios where you want to acquire resources like database connections or file handles and ensure that they are released once the reactive flow finishes.
Mono.using(
        // Resource acquisition
        () -> new FileInputStream("example.txt"),

        // Resource usage (readFile is assumed to return the file content)
        inputStream -> Mono.fromCallable(() -> readFile(inputStream)),

        // Resource cleanup: close() throws a checked IOException, so wrap it
        inputStream -> {
            try {
                inputStream.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
)
.subscribe(
        result -> System.out.println("File read successfully: " + result),
        error -> System.err.println("Error: " + error)
);

The Mono.using() operator ensures that the FileInputStream is properly closed once the flow completes (either successfully or with an error).

  • Managing Database Connections – Spring's DatabaseClient acquires and releases R2DBC connections automatically for each statement, typically from a connection pool.
Mono.just("SELECT * FROM users")
    .flatMapMany(query -> {   // the query yields many rows, so flatMapMany (not flatMap) is needed
        return r2dbcEntityTemplate.getDatabaseClient()
            .sql(query)
            .map((row, metadata) -> row.get("name", String.class))
            .all()
            .doFinally(signalType -> {
                // Hook for additional cleanup or logging; the connection itself is released by DatabaseClient
                System.out.println("Query finished with signal: " + signalType);
            });
    })
    .subscribe();

doFinally() is a convenient place for any additional cleanup or logging of your own, because it runs whether the flow completes successfully, fails with an error, or is cancelled.

  • doOnTerminate() or doFinally() for Cleanup – doFinally() executes a block of code after the stream terminates for any reason: completion, error, or cancellation. doOnTerminate() executes a block of code just before the completion or error signal is propagated downstream (it does not run on cancellation), which is useful for logging or performing last-minute operations.
Mono.fromCallable(() -> {
        // Perform some work like database or file operation
        return "Result";
    })
    .doFinally(signalType -> {
        // Perform cleanup, like closing a database connection
        System.out.println("Resource cleanup");
    })
    .subscribe(result -> {
        System.out.println("Work complete: " + result);
    });

doFinally() ensures the cleanup happens regardless of how the flow terminates (normal completion, error, or cancellation).

  • Handling Subscription Cancellation – In some cases, the reactive flow might be canceled before completion. You must ensure that resources are cleaned up even if the subscription is canceled. doOnCancel() operator allows you to clean up resources when the subscription is canceled.
Mono.just("Start operation")
    .doOnCancel(() -> {
        // Perform resource cleanup on cancellation
        System.out.println("Resource cleanup on cancellation");
    })
    .subscribe();

doFinally() and doOnCancel() are particularly useful in cases where you want to ensure resources are cleaned up even if the subscriber cancels the subscription; doOnTerminate() does not run on cancellation.

  • Using Connection Pools – Always prefer connection pools (like HikariCP for JDBC or r2dbc-pool for R2DBC) for database connections; they handle the opening, reuse, and release of connections automatically.

If you’re using a reactive connection pool, it is important to:

  • Configure the pool size to match the expected load.
  • Set proper timeout and retry policies.
  • Ensure connections are released back to the pool after use.
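ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:pool:h2://localhost/test");

Flux.usingWhen(
        connectionFactory.create(),                      // Acquire a connection from the pool
        connection -> Flux.from(connection.createStatement("SELECT * FROM users").execute())
                .flatMap(result -> result.map((row, metadata) -> row.get("name", String.class))),
        Connection::close)                               // close() returns a Publisher; usingWhen subscribes to it
    .subscribe(System.out::println);

Calling connection.close() inside doFinally() would not work here, because close() only returns a Publisher and nothing happens until it is subscribed.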

This ensures that connections are returned to the pool once they’re no longer in use.
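
The acquire/use/release contract behind Mono.using() and usingWhen() mirrors try-with-resources in blocking Java. The sketch below uses only the JDK to show the guarantee in question: the resource is closed on both the success path and the error path. TrackedResource and closedAfter() are hypothetical names introduced for the illustration.

```java
class UsingSketch {

    /** Hypothetical resource that remembers whether it was closed. */
    static class TrackedResource implements AutoCloseable {
        boolean closed = false;

        String read(boolean fail) {
            if (fail) {
                throw new IllegalStateException("read failed");
            }
            return "content";
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Uses the resource and reports whether it ended up closed. */
    static boolean closedAfter(boolean fail) {
        TrackedResource resource = new TrackedResource();
        try (TrackedResource r = resource) {
            r.read(fail);
        } catch (IllegalStateException e) {
            // The error path: close() has already run by the time we get here.
        }
        return resource.closed;
    }

    public static void main(String[] args) {
        System.out.println("closed after success: " + closedAfter(false));
        System.out.println("closed after failure: " + closedAfter(true));
    }
}
```

The reactive operators add one more case that try-with-resources does not have: cancellation, which is why cleanup hooks in a pipeline should also cover a cancelled subscription.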

Q-6). What are the implications of mishandling shared mutable state due to misunderstanding reactive paradigms?

A-6). Mishandling shared mutable state in reactive programming is a common issue that can lead to race conditions, inconsistent state, and unpredictable behavior.

Shared Mutable State is a Problem in Reactive Programming

  1. Concurrency – In reactive systems, operations can be executed concurrently (e.g., multiple threads or tasks), and mutable state shared between them can lead to race conditions where different threads update or read the state at the same time, leading to inconsistent or unpredictable results.
  2. Thread-Safety – Reactive programming encourages the use of asynchronous, non-blocking operations. If mutable state is accessed from multiple threads without proper synchronization, it can cause issues like data corruption, crashes, or unexpected results.
  3. Backpressure and Flow Control – Reactive systems use backpressure mechanisms to manage the flow of data. Mishandling state can lead to dropped or delayed events, incorrect processing, or failure to respond to changes in the data stream.
  4. Reactive Streams’ Non-Blocking Nature – Reactive programming is based on non-blocking flows, meaning that state management must not block or slow down the event loop. Mishandling shared state can create bottlenecks, blocking threads, or inefficient resource usage.

Common Mistakes with Shared Mutable State in Reactive Programming

  • Unprotected Access to Shared State – Directly modifying shared state (like a Map or a List) from different threads without any synchronization mechanism.
AtomicInteger counter = new AtomicInteger(0);
Flux.just(1, 2, 3)
    .map(i -> {
        counter.addAndGet(i); // Directly modifying shared mutable state
        return counter.get();
    })
    .subscribe(System.out::println);

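In this example, each individual call on the AtomicInteger is atomic, but addAndGet() followed by a separate get() is not atomic as a pair. If several subscribers run this pipeline on different threads, get() can observe another subscriber's update, which leads to inconsistent results. Returning the result of addAndGet() directly closes that gap.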

  • State not being properly Isolated – Reactive flows need to be isolated to prevent shared mutable state from leaking between different flows or threads. Using global state or unprotected shared resources (like static variables or thread-local storage) can lead to unintended interactions.
public static int sharedState = 0;

Mono.just("data")
    .map(data -> {
        sharedState++; // Modifying shared state
        return sharedState;
    })
    .subscribe(System.out::println);

The static variable sharedState can be accessed and modified concurrently by multiple consumers, leading to unpredictable values.

  • Non-Thread-Safe Collections – When using collections like ArrayList, HashMap, or LinkedList that are not thread-safe, concurrent updates can result in data corruption.
List<String> sharedList = new ArrayList<>();

Flux.just("one", "two", "three")
    .doOnNext(sharedList::add)  // Modifying shared state without synchronization
    .subscribe();

Here, multiple subscribers can attempt to add elements to sharedList concurrently, leading to unpredictable behavior or ConcurrentModificationException.

  • Improper Use of subscribe and Side Effects – Since subscribe is the terminal operation in a reactive flow, side effects like modifying shared state can cause problems. If shared mutable state is being modified by each subscription, you may inadvertently have multiple uncoordinated writes to the state.
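
The lost-update problem behind these mistakes can be reproduced without Reactor. In the sketch below (AtomicCounterSketch, plain, and run() are hypothetical names), several threads increment both an unsynchronized int and an AtomicInteger. The atomic counter always reaches the expected total; the plain counter may fall short, and by how much varies from run to run.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterSketch {

    static int plain = 0; // unsynchronized shared state: increments can be lost

    /** Returns {plain total, atomic total} after threads * perThread increments. */
    static int[] run(int threads, int perThread) {
        plain = 0;
        AtomicInteger atomic = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    plain++;                  // read-modify-write, not atomic
                    atomic.incrementAndGet(); // atomic
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {plain, atomic.get()};
    }

    public static void main(String[] args) {
        int[] totals = run(4, 10_000);
        System.out.println("plain=" + totals[0] + " atomic=" + totals[1]);
    }
}
```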

Strategies for Properly Managing Shared Mutable State in Reactive Programming

  • Use Immutable Data Structures – Immutable objects are inherently thread-safe. By using immutable data structures (e.g., List.of(), Map.of(), or Collections.unmodifiableList()), you prevent multiple threads from modifying the same object concurrently.
List<String> originalList = List.of("apple", "banana");
Flux.just("cherry")
    .map(item -> {
        List<String> newList = new ArrayList<>(originalList);  // Create a new instance
        newList.add(item);
        return newList;
    })
    .subscribe(System.out::println);
  • Use Thread-Safe Data Structures – If you must use mutable state, ensure the data structures are thread-safe. Java’s ConcurrentMap, CopyOnWriteArrayList and other classes in java.util.concurrent are designed to handle concurrent updates without issues.
Map<String, Integer> threadSafeMap = new ConcurrentHashMap<>();

Flux.just("apple", "banana", "cherry")
    .doOnNext(item -> threadSafeMap.merge(item, 1, Integer::sum))  // Atomic read-modify-write; get-then-put would race
    .subscribe();
  • Isolate State to Individual Subscribers or Flows – Each subscriber or flow should work with its own isolated copy of data to avoid interference between concurrent operations. Avoid global state or mutable shared resources that multiple consumers can access.
Mono.just("data")
    .map(data -> {
        // Isolated, per-subscriber state
        int isolatedState = 0;
        return isolatedState + 1;
    })
    .subscribe(System.out::println);
  • Use Atomic Operations for Shared State – If you absolutely need to manage mutable shared state, use atomic operations (e.g., AtomicInteger, AtomicReference, etc.) to perform safe, thread-safe updates.
AtomicInteger sharedCounter = new AtomicInteger(0);

Flux.just(1, 2, 3)
    .doOnNext(i -> sharedCounter.addAndGet(i))  // Atomic update to shared state
    .subscribe();

AtomicInteger ensures that the increment operation is thread-safe and that no race condition will occur.

  • Avoid Shared Mutable State in Non-Blocking Contexts – The core benefit of reactive programming is that operations are non-blocking and asynchronous. If shared mutable state is involved, avoid blocking operations that can lead to thread contention and slow down the flow.
Mono.just("data")
    .map(data -> {
        // Avoid blocking or waiting for I/O within reactive flows
        // State management should not be a bottleneck
        return data.toUpperCase();
    })
    .subscribe(System.out::println);
  • Leverage Contexts or State Management Mechanisms – If you need to manage user-specific or session-specific state, consider using Reactor's Context (available in Spring WebFlux applications), which allows you to pass state along with the reactive stream, keeping it bound to the subscriber rather than to a thread.
Mono.deferContextual(ctx -> {
    String user = ctx.get("user");
    return Mono.just("User: " + user);
})
.contextWrite(Context.of("user", "JohnDoe"))
.subscribe(System.out::println);

Reactor's Context provides a way to maintain per-request state without relying on shared mutable state. It is immutable and tied to the subscription, so it stays isolated across reactive operators.
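
Using a thread-safe collection is not enough on its own: the update must also be a single atomic operation. A minimal JDK-only sketch, with MergeCountSketch and count() as hypothetical names, shows ConcurrentHashMap.merge() producing exact counts when several threads update the same keys.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MergeCountSketch {

    /** Every thread counts every item once, so each item ends at (occurrences * threads). */
    static Map<String, Integer> count(List<String> items, int threads) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            // merge() performs the read-modify-write atomically per key
            workers[t] = new Thread(() -> items.forEach(item -> counts.merge(item, 1, Integer::sum)));
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("apple", "banana", "apple"), 4));
    }
}
```

A get-then-put sequence on the same map would not give this guarantee, because another thread can update the key between the two calls.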

Q-7). What problems are faced while writing unit test cases for reactive streams and asynchronous behaviors?

A-7). Writing unit test cases for reactive streams and asynchronous behaviors can be challenging due to the nature of non-blocking, concurrent operations.

Common Problems in Unit Testing Reactive Streams
  • Asynchronous Execution – Use StepVerifier from Project Reactor to test reactive streams by subscribing to them and asserting their behavior step-by-step.
Flux<String> flux = Flux.just("A", "B", "C");

StepVerifier.create(flux)
    .expectNext("A")
    .expectNext("B")
    .expectNext("C")
    .verifyComplete();
  • Non-Deterministic Order – If the order of events is important, assert the order using StepVerifier. If not, use matchers or conditions that allow flexibility in the order of events (for example, expectNextMatches(), or collecting the items and comparing them as a set).
  • Handling Errors – Use StepVerifier to test for specific error scenarios and ensure that errors are emitted or handled as expected.
Flux<String> fluxWithError = Flux.just("A", "B").concatWith(Flux.error(new RuntimeException("Error")));

StepVerifier.create(fluxWithError)
    .expectNext("A")
    .expectNext("B")
    .expectError(RuntimeException.class)
    .verify();
  • Backpressure Testing – Use StepVerifier to test the backpressure behavior by requesting a specific number of items and verifying how the stream handles it.
Flux<String> flux = Flux.range(1, 10).map(Object::toString);

StepVerifier.create(flux, 0)   // Start with zero demand; create(flux) alone would request everything up front
    .thenRequest(5)
    .expectNextCount(5)
    .thenRequest(5)
    .expectNextCount(5)
    .verifyComplete();
  • Thread Context Switching – Use StepVerifier.withVirtualTime() from Reactor's test module (reactor-test), which installs a VirtualTimeScheduler, to control the passage of time and verify the order and timing of events.
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
    .expectSubscription()
    .thenAwait(Duration.ofSeconds(1))   // Advance the virtual clock by one second
    .expectNext(0L)
    .thenAwait(Duration.ofSeconds(1))
    .expectNext(1L)
    .thenAwait(Duration.ofSeconds(1))
    .expectNext(2L)
    .verifyComplete();
  • Verifying Cancellation – Use StepVerifier to simulate cancellation and verify that the flow reacts correctly.
Mono<String> mono = Mono.just("data").doOnCancel(() -> System.out.println("Cancelled"));

StepVerifier.create(mono)
    .thenCancel()
    .verify();
  • Complex Data Transformations – Break down complex transformations into smaller, testable units, and test each transformation separately before integrating them into the final flow.
  • Ensuring Resource Cleanup – Use StepVerifier's verifyThenAssertThat to check for resource cleanup or use doFinally to ensure resources are released.
Mono<String> mono = Mono.just("data").doFinally(signalType -> System.out.println("Cleanup"));

StepVerifier.create(mono)
    .expectNext("data")
    .verifyComplete();
  • Mocking Dependencies – Use tools like Mockito or Reactor’s test utilities to mock and return Mono or Flux in a controlled manner.
MyService service = Mockito.mock(MyService.class);
Mockito.when(service.getData()).thenReturn(Mono.just("mocked data"));

StepVerifier.create(service.getData())
    .expectNext("mocked data")
    .verifyComplete();
  • Handling Time-based operations – Use virtual time with StepVerifier to simulate the passage of time and verify time-based behaviors without real-time delays.
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofSeconds(10)))
    .thenAwait(Duration.ofSeconds(10))
    .expectNextCount(1)
    .verifyComplete();

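Q-8). What are the problems faced while adapting blocking APIs or legacy codebases to fit the non-blocking model?

A-8). Adapting blocking APIs or legacy codebases to fit the non-blocking model is challenging because a blocking call holds its thread until it returns, which works against a model built on a small number of non-blocking threads.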

Challenges in Adapting Blocking APIs to Non-Blocking Models
  • Synchronous nature of blocking APIs – Traditional database access using JDBC is blocking. The thread waits for the query to complete before proceeding.
  • Thread Contention and Scalability – If multiple threads are waiting for blocking I/O, the system can quickly run out of available threads, leading to bottlenecks.
  • Complexity in Error Handling and Timeouts – Timeouts in blocking code usually involve setting them directly on the API call, whereas reactive streams use operators like timeout() to handle such cases asynchronously.
  • Difficulty in converting code logic – A sequence of database calls in a transaction needs to be rewritten to handle each step reactively, maintaining the sequence without blocking.
Strategies for Adapting Blocking APIs and Legacy Codebases
  • Wrapping blocking calls in reactive types – Use Schedulers and Mono.fromCallable() or Flux.create() to wrap blocking calls and offload them to a dedicated thread pool, keeping the main reactive thread free.
Mono.fromCallable(() -> blockingDatabaseCall())
    .subscribeOn(Schedulers.boundedElastic())  // Offload to a dedicated thread pool
    .subscribe(result -> System.out.println("Result: " + result));
  • Use Schedulers.boundedElastic() for Blocking Workloads – Schedulers.boundedElastic() provides a thread pool suited to blocking I/O operations. It creates threads as needed but caps their number to avoid overwhelming the system.
Mono.fromCallable(() -> blockingApiCall())
    .subscribeOn(Schedulers.boundedElastic())
    .map(result -> "Processed: " + result)
    .subscribe(System.out::println);
  • Integrate Reactive Wrappers for Blocking Libraries – For common blocking libraries (e.g., JDBC, JPA), use reactive wrappers provided by the ecosystem, such as R2DBC (Reactive Relational Database Connectivity) for non-blocking database access.
// A reactive (R2DBC-backed) repository is already non-blocking, so no subscribeOn() is needed
repository.findAll()
    .subscribe(System.out::println);
  • Refactor Legacy Code to Non-Blocking Logic – Break down legacy, sequential logic into small, asynchronous tasks that can be composed using Mono and Flux operators like flatMap, concatMap, or zip.
Mono.just("Start")
    .flatMap(start -> asyncOperation1())
    .flatMap(result1 -> asyncOperation2(result1))
    .subscribe(System.out::println);
  • Leverage Asynchronous Libraries – Where possible, replace blocking libraries with their asynchronous or reactive counterparts (e.g., using WebClient instead of RestTemplate for HTTP calls).
WebClient client = WebClient.create("http://example.com");

client.get().uri("/data")
    .retrieve()
    .bodyToMono(String.class)
    .subscribe(response -> System.out.println("Response: " + response));
  • Handle Errors Reactively – Replace synchronous exception handling with reactive error operators like onErrorResume, onErrorReturn, or retry.
Mono.fromCallable(() -> possiblyFailingCall())
    .subscribeOn(Schedulers.boundedElastic())
    .onErrorResume(e -> Mono.just("Fallback Value"))
    .subscribe(System.out::println);
  • Implement Asynchronous Timeouts – Use reactive timeout operators to handle delays without blocking the thread, ensuring that operations complete within expected timeframes.
Mono.fromCallable(() -> longRunningCall())
    .subscribeOn(Schedulers.boundedElastic())
    .timeout(Duration.ofSeconds(5), Mono.just("Timeout fallback"))
    .subscribe(System.out::println);
  • Adopt Reactive Transaction Management – In database operations, use reactive transaction management (e.g., in R2DBC) to ensure transactions are handled correctly in a non-blocking manner.
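// Sketch using Spring's TransactionalOperator; connectionFactory is assumed to be configured elsewhere
ReactiveTransactionManager txManager = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(txManager);
DatabaseClient client = DatabaseClient.create(connectionFactory);

client.sql("INSERT INTO table ...")
    .fetch().rowsUpdated()
    .as(operator::transactional)   // Commit on completion, roll back on error
    .subscribe(System.out::println);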
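
The asynchronous-timeout strategy from the list above has a close JDK counterpart, which makes it easy to try without Reactor. In this sketch, CompletableFuture.completeOnTimeout() (Java 9+) plays the role of timeout(Duration, fallback); TimeoutSketch and callWithTimeout() are hypothetical names, and the sleep simulates a slow legacy call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    /** Runs a simulated slow call and falls back if it exceeds the timeout. */
    static String callWithTimeout(long workMillis, long timeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(workMillis); // stands in for a blocking legacy call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "Result";
                })
                .completeOnTimeout("Timeout fallback", timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(10, 2_000)); // finishes in time
        System.out.println(callWithTimeout(2_000, 50)); // exceeds the timeout
    }
}
```

Note that the fallback only completes the future early; the slow call itself keeps running in the background, so a timeout is not a substitute for cancelling or bounding the underlying work.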

Q-9). How to handle the backpressure effectively, especially in high throughput systems?

A-9). Handling backpressure effectively is crucial in high-throughput systems using reactive programming to maintain system stability and prevent resource exhaustion. Backpressure occurs when a downstream component cannot keep up with the rate at which upstream components emit items, leading to potential issues like buffer overflow, memory exhaustion, or out-of-memory errors.

Challenges in Handling Backpressure
  • Data Overload – A reactive system consuming data from a high-speed data source like Kafka or a fast API might face a backlog if the processing speed is slower than the data ingestion rate.
  • Limited Resource Capacity – A slow database or API can become a bottleneck, causing the entire stream to back up.
  • Unbounded Buffers – If a consumer process cannot keep up, it might accumulate messages in memory, eventually causing an out-of-memory error.
Strategies for Handling Backpressure
  • Apply Backpressure Operators – Use Project Reactor's built-in backpressure operators like onBackpressureBuffer(), onBackpressureDrop(), or onBackpressureLatest() to manage how the system handles excess data.
Flux.range(1, 1000000)
    .onBackpressureBuffer(1000) // Buffer up to 1000 items
    .subscribe(System.out::println);
  • Use limitRate() to control Data Flow – Use limitRate() to control the number of items requested from the upstream, ensuring that the consumer processes data at a manageable rate.
Flux.range(1, 1000000)
    .limitRate(100) // Process 100 items at a time
    .subscribe(System.out::println);
  • Throttle Data Emission – Use sampleFirst() or sample() (Reactor's counterparts to RxJava's throttleFirst() and throttleLast()) to control the rate of data emission, especially when dealing with bursty data streams.
Flux.interval(Duration.ofMillis(10))
    .sampleFirst(Duration.ofSeconds(1)) // Let one item through, then skip the rest for one second
    .subscribe(System.out::println);
  • Buffer Data with Bounded Buffers – Use bounded buffers with onBackpressureBuffer() to temporarily store excess items but limit the buffer size to avoid memory exhaustion.
Flux.create(emitter -> {
    for (int i = 0; i < 1000000; i++) {
        emitter.next(i);
    }
    emitter.complete();
})
.onBackpressureBuffer(1000) // Buffer size of 1000 items
.subscribe(System.out::println);
  • Drop Excess Data – Use onBackpressureDrop() or onBackpressureLatest() to drop excess data when the downstream cannot keep up, ensuring the system doesn’t run out of memory.
Flux.range(1, 1000000)
    .onBackpressureDrop() // Drop excess items
    .subscribe(System.out::println);
  • Slow down the Producer – Use delayElements() to artificially slow down the producer, giving the consumer more time to process each item.
Flux.range(1, 1000)
    .delayElements(Duration.ofMillis(10)) // Delay each element by 10 ms
    .subscribe(System.out::println);
  • Batch Processing – Use buffer() or window() to collect items into batches and process them together, which can reduce the processing overhead and increase throughput.
Flux.range(1, 100)
    .buffer(10) // Process in batches of 10 items
    .subscribe(batch -> System.out.println("Batch: " + batch));
  • Use publishOn() with Bounded Queues – Use publishOn() with a bounded queue size to manage how data is pushed through the stream, which helps control memory usage and prevents overwhelming downstream operators.
Flux.range(1, 1000000)
    .publishOn(Schedulers.boundedElastic(), 100) // Queue size of 100
    .subscribe(System.out::println);
  • Monitor and Tune System Metrics – Continuously monitor system metrics (like CPU usage, memory usage, and queue lengths) and adjust backpressure strategies as needed to maintain system stability. Use monitoring tools or custom metrics to track and adjust based on the system’s performance under load.
  • Reactive Pull Model – Ensure that the downstream explicitly requests data (the reactive pull model) to control the flow of data and avoid overwhelming the system. In tests, StepVerifier.create(flux, 0) followed by thenRequest(n) simulates a slow consumer and verifies backpressure behavior.
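
The pull model in the last bullet can be sketched without any library. The example below is a minimal, single-threaded illustration that uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Reactor implements. The names PullModelSketch, RangePublisher, and BatchingSubscriber are hypothetical; in Reactor the same idea is usually expressed by extending BaseSubscriber and calling request(n) from hookOnSubscribe() and hookOnNext().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical names throughout; a single-threaded sketch, not a spec-complete publisher.
class PullModelSketch {

    /** Emits start..start+count-1, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand = 0;
                private boolean draining = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (draining) return; // re-entrant call from onNext: the loop below sees the new demand
                    draining = true;
                    while (!done && demand > 0 && next < start + count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests a fixed batch, and asks for the next batch only after the current one is consumed. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            log.add("request(" + batchSize + ")");
            subscription.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            log.add("onNext(" + item + ")");
            if (++receivedInBatch == batchSize) {
                receivedInBatch = 0;
                log.add("request(" + batchSize + ")");
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable error) {
            log.add("onError");
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    static List<String> run(int count, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(1, count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

The log shows that the publisher never emits ahead of demand: each request(2) is followed by at most two onNext signals.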

Q-10). What are the differences between cold streams and hot streams, and why does misunderstanding them lead to repeated or missed data emissions?

A-10). Misunderstanding the differences between cold and hot streams in reactive programming can lead to issues like repeated data emissions or missed data, affecting the behavior and performance of your application.

The differences between Cold Streams and Hot Streams are as follows:

Concept | Cold Stream | Hot Stream
Definition | Cold streams are lazy and only begin emitting data when a subscriber subscribes. Each subscriber receives a fresh, independent sequence of data from the beginning. | Hot streams start emitting data immediately, whether or not there are subscribers. Subscribers receive data from the point they subscribe onwards.
Behavior | Every subscription triggers the stream from the start, so each subscriber sees the same data from the beginning. | New subscribers may miss some data that was emitted before they subscribed, as the stream continues to emit data regardless of subscriptions.

Example (cold stream):
Flux<Integer> coldFlux = Flux.range(1, 10);
coldFlux.subscribe(data -> System.out.println("Subscriber 1: " + data));
coldFlux.subscribe(data -> System.out.println("Subscriber 2: " + data));

Example (hot stream):
ConnectableFlux<Long> hotFlux = Flux.interval(Duration.ofMillis(10)).take(10).publish();
hotFlux.connect(); // Connects to the source; emission no longer waits for later subscribers
hotFlux.subscribe(data -> System.out.println("Subscriber 1: " + data));
Thread.sleep(50); // Delay to simulate late subscription
hotFlux.subscribe(data -> System.out.println("Subscriber 2: " + data)); // Misses the values emitted so far
Common Misunderstandings and Their Effects
  • Expecting Cold Behavior from Hot Stream – If a real-time stock price feed is hot, late subscribers will not see the prices from before they subscribed, potentially missing important updates.
  • Expecting Hot Behavior from a Cold Stream – Reading a file with a cold stream will re-read the file for every new subscriber, wasting resources if multiple subscribers are supposed to share the same data.
  • Missed Data Due to Late Subscriptions in Hot Streams – A monitoring system subscribing late to a sensor stream might miss critical early readings.
  • Repeated Processing in Cold Streams – Fetching data from a database every time a subscriber connects, rather than caching results for reuse.
Strategies to Handle Cold and Hot Streams Properly
  • Identify the Stream Type – Determine whether your source of data is naturally cold (e.g., a database query) or hot (e.g., a live sensor). Use cold streams for finite, repeatable data sets, and hot streams for live, continuous data.
  • Use ConnectableFlux for Hot Streams – Convert cold streams to hot using publish() and connect(), making the stream hot and ensuring it starts emitting data immediately.
ConnectableFlux<Integer> hotStream = Flux.range(1, 10).publish();
hotStream.connect();
  • Cache Data for Late Subscribers – Use operators like replay() or cache() to cache emitted data for hot streams so late subscribers can see recent emissions.
Flux<Integer> cachedStream = Flux.range(1, 10).cache();
cachedStream.subscribe(System.out::println);
  • Batch Emissions from Fast Hot Streams – buffer() and window() do not replay past data to late subscribers; they group the emissions of a hot stream into chunks so that subscribers can process them in batches.
Flux.interval(Duration.ofMillis(100))
    .buffer(Duration.ofSeconds(1))
    .subscribe(System.out::println);
  • Ensure Correct Subscription Timing – For hot streams, ensure all necessary subscribers are connected before the stream starts emitting data to avoid missing emissions.
ConnectableFlux<Integer> hotFlux = Flux.range(1, 10).publish();
hotFlux.subscribe(data -> System.out.println("Subscriber 1: " + data));
hotFlux.subscribe(data -> System.out.println("Subscriber 2: " + data));
hotFlux.connect();
  • Use switchOnNext() for Dynamic Streams – Use switchOnNext() to handle a stream of streams: it mirrors the most recently emitted inner stream and cancels the previous one.
Flux.switchOnNext(Flux.just(Flux.interval(Duration.ofMillis(100)), Flux.interval(Duration.ofMillis(200))))
    .subscribe(System.out::println);

Q-11). What are the reasons for memory leaks?

A-11). Subscribing without disposing or failing to cancel subscriptions in reactive programming can lead to memory leaks or resource exhaustion, as resources like memory, threads, or connections remain in use even after they are no longer needed.

Common Scenarios of Memory Leaks
  • Infinite Streams Without Disposal – Subscribing to an infinite stream (like Flux.interval()) without disposing of it keeps the subscription active, using memory and CPU resources indefinitely.
Flux.interval(Duration.ofSeconds(1))
    .subscribe(System.out::println); // No disposal, runs forever
  • Not Cancelling Manual Subscriptions – When using manual subscribe() methods without a mechanism to cancel them, the subscription stays active.
Disposable disposable = Flux.range(1, 100)
    .subscribe(System.out::println);
// If not disposed later, it can lead to memory leaks
  • Using Schedulers Without Cleanup – Using custom or bounded schedulers without proper disposal can result in threads or pools not being released.
Scheduler scheduler = Schedulers.newBoundedElastic(10, Integer.MAX_VALUE, "my-scheduler");
Flux.range(1, 100)
    .subscribeOn(scheduler)
    .subscribe(System.out::println);
// The scheduler should be disposed of when no longer needed
  • Event Listeners in UI Applications – In UI applications, reactive subscriptions tied to event listeners must be disposed of when the component is destroyed, or they continue to consume resources.
Disposable clickSubscription = Flux.create(emitter -> button.addActionListener(e -> emitter.next(e)))
    .subscribe(event -> System.out.println("Button clicked!"));
// Without disposing of clickSubscription, it can lead to memory leaks
Strategies to Avoid Memory Leaks
  • Use Disposable and dispose() – Capture the subscription in a Disposable and explicitly call dispose() when the stream is no longer needed.
Disposable disposable = Flux.interval(Duration.ofSeconds(1))
    .subscribe(System.out::println);
// Later, when the stream is no longer needed
disposable.dispose();
  • Use take() or takeUntil() to Limit the Stream – Use operators like take() or takeUntil() to limit the lifespan of the subscription.
Flux.interval(Duration.ofSeconds(1))
    .take(10) // Automatically complete after 10 emissions
    .subscribe(System.out::println);
  • Combine with Lifecycle Hooks – In frameworks like Spring or UI frameworks, tie the subscription to lifecycle hooks that dispose of it when the component or bean is destroyed.
@PreDestroy
public void cleanup() {
    disposable.dispose(); // Dispose of the subscription during bean destruction
}
  • Use Schedulers Properly – Ensure schedulers are properly shut down or disposed of when no longer needed, especially custom ones.
Scheduler scheduler = Schedulers.newBoundedElastic(10, Integer.MAX_VALUE, "my-scheduler");
// After usage
scheduler.dispose();
  • Use a Composite Disposable for Multiple Subscriptions – When managing multiple subscriptions, group them in a Disposable.Composite, created with Disposables.composite() (the Reactor counterpart of RxJava's CompositeDisposable), and dispose of them all at once.
Disposable.Composite composite = Disposables.composite();
composite.add(Flux.range(1, 100).subscribe(System.out::println));
composite.add(Flux.interval(Duration.ofSeconds(1)).subscribe(System.out::println));
// Dispose of all when no longer needed
composite.dispose();
  • Auto-Cancel with AutoDispose (for RxJava) – Use AutoDispose or similar library features to dispose of subscriptions automatically based on lifecycle events.
someObservable
    .observeOn(AndroidSchedulers.mainThread())
    .autoDispose(lifecycleOwner)
    .subscribe(System.out::println);
  • Reactive Contexts for Lifecycle Management – Use operators like using() or doFinally() to ensure cleanup logic is executed when a stream completes, fails, or is cancelled. doOnTerminate() covers completion and errors but not cancellation.
Flux.using(
    () -> Files.lines(Path.of("file.txt")), // Open the resource on subscription
    lines -> Flux.fromStream(lines),
    Stream::close // Ensure the resource is closed; BufferedReader::close would not compile here because it throws a checked IOException
).subscribe(System.out::println);

Q-12). How to maintain consistent performance and reliability under extreme load conditions?

A-12). Maintaining consistent performance and reliability under extreme load conditions in reactive programming involves several challenges and strategies to ensure the system can handle high throughput without degrading performance or reliability.

Challenges
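  • Backpressure Handling – Under extreme load, producers can emit data faster than consumers process it. The application may crash, slow down, or lose data if it cannot manage this pressure effectively.
  • Resource Contention – Many concurrent requests compete for shared resources such as CPU, memory, and connections. This can lead to slow response times, timeouts, or resource starvation for critical tasks.
  • Thread Management – Blocking calls can exhaust a limited thread pool. The system can then freeze or slow down dramatically because no free threads remain to handle new tasks.
  • Latency Spikes – Queuing and contention under load cause unpredictable delays for users, leading to a poor user experience.
  • Circuit Breakers and Failures – Without circuit breakers, a failing dependency keeps receiving calls. This can cause widespread failures across the system, reducing overall availability and reliability.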
Strategies for Maintaining Performance and Reliability
  • Effective Backpressure Management – Use reactive streams operators like onBackpressureBuffer(), onBackpressureDrop(), or onBackpressureLatest() to handle backpressure gracefully.
Flux.interval(Duration.ofMillis(10))
    .onBackpressureBuffer(1000) // Hold at most 1000 pending items; overflow signals an error
    .subscribe(System.out::println);
  • Scaling and Load Balancing
    • Use horizontal scaling by adding more instances to handle increased load.
    • Employ load balancers to distribute traffic evenly across instances.
    • Example – In a Kubernetes setup, use a load balancer like Nginx or HAProxy to manage traffic distribution.
  • Resource Pooling – Limit the number of concurrent connections or threads using resource pools (e.g., database connection pools, thread pools).
ConnectionPool connectionPool = new ConnectionPool(config);
Mono.usingWhen(connectionPool.create(), // Borrow a connection from the pool
               connection -> Mono.from(connection.createStatement("SELECT * FROM table").execute()),
               Connection::close) // Closing a pooled connection returns it to the pool
    .subscribe();
  • Thread Pool Management with Schedulers – Use appropriate schedulers (e.g., Schedulers.boundedElastic() for blocking operations, Schedulers.parallel() for CPU-bound tasks).
Flux.range(1, 10)
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(System.out::println);
  • Latency Monitoring and Optimization
    • Continuously monitor system latencies using tools like Prometheus or Grafana.
    • Optimize performance by profiling and tuning critical paths (e.g., reducing unnecessary I/O, optimizing queries).
  • Circuit Breakers and Timeouts – Use a library such as Resilience4j (Hystrix is in maintenance mode) to implement circuit breakers, retries, and fallbacks, and put a timeout on every remote call.
Mono<String> result = webClient.get()
    .uri("/endpoint")
    .retrieve()
    .bodyToMono(String.class)
    .timeout(Duration.ofSeconds(5))
    .onErrorResume(TimeoutException.class, ex -> Mono.just("Fallback response"));
  • Caching – Use caching (e.g., Redis, Memcached) to reduce load on backend systems by caching frequently accessed data.
Mono.just("key")
    .flatMap(key -> reactiveRedisTemplate.opsForValue().get(key))
    .switchIfEmpty(Mono.just("fallbackValue"));
  • Asynchronous I/O
    • Prefer non-blocking and asynchronous I/O operations to avoid blocking the limited thread pools in a reactive system.
    • Example – Use R2DBC for non-blocking database interactions instead of traditional JDBC.
  • Graceful Degradation – Implement graceful degradation strategies like serving cached or default responses when under extreme load.
Mono.just("request")
    .flatMap(request -> processRequest(request)
        .onErrorResume(ex -> Mono.just("default response")));
  • Testing Under Load
    • Regularly perform load testing using tools like JMeter, Gatling, or Locust to simulate extreme conditions and identify potential bottlenecks.
    • Example – Use Gatling scripts to simulate a high number of concurrent users accessing the system.

Q-13). How to create and manage intricate data pipelines involving multiple asynchronous and dependent operations?

A-13). Creating and managing intricate data pipelines involving multiple asynchronous and dependent operations in reactive programming can be complex due to the need for proper orchestration, error handling, and synchronization of asynchronous flows. These pipelines often involve chaining, merging, or splitting data streams while maintaining high throughput and responsiveness.

Challenges
  • Complex Chaining – Long or deeply nested operator chains can lead to spaghetti code, making the pipeline difficult to debug and extend.
  • Error Propagation and Handling – An error terminates the stream unless it is handled. Poor error handling can lead to unintended behaviors, lost data, or system crashes.
  • Coordination of Asynchronous Tasks – Dependent tasks that complete at different times must be synchronized. Getting this wrong can lead to race conditions, deadlocks, or inconsistent state.
  • Backpressure Propagation – Demand has to flow through every stage of the pipeline. Failure to handle backpressure properly can result in memory leaks or system overloads.
  • Data Transformation and Mapping – Inefficient transformations can degrade performance, and poorly structured code can lead to maintenance issues.
  • Concurrency Control – Mismanagement of schedulers and parallelism can lead to thread starvation or overhead from excessive context switching.
Strategies for Managing Intricate Data Pipelines
  • Using flatMap() and concatMap() – flatMap() runs dependent tasks asynchronously and concurrently, so results may arrive interleaved. concatMap() executes tasks one at a time, maintaining the order of emissions.
Flux.just("task1", "task2", "task3")
    .flatMap(task -> processTask(task))
    .subscribe(result -> System.out.println("Processed: " + result));
  • Combining Streams with zip(), merge(), and combineLatest() – Use zip() to combine multiple streams by waiting for all streams to emit. Use merge() to interleave emissions from multiple streams. Use combineLatest() to combine the latest emissions from multiple streams.
Flux.zip(Flux.just(1, 2, 3), Flux.just("A", "B", "C"))
    .map(tuple -> tuple.getT1() + "-" + tuple.getT2())
    .subscribe(System.out::println); // Output: 1-A, 2-B, 3-C
  • Error Handling with onErrorResume() and onErrorReturn() – Use onErrorResume() to switch to an alternative stream on error. Use onErrorReturn() to return a fallback value when an error occurs.
Flux.just(1, 2, 3, 0)
    .map(i -> 10 / i)
    .onErrorResume(ex -> Flux.just(-1))
    .subscribe(System.out::println); // Output: 10, 5, 3, -1
  • Using groupBy() and window() for Batching – Use groupBy() to group elements by a key and process each group separately. Use window() to break a stream into smaller, manageable chunks (windows).
Flux.range(1, 10)
    .window(3)
    .flatMap(window -> window.collectList())
    .subscribe(System.out::println); // Output: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
  • Using Schedulers for Parallelism – Use Schedulers.parallel() for CPU-intensive operations. Use Schedulers.boundedElastic() for I/O-bound operations.
Flux.range(1, 10)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(i -> i * 2)
    .sequential()
    .subscribe(System.out::println);
  • Reactive Context and Thread Safety – Use reactive context to pass and share state safely across different parts of the pipeline without shared mutable state.
Mono.deferContextual(ctx -> Mono.just(ctx.get("key")))
    .contextWrite(Context.of("key", "value"))
    .subscribe(System.out::println); // Output: value
  • Testing Pipelines – Use StepVerifier for testing reactive streams by asserting expected behavior and outcomes.
StepVerifier.create(Flux.range(1, 3).map(i -> i * 2))
    .expectNext(2, 4, 6)
    .verifyComplete();

Q-14). How to manage distributed systems with multiple reactive nodes for better consistency and availability?

A-14). Managing distributed systems with multiple reactive nodes, especially in a reactive programming context, involves ensuring consistency and availability across different nodes, which can be challenging due to the inherent asynchrony and potential network partitions. These challenges are often addressed by strategies derived from distributed systems principles like the CAP theorem and eventual consistency models.

Challenges
  • Data Consistency – Replicas on different nodes can diverge. Inconsistencies can lead to data corruption, stale reads, or conflicting updates.
  • Network Partitions and Latency – Nodes can become unreachable or slow to respond. This can result in data loss or inconsistent system states.
  • Coordination Overhead – Keeping nodes in agreement requires extra communication, which can reduce overall system performance and responsiveness.
  • Event Ordering – Out-of-order processing can lead to incorrect system states or duplicate events.
  • Fault Tolerance – Without proper fault tolerance, the system may suffer from downtime or data inconsistencies.
  • Scalability – Poor scalability can result in bottlenecks and degraded performance under load.
Strategies for Managing Consistency and Availability
  • Eventual Consistency and CRDTs – Embrace eventual consistency where updates propagate asynchronously, and conflicts are resolved using Conflict-free Replicated Data Types (CRDTs).
// Example pseudo-code for CRDT-based counter
crdtCounter.increment();
crdtCounter.merge(otherNodeCrdtCounter);
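
To make the pseudo-code above concrete, here is a minimal sketch of a grow-only counter (G-Counter), one of the simplest CRDTs. The class name GCounterSketch and the node ids are hypothetical. Each node increments only its own slot, and merge() takes the per-node maximum, so merges can be applied in any order and any number of times without changing the result.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a grow-only counter (G-Counter); not thread-safe.
class GCounterSketch {
    private final String nodeId;
    private final Map<String, Long> counts = new HashMap<>();

    GCounterSketch(String nodeId) {
        this.nodeId = nodeId;
    }

    /** A node only ever increments its own slot. */
    void increment() {
        counts.merge(nodeId, 1L, Long::sum);
    }

    /** Merging keeps the per-node maximum, which makes it commutative, associative, and idempotent. */
    void merge(GCounterSketch other) {
        other.counts.forEach((node, count) -> counts.merge(node, count, Long::max));
    }

    /** The counter value is the sum over all node slots. */
    long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        GCounterSketch a = new GCounterSketch("node-a");
        GCounterSketch b = new GCounterSketch("node-b");
        a.increment();
        a.increment();
        b.increment();
        a.merge(b);
        b.merge(a);
        System.out.println(a.value() + " " + b.value()); // both replicas converge to the same value
    }
}
```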
  • Distributed Messaging and Event Sourcing – Use distributed messaging systems (e.g., Kafka, RabbitMQ) to ensure reliable communication between nodes and event sourcing to replay and rebuild state from event logs.
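// kafkaProducer is assumed to be a reactive wrapper whose send() returns a Mono
Flux.just(event)
    .flatMap(e -> kafkaProducer.send("topic", e)) // A lambda parameter must not reuse the name of the local variable "event"
    .subscribe();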
  • Quorum-Based Consensus (e.g., Raft, Paxos) – Use quorum-based protocols to ensure a majority of nodes agree on updates before they are committed, balancing consistency and availability.
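
The majority rule behind quorum-based protocols is plain arithmetic, sketched below. This is not an implementation of Raft or Paxos; QuorumSketch and its method names are hypothetical and only show when an update counts as committed and how many node failures a cluster of a given size can tolerate.

```java
// Hypothetical helper illustrating the majority rule; not a consensus implementation.
class QuorumSketch {

    /** Smallest number of nodes that forms a majority of the cluster. */
    static int majority(int clusterSize) {
        if (clusterSize <= 0) {
            throw new IllegalArgumentException("clusterSize must be positive");
        }
        return clusterSize / 2 + 1;
    }

    /** An update is committed once a majority of nodes has acknowledged it. */
    static boolean isCommitted(int acknowledgements, int clusterSize) {
        return acknowledgements >= majority(clusterSize);
    }

    /** Number of nodes that may fail while a majority can still be formed. */
    static int tolerableFailures(int clusterSize) {
        return clusterSize - majority(clusterSize);
    }

    public static void main(String[] args) {
        for (int size : new int[] {3, 4, 5}) {
            System.out.println(size + " nodes: majority=" + majority(size)
                    + ", tolerable failures=" + tolerableFailures(size));
        }
    }
}
```

A four-node cluster tolerates no more failures than a three-node one, which is why odd cluster sizes are the usual choice.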
  • Partitioning and Sharding – Partition data across nodes to distribute load and use consistent hashing to determine the responsible node for each partition.
int partition = computePartition(key);
return nodeMap.get(partition).get(key);
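
The computePartition() call above leaves the hashing scheme open. One possible shape is a consistent hash ring, sketched here with a TreeMap. The class name, the use of CRC32 as the hash function, and the virtual-node naming scheme are assumptions made for illustration; production systems typically use a stronger hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch of a consistent hash ring; not thread-safe.
class ConsistentHashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRingSketch(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    /** Places several virtual points per node on the ring to spread keys more evenly. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    /** Removes every ring point that belongs to the node. */
    void removeNode(String node) {
        ring.values().removeIf(node::equals);
    }

    /** A key belongs to the first ring point at or after its hash, wrapping around at the end. */
    String nodeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes in the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    private static long hash(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    public static void main(String[] args) {
        ConsistentHashRingSketch ring = new ConsistentHashRingSketch(50);
        ring.addNode("node-a");
        ring.addNode("node-b");
        ring.addNode("node-c");
        System.out.println("order-42 is owned by " + ring.nodeFor("order-42"));
    }
}
```

The property that matters is that removing a node only moves the keys that node owned; every other key keeps its owner.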
  • Leader Election and Coordination – Implement leader election using tools like Zookeeper or Consul to manage coordination and avoid conflicts.
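// ZookeeperLeaderElection is an illustrative wrapper, not a ZooKeeper API;
// Apache Curator's LeaderLatch recipe is one real implementation.
ZookeeperLeaderElection election = new ZookeeperLeaderElection("path/to/election");
if (election.isLeader()) {
    // Perform leader duties
}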
  • Circuit Breakers and Bulkheads – Use circuit breakers to prevent cascading failures by temporarily cutting off failing services and bulkheads to isolate failures.
// Assumes Spring Cloud Circuit Breaker's ReactiveCircuitBreaker and a service.call() that returns a Mono
circuitBreaker.run(service.call(), e -> fallback());
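
What a circuit breaker does internally can be sketched as a small state machine. The class below is a simplified, blocking illustration with hypothetical names. It is not how Resilience4j or Spring Cloud Circuit Breaker are implemented, and the injected clock exists only to make the timing testable.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical, simplified circuit breaker; real libraries add sliding windows, metrics, and async support.
class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAt = 0;

    CircuitBreakerSketch(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Runs the action unless the breaker is open; failures and open-state calls go to the fallback. */
    synchronized <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < openMillis) {
                return fallback.get(); // still open: fail fast without calling the service
            }
            state = State.HALF_OPEN; // wait time elapsed: allow one trial call
        }
        try {
            T result = action.get();
            failures = 0;
            state = State.CLOSED;
            return result;
        } catch (RuntimeException e) {
            failures++;
            if (state == State.HALF_OPEN || failures >= failureThreshold) {
                state = State.OPEN;
                openedAt = clock.getAsLong();
            }
            return fallback.get();
        }
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, 1000, System::currentTimeMillis);
        for (int i = 0; i < 3; i++) {
            String result = breaker.<String>call(
                    () -> { throw new IllegalStateException("service down"); },
                    () -> "fallback");
            System.out.println(result + " (state: " + breaker.state() + ")");
        }
    }
}
```

Holding the lock while the action runs keeps the sketch short but serializes all calls; a real breaker would avoid that.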
  • Consistency Guarantees with Databases – Use databases that offer tunable consistency (e.g., Cassandra, DynamoDB) to balance consistency and availability based on requirements.
  • Monitoring and Observability – Implement distributed tracing and metrics collection using tools like Prometheus, Jaeger, or Zipkin to monitor the health and performance of nodes.
meterRegistry.counter("node.requests").increment();
Tracing tracing = Tracing.newBuilder().build();
  • Reactive Streams and Backpressure – Use reactive streams with proper backpressure support to handle bursts of traffic without overwhelming the system.
Flux.create(emitter -> loadData(emitter))
    .onBackpressureBuffer(1000)
    .subscribe(data -> process(data));
  • Replication and Failover – Implement data replication and automatic failover to ensure high availability even if some nodes fail.
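
As a rough illustration of client-side failover, the sketch below tries a list of replicas in order and returns the first successful read. FailoverSketch and readWithFailover() are hypothetical names; real systems usually add health checks, timeouts, and retry limits on top of this.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of ordered failover across replicas.
class FailoverSketch {

    /** Returns the first successful read; fails only if every replica fails. */
    static <T> T readWithFailover(List<Supplier<T>> replicas) {
        RuntimeException lastFailure = null;
        for (Supplier<T> replica : replicas) {
            try {
                return replica.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and move on to the next replica
            }
        }
        throw new IllegalStateException("all replicas failed", lastFailure);
    }

    public static void main(String[] args) {
        List<Supplier<String>> replicas = List.of(
                () -> { throw new IllegalStateException("primary down"); },
                () -> "value from replica 1");
        System.out.println(readWithFailover(replicas));
    }
}
```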

Q-15). What are the different types of issues faced while implementing R2DBC transaction management?

A-15). Overcoming limitations of reactive drivers, such as connection pooling issues or transaction management in R2DBC (Reactive Relational Database Connectivity), is essential for building robust and efficient reactive applications that interact with relational databases.

Common Challenges and Strategies
  • Connection Pooling Issues – Use a Reactive Connection Pool: Utilize a reactive-aware connection pool such as r2dbc-pool. JDBC pools such as HikariCP are blocking and do not work with R2DBC connections.
ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:pool:mysql://localhost:3306/mydb");

Configure Connection Limits: Configure the maximum pool size and other parameters to balance performance and resource usage.

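// Example configuration for r2dbc-pool
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
    .maxSize(20)
    .initialSize(5)
    .build();
ConnectionPool pool = new ConnectionPool(configuration);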
  • Transaction Management – Use R2DBC’s programmatic transaction support to manage transactions explicitly within your reactive flows. Use transactional operators provided by R2DBC or higher-level frameworks like Spring Data R2DBC. Leverage Spring’s @Transactional annotation with reactive transaction managers for easier management.

Example – Programmatic Transaction Management

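// R2DBC returns plain Publishers, so they are wrapped with Mono.from() before chaining
Mono.usingWhen(
        connectionFactory.create(),
        connection -> Mono.from(connection.beginTransaction())
            .thenMany(connection.createStatement("INSERT INTO ...").execute())
            .flatMap(result -> result.getRowsUpdated())
            .then(Mono.from(connection.commitTransaction()))
            // Roll back, then re-signal the original error
            .onErrorResume(ex -> Mono.from(connection.rollbackTransaction()).then(Mono.error(ex))),
        Connection::close) // Always release the connection
    .subscribe();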

Example – Transactional Operators

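// Assumes Spring's TransactionalOperator and a ReactiveTransactionManager bean named transactionManager
TransactionalOperator operator = TransactionalOperator.create(transactionManager);
databaseClient.sql("INSERT INTO ...")
    .fetch().rowsUpdated()
    .as(operator::transactional) // Commits on completion, rolls back on error
    .subscribe();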

Example – Spring Transaction Management

@Transactional
public Mono<Void> transactionalOperation() {
    return repository.save(newEntity).then(); // then() discards the saved entity to match Mono<Void>
}
  • Error Handling in Reactive Flows – Use operators like onErrorResume(), onErrorContinue(), or doOnError() to manage errors at strategic points in the reactive flow. Implement retries for transient failures and circuit breakers to prevent cascading failures.

Example – Centralized Error Handling

repository.save(entity)
    .doOnError(e -> log.error("Error saving entity", e))
    .onErrorResume(e -> Mono.empty())
    .subscribe();

Example – Retries and Circuit Breakers

repository.save(entity)
    .retry(3)
    .doOnError(e -> log.error("Retries failed", e))
    .subscribe();
  • Blocking Operations in Reactive Streams – Offload blocking operations to bounded elastic schedulers or separate thread pools. Where possible, convert blocking APIs to their reactive counterparts to fit the non-blocking model.

Example – Offload Blocking Operations

Mono.fromCallable(() -> blockingOperation())
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();

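Example – Wrap a Blocking Call Lazily

// defer() only postpones the call until subscription; the call still blocks the subscribing thread,
// so it has to be offloaded as well. A true conversion replaces the blocking API with a non-blocking one.
Mono.defer(() -> Mono.just(blockingOperation()))
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();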
  • Backpressure Handling – Use operators like onBackpressureBuffer(), onBackpressureDrop(), or onBackpressureLatest() to handle backpressure scenarios. Use pagination or chunking strategies to limit the size of data retrieved or processed at once.

Example – Control Backpressure with Operators

repository.findAll()
    .onBackpressureBuffer(100)
    .subscribe();

Example – Limit Query Size

// findWithPagination is assumed to be a custom repository method that accepts a Pageable
repository.findWithPagination(PageRequest.of(0, 100))
    .subscribe();
  • Monitoring and Debugging – Use tools like Reactor Debug Agent to help trace reactive pipelines and identify issues. Integrate reactive systems with logging frameworks (e.g., SLF4J) and monitoring tools (e.g., Micrometer) to track performance and identify bottlenecks.

Example – Reactive Debugging Tools

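Hooks.onOperatorDebug(); // Captures an assembly stack trace for every operator; costly, so prefer ReactorDebugAgent.init() in production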

Example – Metrics and Logs

meterRegistry.counter("db.connections.active").increment();

Q-16). How to manage distributed transactions across reactive services without using traditional JTA-based solutions?

A-16). Managing distributed transactions across reactive services without using traditional JTA (Java Transaction API)-based solutions can be challenging due to the lack of inherent support for distributed two-phase commit (2PC) in reactive paradigms. However, there are several patterns and strategies that can be used to achieve consistency across distributed services in a reactive, non-blocking manner.

Strategies for Managing Distributed Transactions in Reactive Systems
  • Saga Pattern – The Saga pattern decomposes a distributed transaction into a series of local transactions. Each local transaction has a corresponding compensating transaction to undo the work in case of a failure.
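public Mono<Void> processOrderSaga(Order order) {
    return orderService.createOrder(order)
        // Payment failed: undo the order, then re-signal the error to the caller
        .then(paymentService.processPayment(order.getPaymentDetails())
            .onErrorResume(ex -> orderService.cancelOrder(order).then(Mono.error(ex))))
        // Shipping failed: undo the completed steps in reverse order
        .then(shippingService.scheduleShipping(order)
            .onErrorResume(ex -> compensatePaymentAndOrder(order).then(Mono.error(ex))));
}

// Compensates only the steps that have already completed
private Mono<Void> compensatePaymentAndOrder(Order order) {
    return paymentService.refundPayment(order.getPaymentDetails())
        .then(orderService.cancelOrder(order));
}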
  • Event Sourcing and CQRS – Event Sourcing captures all changes to the state as a sequence of events. The CQRS (Command Query Responsibility Segregation) pattern separates the read and write models.
public Mono<Void> handleOrderEvent(OrderEvent event) {
    return eventStore.appendEvent(event)
        .flatMap(e -> applyEventToState(e))
        .onErrorResume(ex -> rollbackEvent(event));
}

private Mono<Void> rollbackEvent(OrderEvent event) {
    return eventStore.appendEvent(event.getCompensatingEvent());
}
  • TCC (Try-Confirm/Cancel) – The TCC (Try-Confirm/Cancel) pattern involves three phases: Try (prepare the resource), Confirm (commit the transaction), and Cancel (rollback if needed).
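public Mono<Void> executeTccTransaction(TccTransaction transaction) {
    return tryPhase(transaction)
        // tryPhase() returns an empty Mono<Void>, so flatMap() would never run; then() chains on completion
        .then(Mono.defer(() -> confirmPhase(transaction)))
        .onErrorResume(ex -> cancelPhase(transaction));
}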

private Mono<Void> tryPhase(TccTransaction transaction) {
    return Flux.fromIterable(transaction.getParticipants())
        .flatMap(participant -> participant.tryOperation())
        .then();
}

private Mono<Void> confirmPhase(TccTransaction transaction) {
    return Flux.fromIterable(transaction.getParticipants())
        .flatMap(participant -> participant.confirmOperation())
        .then();
}

private Mono<Void> cancelPhase(TccTransaction transaction) {
    return Flux.fromIterable(transaction.getParticipants())
        .flatMap(participant -> participant.cancelOperation())
        .then();
}
  • Compensating Transactions – Instead of rolling back transactions, compensating transactions are used to undo the effects of the operations that have already been committed in case of failure.
public Mono<Void> handleFailure(Order order) {
    return paymentService.refundPayment(order.getPaymentDetails())
        .then(shippingService.cancelShipping(order))
        .then(orderService.cancelOrder(order));
}