Core Java – Flow.Processor
Reactive streams address a fundamental problem of modern systems: producers (sensors, services, user events) deliver data at an unpredictable rate, while consumers (persistence, UI, analytics) can only process data at a limited speed. Without a flow-control model, backlogs, memory pressure, and ultimately outages occur.
In Java, java.util.concurrent.Flow provides a minimalist API that standardises the solution to this problem: Publisher → Processor → Subscriber, including backpressure. Flow.Processor<I,O> is the hinge between upstream and downstream: it processes, transforms, buffers, and throttles while correctly respecting demand.
Abstract: Reactive Streams = asynchronous push model with backpressure. Java Streams = synchronous pull model without backpressure.
1.1 Why Reactive Streams?
- Controlled load (backpressure): Consumers actively signal how many items they can process by calling request(n) on their subscription, requesting exactly the number of items they can currently handle safely. The publisher may only deliver as many items as previously requested, which keeps the processing speed controlled, prevents buffers from overflowing, and keeps memory consumption predictable. The demand can be adjusted dynamically (e.g. batch-wise with request(32) or item-wise with request(1)). In case of overload, the consumer can pause or terminate cleanly by calling cancel().
- Asynchrony & Decoupling: Data flows can cross thread and service boundaries. This means that a publisher is operated in its own thread or via an executor, for example, while the subscriber processes in a different context. In between, there can be queues or network boundaries, for example, when events are sent from one microservice to another. Loose coupling allows systems to scale and distribute without processing being tightly tied to a single execution context.
- Infinite/long flows: Telemetry, logs, and event streams are often continuous, potentially infinite data sources. A single pull, as is standard in classic Java streams, is insufficient here because the data is not available at a fixed time, but rather accumulates constantly. Instead, you need a permanent subscription that continuously informs the consumer about new events. This is the only way to process continuous data streams without the consumer having to poll actively or repeatedly start new streams.
- Error semantics & completion: Consistent signals such as onError and onComplete ensure that each data pipeline receives a well-defined closure or transparent error handling. Instead of unpredictable exceptions that occur somewhere in the code, there are standardised callback methods that clearly mark the lifetime of a stream. This enables a reliable distinction between a data stream that has terminated regularly and one that has encountered an error, allowing downstream components to react accordingly, for example through cleanup, retry mechanisms, or logging. This predictability is crucial for robust pipelines that are to run stably over extended periods of time.
- Composability: Composability plays a significant role: A processor can be used as a transformation, for example, by modifying incoming data according to specific rules – similar to a map or filter operation in Java streams. In addition, control operators can be implemented, such as a throttle that limits the rate or a debounce that only passes on the last signal within a short period of time. Finally, a processor also enables the construction of more complex topologies, such as fan-in, where several sources are merged, or fan-out, where a data stream is distributed among multiple subscribers. This allows entire processing pipelines to be assembled in a modular manner without the individual components having to know each other's internal logic.
- Operation & Security: Backpressure is not only used for load control, but also acts as effective protection against denial-of-service attacks. A system that writes data to unbounded buffers without restraint can easily be paralysed by excessive input. With correctly implemented backpressure, on the other hand, every demand is strictly enforced: the publisher only delivers as many elements as were actually requested. This prevents malicious or faulty sources from overloading the system by producing data en masse. Instead, resource consumption remains predictable and controlled, increasing the stability and security of the entire pipeline.
Where does that fit in the JDK?
- Namespace: java.util.concurrent.Flow – this package bundles the core building blocks of the reactive API. It provides interfaces for publishers, subscribers, subscriptions, and processors, and thus forms the standard framework for reactive streams in the JDK. This ensures that all implementations adhere to the same contracts and can be seamlessly integrated.
- Roles: The roles can be distinguished as follows: A publisher creates and delivers data to its subscribers, a subscriber consumes this data and processes it further, the subscription forms the contract between the two and regulates the backpressure signals, among other things, and the processor takes on a dual role by receiving data as a subscriber and at the same time passing on transformed or filtered data to the next consumer as a publisher.
- The types in the Flow API are generic and require careful and thread-safe handling. Generic means that Publisher, Subscriber, and Processor are always instantiated with specific type variables, so that the data type remains consistent throughout the pipeline. At the same time, strict attention must be paid to thread safety during implementation, as signals such as onNext or onError can arrive from different threads and must be synchronised correctly. As an introduction, the JDK provides a reference implementation with the SubmissionPublisher, which is sufficient for simple scenarios and already takes into account essential concepts such as backpressure. In production systems, however, developers often fall back on their own or adapted processor classes to precisely implement specific requirements for buffering, transformation, error handling or performance.
1.2 Push vs. Pull: Reactive Streams vs. Java Streams
| Aspect | Java Streams (pull) | Reactive Streams / Flow (push) |
| --- | --- | --- |
| Data direction | consumer pulls data | producer pushes data |
| Execution | typically synchronous, in the calling thread | asynchronous, often via executors/threads |
| Backpressure | non-existent | central concept (request(n)) |
| Lifetime | finite, ends with a terminal operation | potentially infinite, subscription-based |
| Errors | exceptions thrown to the caller | onError signal in the stream |
| Cancellation | end of the source | cancel() on the subscription |
Minimal examples
Pull (Java Streams) – the consumer sets the pace:
@Test
void test001() {
var data = List.of(1, 2, 3, 4, 5);
int sum = data.stream()
.filter(n -> n % 2 == 1)
.mapToInt(n -> n * n)
.sum();
System.out.println("sum = " + sum);
}
Push (Flow) – the producer delivers, the subscriber signals demand:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@Test
void test002() {
class SumSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription s;
private int sum = 0;
@Override public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
@Override public void onNext(Integer item) { sum += item; s.request(1); }
@Override public void onError(Throwable t) { t.printStackTrace(); }
@Override public void onComplete() { System.out.println("sum = " + sum); }
}
try (var pub = new SubmissionPublisher<Integer>()) {
pub.subscribe(new SumSubscriber());
for (int i = 1; i <= 5; i++) pub.submit(i); //asynchronous push
}
}
Note: SubmissionPublisher is a convenient way to get started, but not a panacea (limited tuning options). Custom processor implementations give you full control over backpressure, buffering, and concurrency.
1.3 Typical Use Cases
- Telemetry & Logging: In modern systems, events are continuously created, such as metrics or log entries. These streams can be so high-frequency that they cannot be permanently stored or transmitted in their raw form. This is where a processor comes into play: it first caches the events and then combines them into batches of, for example, 100 events each. In this way, the flood of data can be effectively throttled and persisted in manageable portions, preventing overloading of individual components.
- Message ingestion: In many architectures, message and event systems, such as Kafka, classic message queues, or HTTP endpoints, form the input layer for data. These can be modelled as publishers who continuously provide new events. A processor then takes over the validation and possible enrichment, such as checking mandatory fields, adding additional metadata or transforming them into a uniform format. Only then does the enriched and checked data pass on to the subscriber, who takes over the actual persistence, for example, by writing it to a database or data warehouse. In this way, a clear separation of responsibility is achieved, allowing errors to be detected and addressed early in the data stream.
- IoT/streams from sensors: In practice, sensors often deliver data streams at highly fluctuating speeds. Some sensors transmit data in millisecond intervals, while others transmit data only every few seconds or minutes. If these flows are passed on unchecked to a central consumer, there is a risk of buffer overflow and thus instability in the overall system. With backpressure, however, the consumer can actively control how many measured values they take. In scenarios where the rate exceeds the processing capacity, additional strategies can be employed: either older values are discarded (drop strategy) or only the most recent value is passed on (latest strategy). This keeps the system stable, allowing the consumer to always work with up-to-date and relevant data without being overwhelmed by data avalanches.
- UI events & rate control: In user interfaces, event streams with very high frequencies quickly arise, for example, when a user types a search query and an event is triggered for every keystroke. Without regulation, these events would be passed on to the backend service unhindered, which could overload both the network connections and the servers. In this case, a dedicated debounce processor ensures that only the last event within a short period of time is forwarded. This avoids unnecessary requests, and the user still receives up-to-date suggestions promptly. This technique effectively prevents the UI from being flooded with data and, at the same time, improves the perceived performance and responsiveness of the application.
- Security: For security-critical applications, it is imperative that each stage of a data pipeline is clearly delineated and only communicates via defined contracts. This prevents the creation of an unbridled shared state that could lead to inconsistencies or security gaps. Another aspect is the possibility of interrupting data flows cleanly at any time. The cancel() method allows a subscriber to terminate the subscription if it determines that the source is faulty, provides too much data, or presents a potential security threat. In this way, resources can be freed up immediately, and the pipeline remains stable and resilient even under adverse conditions.
2. Overview: java.util.concurrent.Flow
After explaining the motivation and the basic differentiation from Java Streams in the first chapter, we now turn to the actual framework that the JDK provides for reactive streams. The java.util.concurrent.Flow package, a small but powerful API available since Java 9, describes the essential roles and contracts of Reactive Streams. These interfaces are deliberately kept minimal, allowing them to be used directly for simple use cases as well as forming the basis for more complex libraries.
The focus is on four roles that define the entire lifecycle of publishers and subscribers: Publisher, Subscriber, Subscription and Processor.
A publisher is the source of the data. It produces elements and makes them accessible to its subscribers. It may only send data that has been explicitly requested beforehand. This prevents it from sending events uncontrollably, which in the worst case would fizzle out into nowhere or cause memory problems.
A subscriber is the counterpart to this: it receives the data that the publisher provides and processes it further. To avoid being overloaded, the subscriber informs the publisher, as part of the subscription, of the number of items it can accept at a given time.
The subscription is the link between the publisher and the subscriber. It is created at the moment of registration (the so-called subscribe process) and handed over to the subscriber. From then on, it controls the flow of data by providing methods such as request(long n) and cancel(). This controls demand and, simultaneously, enables a clean interruption of data transmission.
Finally, a processor combines both roles: it is a subscriber itself because it receives data from a publisher, but at the same time, it is also a publisher because it passes on the transformed or filtered data. This dual function is the great strength of the processor: it enables the construction of complex pipelines from simple building blocks, where transformation, filtering, or aggregation are clearly separated and encapsulated.
2.1 Life cycle of the signals
The flow within the Flow API follows a clear order. As soon as a subscriber registers with a publisher, the publisher first calls the subscriber's onSubscribe method and passes the subscription. Only when the subscriber requests elements via this subscription – such as request(1) or request(10) – does the publisher begin to deliver data. Each item is then delivered by a call to onNext.
If all data has been sent or the stream is terminated for other reasons, the publisher signals the end with onComplete. If, on the other hand, an error occurs, it is reported via onError. These precise signal semantics ensure that every data flow is either completed regularly or terminated with an error message. A subscriber, therefore, always knows exactly what state the data source is in and can react accordingly.
2.2 Backpressure basic principle
A central concept of Flow is the so-called backpressure. It prevents publishers from flooding their subscribers with data. Instead, the subscribers themselves control how many items they want to retrieve. For example, a subscriber can initially request only a single element with request(1) and only ask for the next one after it has been processed. Similarly, it is possible to order larger quantities, such as request(50), to increase throughput. This model gives consumers complete control and ensures a stable balance between production and consumption.
Backpressure is not only a technical detail, but a decisive criterion for the robustness of reactive systems. Without this control, publishers could deliver events unchecked, overloading resources such as CPU or memory. Backpressure, on the other hand, can also be used to implement complex scenarios such as batching, prioritisation or flow control across thread and system boundaries.
The Flow API in the JDK defines the elementary building blocks of a reactive pipeline. Publisher, subscriber, subscription and processor together form a clear model that supports both small experiments and upscaled systems. The signal lifecycle and backpressure mechanisms ensure that the data streams are controlled, traceable and stable. This lays the foundation for delving deeper into the role of the processor in the following chapters and shedding light on its concrete applications.
3. Focus on Flow.Processor<I,O>
After the four roles were briefly described in the previous chapter, the focus now shifts to the processor. It occupies a key position in the flow API because it fulfils two roles simultaneously: it is a subscriber to its upstream, i.e., it receives data from a publisher, and it is also a publisher to its downstream, i.e., it passes on transformed or filtered data to other subscribers. This makes it act like a hinge in a chain of processing steps.
A processor is always provided with two type parameters: <I, O>. The first type describes which elements it receives from the upstream, and the second type describes which elements it passes on to the downstream. This allows any transformations to be mapped – for example, from raw sensor data to validated measured values or from text messages to structured objects.
3.1 Contract & Responsibilities
Implementing a processor means that you must abide by the rules of both subscribers and publishers. This includes, in particular, the following aspects:
- Pay attention to signal flow: The processor must handle all signals it receives from the upstream completely and correctly. These include the onSubscribe, onNext, onError, and onComplete methods. Each of these signals fulfils a particular function in the lifecycle of a data stream: onSubscribe initiates the subscription and hands over control of the data flow, onNext delivers the actual data, onError signals the occurrence of an error, and onComplete marks the regular end of the stream. It is essential that the processor strictly adheres to these semantics: an error may only be reported once and unambiguously, and no further data may follow after onComplete. Only through these clear rules does processing remain consistent, predictable and reliable for all components involved.
- Respect backpressure: As a subscriber, the processor is obligated to adhere strictly to the rules of the backpressure model. This means that it may only receive and process exactly as many elements as it has previously actively requested from the upstream via its subscription – for example, by a call such as request(1) or request(10). This prevents it from being overrun by a publisher that is too fast. At the same time, as a publisher, it has the responsibility to pass on this logic unchanged to its downstream. It is therefore not allowed to forward more elements to its subscribers than they have explicitly requested. The processor thus acts as a pass-through for the demand signals, ensuring that the entire chain, from publisher to processor to subscriber, remains in balance and that there are no overloads or data losses.
- Manage resources cleanly: A processor must also ensure that the resources it manages can be released cleanly at all times. In particular, this includes the fact that subscriptions can be actively terminated, for example, by calling cancel() if a subscriber loses interest or processing is to be interrupted for other reasons. It is equally vital that there are no memory or thread leaks: Open queues must be emptied or closed, background threads must be properly terminated, and executor services must be shut down again. Only through this consistent management can the system remain stable, performant and free of creeping resource damage in the long term.
- Do not swallow errors: If an error occurs, it must be consistently and transparently passed on to the downstream system via the onError method. This ensures that downstream subscribers are also clearly informed about the status of the data stream and can react – for example, through logging, retry mechanisms or the targeted termination of their own processing. If, on the other hand, an error is tacitly ignored, there is a risk of inconsistencies that are difficult to understand. Equally problematic is the uncontrolled throwing of unchecked exceptions, as they are not caught cleanly in the context of reactive pipelines; thus, entire processing chains can enter an undefined state. Clean error propagation is therefore a central quality feature of every processor implementation.
Type Variables & Invariants
The type parameters determine precisely what kind of data a processor can handle. For example, a Processor<String, Integer> could receive lines of text and extract numbers from them, which it then passes on. These types must be strictly adhered to: a subscriber who expects integers must not unexpectedly receive strings. The genericity of the Flow API ensures that errors of this kind are already caught at compile time.
Additionally, the invariant holds that a processor must behave like both a correct subscriber and a correct publisher. It is therefore responsible not only for the transformation, but also for the proper relaying of control signals such as demand and cancellation.
Chain position: between upstream & downstream
In practice, a processor is rarely used in isolation, but as an intermediate link in a chain. An upstream publisher delivers data that the processor receives, transforms or filters and then forwards to downstream subscribers. This creates flexible pipelines that can be expanded or exchanged depending on requirements.
A logging processor, for instance, can be placed between a data source and the actual consumer to record all elements passing through; a minimal sketch follows below. A processor can also be used to buffer data before passing it on in larger batches. The position in the middle allows different aspects, such as transformation, control and observation, to be separated from each other and made modular.
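A minimal sketch of such a pass-through logging stage, anticipating the SubmissionPublisher-based implementation style used in chapter 5 (the class name and log output are illustrative):
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
//Hypothetical pass-through stage: logs every element, then forwards it unchanged.
class LoggingProcessor<T> extends SubmissionPublisher<T>
implements Flow.Processor<T, T> {
private Flow.Subscription subscription;
@Override public void onSubscribe(Flow.Subscription s) {
this.subscription = s;
s.request(1); //request the first element
}
@Override public void onNext(T item) {
System.out.println("observed: " + item); //observation only, no transformation
submit(item); //forward unchanged to the downstream
subscription.request(1); //relay demand upstream
}
@Override public void onError(Throwable t) { closeExceptionally(t); }
@Override public void onComplete() { close(); }
}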
The Flow.Processor is the link in reactive pipelines. Through its dual function as subscriber and publisher, it takes responsibility for the correct processing of signals, the implementation of transformations and compliance with the backpressure rules. Its type parameters <I,O> also ensure that data flows remain strictly typed and thus reliable. The following chapters show how to implement your own processor classes and which patterns have proven themselves in practice.
4. Practical introduction with the JDK
Now that the concepts and contracts around the Flow.Processor have been presented in detail, it is time to start with a practical implementation in the JDK. Fortunately, Java has provided a reference implementation for a publisher since version 9, with the SubmissionPublisher, which is ideal for initial experiments. It relieves you of many details, such as thread management and internal buffering, allowing you to concentrate fully on the data flow.
The SubmissionPublisher is designed to distribute data asynchronously to its subscribers. In the background, it works with an executor that packages the individual signals, such as onNext, onError and onComplete, into separate tasks and distributes them. By default, it uses the ForkJoinPool.commonPool(), but you can also pass in your own executor. For small experiments, the standard configuration is usually sufficient; in production scenarios, however, it is worthwhile to adapt the executor and buffer sizes to your specific requirements.
4.1 SubmissionPublisher in a nutshell
The SubmissionPublisher<T> class implements the Publisher<T> interface and is therefore an immediately usable source of data. It provides the submit(T item) method, which is used to add new items to the stream. These items are then automatically distributed to all registered subscribers. In this way, the publisher assumes the role of an asynchronous dispatcher, passing incoming data to any number of subscribers.
An essential detail is that the submit method buffers the elements internally and distributes them asynchronously. If a subscriber's buffer limit is reached, however, submit blocks until space becomes available again; an IllegalStateException is only thrown if the publisher has already been closed. That's why it's crucial to find the right balance between publisher speed, buffer size, and subscriber demand.
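For situations where blocking is unacceptable, SubmissionPublisher also provides non-blocking offer variants. A minimal sketch (buffer capacity, timeout, and the omitted subscriber wiring are illustrative):
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
@Test
void offerInsteadOfSubmit() {
//Bounded buffer of 16 elements per subscriber; a real test would attach a subscriber first.
try (var pub = new SubmissionPublisher<Integer>(ForkJoinPool.commonPool(), 16)) {
int lag = pub.offer(42, 50, TimeUnit.MILLISECONDS,
(subscriber, item) -> false); //drop handler: false = give up and drop the item
if (lag < 0) {
System.out.println("dropped for " + (-lag) + " subscriber(s)");
}
}
}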
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@Test
void test003() {
class PrintSubscriber
implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Register: 1 element/req" );
this.subscription = subscription;
subscription.request(1); //request the first element
}
@Override
public void onNext(Integer item) {
System.out.println("Receive: " + item);
subscription.request(1); //request another after each item
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
}
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
publisher.subscribe(new PrintSubscriber());
for (int i = 1; i <= 5; i++) {
System.out.println(" send element = " + i );
publisher.submit(i);
}
}
}
4.2 Simple pipeline without its own processor
To understand how SubmissionPublisher works, it is worth looking at the simple example above: a publisher sends the numbers 1 to 5 to a subscriber, which receives the data and prints it to the console. The example shows the basic flow of onSubscribe, onNext, and onComplete.
In this program, the publisher asynchronously generates five values, which are transmitted to the subscriber one after the other. The subscriber explicitly requests an additional element each time, creating a controlled and stable flow of data. Finally, the publisher signals with onComplete that no more data will follow.
4.3 Limitations of SubmissionPublisher
Even though the SubmissionPublisher is very helpful for getting started, it quickly reaches its limits in more complex scenarios. It offers only limited possibilities for configuring the backpressure behaviour: the buffer size is customisable, and dropping is possible via the offer methods, but more complex strategies such as latest-value are not supported directly. Additionally, the standard executor is not suitable for all applications, particularly when strict latency requirements or thread isolation are necessary.
Another point is that SubmissionPublisher is primarily intended for learning purposes and simple applications. In productive systems, people usually rely on their own publisher and processor implementations or on established reactive frameworks such as Project Reactor or RxJava, which are based on the Flow API and provide additional operators.
The SubmissionPublisher is a handy place to start to see the concepts of the Flow API in action. It shows how publishers and subscribers interact, how backpressure works and how signals are processed. At the same time, however, it also becomes clear that for more complex or productive scenarios, in-house processor implementations or external libraries are indispensable. In this way, it forms the bridge between the theoretical foundations and the first practical steps in working with reactive pipelines in the JDK.
5. Sample Processor with Code Examples
Now that the foundations for your own implementation of a processor have been laid, it makes sense to look at typical patterns. They show how concrete use cases can be implemented and form the basis for more complex pipelines. Here are some simple but commonly used processor types, each with sample code.
5.1 MapProcessor<I,O> Transformation
The MapProcessor is the simplest and at the same time one of the most useful processors. It takes on the role of a transformation: incoming data is changed with a function and then passed on. For example, strings can be converted to their length or numbers can be squared.
import static java.lang.System.out;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
@Test
void mapProcessor_usage_minimal()
throws Exception {
final CountDownLatch done = new CountDownLatch(1);
final Function<String, Integer> mapper = s -> Integer.parseInt(s) * 2;
//1) Publisher (source)
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//2) MapProcessor (String -> Integer)
MapProcessor<String, Integer> map = new MapProcessor<>(mapper)) {
//3) Target subscriber (output only)
map.subscribe(
new Flow.Subscriber<>() {
private Flow.Subscription s;
@Override
public void onSubscribe(Flow.Subscription subscription) { this.s = subscription; s.request(1); }
@Override
public void onNext(Integer item) { out.println("receive = " + item); s.request(1); }
@Override
public void onError(Throwable t) { t.printStackTrace(); done.countDown(); }
@Override
public void onComplete() { done.countDown(); }
});
//Concatenation: Publisher -> MapProcessor
publisher.subscribe(map);
//Send data
publisher.submit("1");
publisher.submit("2");
publisher.submit("3");
publisher.close(); //signal the end
//Wait briefly for completion (asynchronous)
done.await();
}
}
/**
* Minimal processor that applies a Function<I,O>.
* No own threads/executor – uses the defaults of the SubmissionPublisher.
*/
static class MapProcessor<I, O>
extends SubmissionPublisher<O>
implements Flow.Processor<I, O> {
private final Function<I, O> mapper;
private Flow.Subscription subscription;
MapProcessor(Function<I, O> mapper) {
super();
this.mapper = mapper;
}
@Override
public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); }
@Override
public void onNext(I item) { submit(mapper.apply(item)); subscription.request(1); }
@Override
public void onError(Throwable throwable) { closeExceptionally(throwable); }
@Override
public void onComplete() { close(); }
}
This allows data streams to be elegantly transformed without adjusting the rest of the pipeline.
5.2 FilterProcessor<T> – Filtering Data
Another typical pattern is filtering. Only those elements that meet a certain condition are passed on. This is especially helpful when large amounts of data are generated, of which only a fraction is relevant.
import static java.lang.System.out;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Predicate;
@Test
void filterEvenNumbers()
throws Exception {
Predicate<Integer> even = n -> {
out.println("Filter: " + n);
return n % 2 == 0;
};
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
FilterProcessor<Integer> filter = new FilterProcessor<>(even);
CountDownLatch done = new CountDownLatch(1);
CopyOnWriteArrayList<Integer> received = new CopyOnWriteArrayList<>();
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription sub;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.sub = subscription;
sub.request(1);
}
@Override
public void onNext(Integer item) {
received.add(item);
sub.request(1);
}
@Override
public void onError(Throwable throwable) { done.countDown(); }
@Override
public void onComplete() { done.countDown(); }
};
publisher.subscribe(filter);
filter.subscribe(subscriber);
List.of(1, 2, 3, 4, 5, 6).forEach(publisher::submit);
publisher.close();
//Short waiting time for asynchronous processing
if (!done.await(1, TimeUnit.SECONDS)) {
throw new TimeoutException("Processing did not finish in time");
}
out.println("received = " + received);
assertEquals(List.of(2, 4, 6), received);
}
}
static class FilterProcessor<T>
extends SubmissionPublisher<T>
implements Flow.Processor<T, T> {
private final Predicate<T> predicate;
private Flow.Subscription subscription;
public FilterProcessor(Predicate<T> predicate) { this.predicate = predicate; }
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
if (predicate.test(item)) { submit(item); }
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { closeExceptionally(throwable); }
@Override
public void onComplete() { close(); }
}
With this processor, data streams can be reduced in a targeted manner and made more relevant.
5.3 BatchingProcessor<T> – Collect and Share
Sometimes it doesn’t make sense to pass on every single element right away. Instead, you want to collect data and pass it on in blocks. The BatchingProcessor does exactly this job.
This processor is useful for processing data efficiently in blocks, for example when writing to a database.
import static org.assertj.core.api.Assertions.assertThat;
import java.util.*;
import java.util.concurrent.*;
@Test
void testBatch()
throws InterruptedException {
List<List<Integer>> received = new ArrayList<>();
CountDownLatch done = new CountDownLatch(1);
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
BatchingProcessor<Integer> batching = new BatchingProcessor<>(3)) {
publisher.subscribe(batching);
batching.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription s) {
this.subscription = s;
s.request(1);
}
@Override
public void onNext(List<Integer> batch) {
System.out.println("onNext - batch = " + batch);
received.add(batch);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
done.countDown();
throw new AssertionError("Unexpected error", t);
}
@Override
public void onComplete() {
System.out.println("onComplete...");
done.countDown();
}
});
for (int i = 1; i <= 10; i++) {
System.out.println("submitting i = " + i);
publisher.submit(i);
}
publisher.close();
done.await();
}
assertThat(received)
.containsExactly(
List.of(1, 2, 3),
List.of(4, 5, 6),
List.of(7, 8, 9),
List.of(10)
);
}
class BatchingProcessor<T>
extends SubmissionPublisher<List<T>>
implements Flow.Processor<T, List<T>> {
private final List<T> buffer = new ArrayList<>();
private final int batchSize;
private Flow.Subscription subscription;
public BatchingProcessor(int batchSize) { this.batchSize = batchSize; }
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
buffer.add(item);
if (buffer.size() >= batchSize) {
submit(new ArrayList<>(buffer));
buffer.clear();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { closeExceptionally(throwable); }
@Override
public void onComplete() {
if (!buffer.isEmpty()) {
submit(new ArrayList<>(buffer));
buffer.clear();
}
close();
}
}
5.4 ThrottleProcessor<T> Throttling
Especially with high-frequency sources, it is necessary to limit the rate of data passed on. A ThrottleProcessor can be implemented in such a way that it only forwards one element at certain time intervals and discards the rest.
import static org.junit.jupiter.api.Assertions.*;
import java.util.*;
import java.util.concurrent.*;
@Test
void throttle_allows_items_only_in_interval()
throws Exception {
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
ThrottleProcessor<Integer> throttle = new ThrottleProcessor<>(50)) { // 50 ms interval
CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>(3); //we expect 3 elements
//Build a pipeline: publisher -> throttle -> subscriber
publisher.subscribe(throttle);
throttle.subscribe(subscriber);
//Send items: 1 (passes), 2/3 (throttled), 4 (passes), 5 (throttled), 6 (passes)
publisher.submit(1);
Thread.sleep(10);
publisher.submit(2);
Thread.sleep(10);
publisher.submit(3);
Thread.sleep(60); //> 50ms: Next may go through
publisher.submit(4);
Thread.sleep(10);
publisher.submit(5);
Thread.sleep(60);
publisher.submit(6);
publisher.close(); //Completion of the source
assertTrue(subscriber.await(2, TimeUnit.SECONDS),
"Timeout while waiting for throttled items");
var received = subscriber.getReceived();
logger().info("Received {} items", received);
assertEquals(List.of(1, 4, 6), received);
}
}
//---- Processor ----
static class ThrottleProcessor<T>
extends SubmissionPublisher<T>
implements Flow.Processor<T, T>, HasLogger {
private final long intervalMillis;
private Flow.Subscription subscription;
private long lastEmission = 0;
public ThrottleProcessor(long intervalMillis) {
this.intervalMillis = intervalMillis;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
logger().info("onNext {}", item);
long now = System.currentTimeMillis();
if (now - lastEmission >= intervalMillis) {
logger().info("submit item " + item);
submit(item);
lastEmission = now;
} else {
logger().info("< intervalMillis - skipping item {}", item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { closeExceptionally(throwable); }
@Override
public void onComplete() { close(); }
}
//---- simple collector-subscriber ----
static class CollectingSubscriber<T>
implements Flow.Subscriber<T>, HasLogger {
private final List<T> received = new ArrayList<>();
private final CountDownLatch latch;
private Flow.Subscription subscription;
CollectingSubscriber(int expectedCount) {
this.latch = new CountDownLatch(expectedCount);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE); //unbounded demand
}
@Override
public void onNext(T item) {
logger().info("onNext {}", item);
received.add(item);
latch.countDown();
}
@Override
public void onError(Throwable throwable) { /* no-op for test */ }
@Override
public void onComplete() { /* no-op for test */ }
List<T> getReceived() { return received; }
boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return latch.await(timeout, unit);
}
}
This pattern can be used, for example, to prevent a UI event stream from generating too many updates uncontrollably.
The sample processors presented here are fundamental building blocks for designing reactive pipelines in the JDK. With transformation, filtering, batching and throttling, many typical requirements are covered. The following chapters will focus on implementing backpressure in practice and discuss which strategies have proven successful in operation.
6. Backpressure in practice
After presenting concrete sample processors in the previous chapter, we will now focus on one of the central topics in the Flow API: Backpressure. While the theory sounds simple – the subscriber determines the number of elements they can process – in practice, it turns out that the right implementation is crucial for stability, efficiency and robustness.
6.1 Strategies for dealing with backpressure
Backpressure prevents a publisher from flooding its subscribers with data. Nevertheless, there are different strategies for coping with demand:
- Bounded buffering: In this strategy, the publisher only keeps a clearly defined, limited number of elements in an internal buffer. Once this buffer is filled, there are several possible reactions: the submit() method can block until there is space again, or it throws an exception to indicate the overload. In this way, an uncontrolled growth of the memory is prevented. This model is particularly suitable in scenarios where a controlled and predictable data rate is required – for example, in systems that are only allowed to process a certain number of requests per second or in hardware interfaces with limited processing capacity.
- Dropping: When overloaded, new elements are deliberately discarded instead of being cached in an unbounded buffer. This strategy makes sense especially where not every single event is critical and the loss of individual data points remains tolerable. Typical examples are telemetry data, which is generated at a high frequency anyway, or UI events such as mouse movements or keystrokes, where only the current state is relevant for further processing. Dropping keeps the system responsive and resource-efficient even under extreme load, as it doesn’t try to funnel every single event through the pipeline at all costs. A minimal sketch of this strategy follows after this list.
- Latest-Value: In the Latest-Value strategy, old elements are not collected in a buffer, but are consistently discarded as soon as a new value arrives. Only the most recently delivered element is stored in each case, so that the subscriber only receives the latest version the next time it is retrieved or asked. This procedure is beneficial when values change continuously. Still, only the last state is relevant for processing – for example, in cases where sensor data, such as temperature or position coordinates, is involved. This relieves the subscriber because he does not have to work through all the intermediate results, but can continue working directly with the latest information.
- Blocking: In the blocking strategy, the publisher actively waits for demand to be signalled again by the subscriber before delivering further elements. In practice, this means that the calling thread blocks until a corresponding request is made. This approach is comparatively easy to implement and makes the process calculable at first glance, as it ensures that no more data is produced than was requested. However, this approach has a significant drawback: blocked threads are a scarce resource in highly concurrent systems, and when many publishers block at the same time, entire thread pools can become clogged. Therefore, blocking should only be used in very special scenarios, such as when the data rate is low anyway or when controlling the exact data flow is more important than maximum parallelism.
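One way to realise the dropping strategy with plain JDK means is SubmissionPublisher.offer, which, unlike submit, does not block on a full buffer. A minimal sketch (the class name, executor and buffer capacity are illustrative assumptions):
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
//Sketch of a dropping stage: when the downstream buffer is full, new elements
//are discarded instead of blocking the upstream.
class DroppingProcessor<T> extends SubmissionPublisher<T>
implements Flow.Processor<T, T> {
private Flow.Subscription subscription;
DroppingProcessor(Executor executor, int bufferSize) { super(executor, bufferSize); }
@Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; s.request(1); }
@Override public void onNext(T item) {
//offer() does not block: if a subscriber's buffer is full, the drop handler
//decides; returning false discards the element for that subscriber
offer(item, (sub, dropped) -> false);
subscription.request(1);
}
@Override public void onError(Throwable t) { closeExceptionally(t); }
@Override public void onComplete() { close(); }
}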
6.2 Demand Policy: Batch vs. Single Retrieval
Subscribers can determine for themselves how many items they call up at a time. There are two common approaches:
- Single retrieval (request(1)): After each item received, exactly one more is requested. This approach is simple and provides maximum control, but it generates a large number of signals and can become inefficient at very high rates.
- Batch retrieval (request(n)): The subscriber requests several elements at once, such as request(32). This reduces the signal load and allows the publisher to deliver items in blocks more efficiently. However, the subscriber must then make sure that it can handle these batches (see the sketch below).
In practice, both approaches are often combined, depending on whether low latency or high throughput is the primary concern.
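A sketch of the batch approach with an early top-up, so that the upstream never runs dry (the batch size and the process method are illustrative):
import java.util.concurrent.Flow;
//Requests 32 elements up front and tops the demand up once half the batch is consumed.
class BatchDemandSubscriber<T> implements Flow.Subscriber<T> {
private static final int BATCH = 32;
private Flow.Subscription subscription;
private int consumed = 0;
@Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; s.request(BATCH); }
@Override public void onNext(T item) {
process(item);
if (++consumed == BATCH / 2) { //re-request early so roughly a full batch stays outstanding
consumed = 0;
subscription.request(BATCH / 2);
}
}
private void process(T item) { System.out.println("processed: " + item); }
@Override public void onError(Throwable t) { t.printStackTrace(); }
@Override public void onComplete() { }
}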
6.3 Monitoring, Tuning and Capacity Planning
A decisive success factor in the use of backpressure is monitoring. Without measuring points, it remains unclear whether a system is running stably or is already working at its limits. The following key figures are helpful:
- Queue depth: The queue depth refers to the number of elements currently in the internal buffer that the subscriber has not yet retrieved. It is a direct indicator of whether the publisher is running ahead of the consumer or whether demand is in balance with production.
- Latency: Latency refers to the time elapsed between the creation of an element and its final processing by the subscriber. It is a key measure of the system’s responsiveness: high latency indicates that elements are jamming in buffers or processing steps are taking too long. In contrast, low latency means a smooth data flow.
- Throughput (items per second): Throughput indicates the number of items that can be processed per unit of time. It’s a measure of the entire pipeline’s performance: high throughput means that publishers, processors, and subscribers can work together efficiently and meet demand. If the throughput drops below the expected level, this is an indication of a bottleneck – whether due to too small buffers, too slow processing in the subscriber, or insufficient parallelisation. Throughput is therefore an important key figure for realistically estimating the capacity of a system and making targeted optimisations.
This data can be used to identify bottlenecks and take appropriate measures, such as adjusting the batch size, increasing the buffers, or introducing additional processor stages.
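A sketch of how such figures can be collected inside the pipeline itself (the class name and reporting are illustrative; on the publisher side, the inherited estimateMaximumLag() already reports the current queue depth):
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.LongAdder;
//Pass-through stage that counts items and derives a rough throughput figure.
class MeteringProcessor<T> extends SubmissionPublisher<T>
implements Flow.Processor<T, T> {
private final LongAdder count = new LongAdder();
private final long start = System.nanoTime();
private Flow.Subscription subscription;
@Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; s.request(1); }
@Override public void onNext(T item) {
count.increment(); //throughput counter
submit(item); //forward unchanged
subscription.request(1);
}
@Override public void onError(Throwable t) { closeExceptionally(t); }
@Override public void onComplete() { close(); }
double itemsPerSecond() {
double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
return seconds > 0 ? count.sum() / seconds : 0.0;
}
}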
7. Concurrency & Execution Context
A central feature of the Flow API is its close integration with concurrent processing. Unlike classic Java streams, which usually run synchronously in the same thread, reactive streams almost always involve an asynchronous interaction of several threads. Therefore, it is crucial to understand the role of the execution context in detail.
7.1 Executor Strategies
The Flow API itself does not specify on which threads the signals are processed. Instead, the implementations determine which executor they use. The SubmissionPublisher uses the ForkJoinPool.commonPool() by default, but also allows you to include your own executor. This is extremely important in practice, as the choice of the executor can change the behaviour of the entire system:
- A CachedThreadPool can enable very high parallelism through its dynamic generation and reuse of threads. It grows indefinitely when there are many tasks at the same time, and reduces again when there is less work. This makes it exceptionally flexible and well-suited for unpredictable load peaks. However, this dynamic can also lead to uncontrollable behaviour: If thousands of threads are suddenly created, the context switching load increases significantly, which has an adverse effect on latency. It is therefore less suitable for latency-critical scenarios, as response times can become unpredictable due to the administrative overhead and the potentially high number of threads.
- A FixedThreadPool works with a fixed number of threads, which are defined when the pool is created. In this way, it creates predictable limits for concurrency and prevents uncontrolled growth of the thread count. This makes it particularly suitable for scenarios in which resources such as CPU cores or memory are to be used in a clearly limited and predictable manner. However, there is a disadvantage when overloaded: If there are more tasks than threads available, queues form. These lead to increasing latencies and can become problematic in critical systems if the queue grows unchecked. It is therefore essential to consciously dimension the pool and possibly use mechanisms such as backpressure or rejection policies to absorb overload situations in a controlled manner.
- The ForkJoinPool is optimised for fine-grained, recursive, and highly parallelisable tasks. It relies on a work-stealing procedure in which worker threads actively look for tasks sitting in the queues of other threads when they have no more work of their own. This achieves high utilisation of the available threads and makes efficient use of computing time. This model excels above all in CPU-intensive calculations that can be broken down into many small, independent tasks – for example, in divide-and-conquer algorithms or parallelised data analyses. The ForkJoinPool, on the other hand, is less suitable for blocking operations such as network access, file system or database queries. When a worker thread blocks, it becomes unavailable for work stealing, the pool runs short of workers, and the model's efficiency suffers. In such cases, it is advisable to outsource blocking work to separate executors with a suitable thread policy or to rely on the virtual threads introduced by Project Loom, which are more tolerant of blocking operations. Additionally, in specific scenarios, ForkJoinPool.ManagedBlocker can help to mitigate the adverse effects of blocking operations by allowing the pool size to be temporarily extended.
Choosing the right executor is, therefore, a key architectural point that directly impacts throughput, latency, and stability.
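A small sketch of wiring a SubmissionPublisher to an explicit executor and buffer capacity (pool size and capacity are illustrative):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
void customExecutorDemo() {
//Four worker threads and a bounded buffer of 256 elements per subscriber.
ExecutorService pool = Executors.newFixedThreadPool(4);
try (var publisher = new SubmissionPublisher<String>(pool, 256)) {
publisher.submit("hello");
} finally {
pool.shutdown(); //closing the publisher does not shut down the executor
}
}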
7.2 Reentrancy traps and serialization of signals
Another critical issue is how the onNext, onError and onComplete signals are delivered. The Reactive Streams specification requires that these signals be called sequentially, never concurrently, so they must be serialised. Nevertheless, in practice, parallel calls arise from custom implementations or clumsy synchronisation. This quickly leads to errors that are difficult to reproduce, the so-called “Heisenbugs”.
Therefore, a processor or subscriber must ensure that it is not confronted with onNext calls from several threads at the same time. This can be achieved through synchronisation, by using queues, or by delegating to an executor that processes the signals one after the other.
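One common way to guarantee this is to funnel all signals through a single-threaded executor; a sketch of such a wrapper (the delegate construction is illustrative):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
//Serialises onSubscribe/onNext/onError/onComplete so they never run concurrently.
class SerializingSubscriber<T> implements Flow.Subscriber<T> {
private final ExecutorService serial = Executors.newSingleThreadExecutor();
private final Flow.Subscriber<? super T> delegate;
SerializingSubscriber(Flow.Subscriber<? super T> delegate) { this.delegate = delegate; }
@Override public void onSubscribe(Flow.Subscription s) { serial.execute(() -> delegate.onSubscribe(s)); }
@Override public void onNext(T item) { serial.execute(() -> delegate.onNext(item)); }
@Override public void onError(Throwable t) { serial.execute(() -> { delegate.onError(t); serial.shutdown(); }); }
@Override public void onComplete() { serial.execute(() -> { delegate.onComplete(); serial.shutdown(); }); }
}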
7.3 Virtual Threads (Loom) vs. Reactive
With Project Loom, Java has introduced a new model: Virtual Threads. These enable the creation of millions of lightweight threads that efficiently support blocking operations. This blurs the classic line between blocking and non-blocking code. The question, therefore, arises: Do we still need reactive streams with complex backpressure at all?
The answer is: Yes, but differentiated. While virtual threads make it easier to handle multiple simultaneous operations, they do not automatically resolve the issue of uncontrolled data production. Even with Virtual Threads, a subscriber must be able to specify the number of elements they can process. Backpressure, therefore, remains an important concept. However, virtual threads can help simplify the implementation of subscriber logic, as blocking processing steps now scale much better.
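A sketch of what this can look like with Java 21 (the executor choice and buffer size are illustrative):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
void virtualThreadDemo() {
//Each delivery task runs on its own virtual thread (Java 21+); a subscriber
//that blocks, e.g. on JDBC, no longer occupies a scarce platform thread.
try (ExecutorService vts = Executors.newVirtualThreadPerTaskExecutor();
var publisher = new SubmissionPublisher<String>(vts, 64)) {
publisher.submit("event");
//demand signalling (request/cancel) still applies unchanged
}
}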
8. Error & Completion Scenarios
In every reactive data stream, the question arises as to how to deal with errors and the regular closing. The Flow API provides clear semantics for this: Each stream ends either with a successful completion (onComplete) or with an error (onError). This simple but strict model ensures that a subscriber knows exactly what state the data flow is in at all times.
8.1 Semantics of onError and onComplete
The onError and onComplete methods mark the endpoint of a data stream. Once one of these methods has been called, no further data may be sent through onNext. This contract is crucial because it guarantees consistency: a subscriber can rest assured that they won’t have to process any new items at the end.
- onComplete means that all elements have been successfully submitted and the source has been exhausted. Examples include reading a file or playing back a finite list of messages.
- onError signals that an error occurred during processing. This error is passed on to the subscriber as an exception so that it can react in a targeted manner – for example, through logging, restarts, or emergency measures.
It is important to note that onError and onComplete are mutually exclusive. So it must never happen that a stream first breaks off with an error and then signals a regular end.
8.2 Restarts and Retry Mechanisms
In many use cases, it is not sufficient to simply terminate a stream at the first error. Instead, you often want to try to resume processing. Classic examples are network requests or database access, which can fail temporarily.
There is no built-in retry functionality in the flow API itself, but it can be implemented at the processor level. For example, after an error, a processor could decide to restart the request or skip the faulty record. It is important to handle the state clearly: A new onSubscribe may only take place when a new data stream is started. For more complex scenarios, patterns such as circuit breaker or retry with exponential backoff are ideal.
A simple example could be a subscriber that increments a counter on the first error and retries the faulty operation a maximum of three times. Only if an error still occurs after these retries does it finally pass the error on to the downstream:
import java.util.concurrent.Flow;
class RetrySubscriber<T> implements Flow.Subscriber<T> {
private final Flow.Subscriber<? super T> downstream;
private int retries = 0;
private static final int MAX_RETRIES = 3;
private Flow.Subscription subscription;
RetrySubscriber(Flow.Subscriber<? super T> downstream) {
this.downstream = downstream;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
downstream.onSubscribe(subscription);
subscription.request(1);
}
@Override
public void onNext(T item) {
downstream.onNext(item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
if (retries < MAX_RETRIES) {
retries++;
System.out.println("Retry " + retries + " after error: " + t.getMessage());
subscription.request(1); //try to keep going
} else {
downstream.onError(t);
}
}
@Override
public void onComplete() {
downstream.onComplete();
}
}
This pattern is of course greatly simplified, but it illustrates the principle of retrying after temporary errors.
8.3 Idempotency and exactly-once processing
A central problem with restarts is determining whether operations may safely be carried out more than once. Many systems are only robust if their operations are idempotent – that is, they produce the same result when executed multiple times. An example is writing a record with a specific key to a database: even if this process is repeated, the final state remains the same.
If idempotency is absent, there is a risk of double processing, which can lead to incorrect results or inconsistencies. Therefore, it is good practice to prefer idempotent operations or to ensure that data is not processed multiple times through additional mechanisms, such as transaction IDs, deduplication, or exactly-once processing.
A simple example: Suppose a processor processes orders and passes each order with a unique ID to an OrderService, which stores it. Even if the same record passes through the processor twice, the ID ensures that it only exists once in the service. This means that the operation remains idempotent.
A small Java example with a processor and a JUnit 5 test might look like this:
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
@Test
void processor_enforces_idempotence_by_filtering_duplicates()
throws Exception {
OrderService svc = new OrderService();
try (SubmissionPublisher<Order> pub = new SubmissionPublisher<>()) {
DeduplicateProcessor<Order, String> dedup = new DeduplicateProcessor<>(Order::id);
pub.subscribe(dedup);
dedup.subscribe(new SinkSubscriber(svc));
pub.submit(new Order("order-1", "Order A"));
pub.submit(new Order("order-1", "Order A (Duplicate)"));
pub.submit(new Order("order-2", "Order B"));
pub.close();
Thread.sleep(100); //wait for the asynchronous drain
}
assertEquals(2, svc.size());
assertNotNull(svc.get("order-1"));
assertNotNull(svc.get("order-2"));
}
//Domain
record Order(String id, String payload) { }
//Processor: filters duplicates based on a key (idempotency in the pipeline)
static class DeduplicateProcessor<T, K>
extends SubmissionPublisher<T>
implements Flow.Processor<T, T>, HasLogger {
private final Function<T, K> keyExtractor;
private final Set<K> seen = ConcurrentHashMap.newKeySet();
private Flow.Subscription subscription;
DeduplicateProcessor(Function<T, K> keyExtractor) { this.keyExtractor = keyExtractor; }
@Override
public void onSubscribe(Flow.Subscription s) {
this.subscription = s;
s.request(1);
}
@Override
public void onNext(T item) {
logger().info("Receive {}", item);
if (seen.add(keyExtractor.apply(item))) {
logger().info("no duplicate - submitting");
submit(item);
} else {
logger().info("Duplicate - ignore");
}
subscription.request(1);
}
@Override
public void onError(Throwable t) { closeExceptionally(t); }
@Override
public void onComplete() { close(); }
}
//Subscriber writes to a simple "DB"
static class OrderService {
private final Map<String, Order> store = new ConcurrentHashMap<>();
void save(Order o) { store.put(o.id(), o); }
Order get(String id) { return store.get(id); }
int size() { return store.size(); }
}
static class SinkSubscriber
implements Flow.Subscriber<Order>, HasLogger {
private final OrderService svc;
private Flow.Subscription s;
SinkSubscriber(OrderService svc) { this.svc = svc; }
@Override
public void onSubscribe(Flow.Subscription s) {
this.s = s;
s.request(1);
}
@Override
public void onNext(Order item) {
logger().info("Save {}", item);
svc.save(item);
s.request(1);
}
@Override
public void onError(Throwable t) { }
@Override
public void onComplete() { }
}
The test shows that despite the publication of order-1 being repeated twice, only one instance is sent downstream, thanks to DeduplicateProcessor. The idempotency logic is thus in the processor, not in the subscriber.
9. Interoperability & Integration
A key feature of the Flow API is its openness to integration. The API itself is minimal and only defines the basic contracts for publisher, subscriber, subscription and processor. This allows it to be used both standalone and combined with other reactive libraries.
9.1 Integration with Reactive Frameworks
Many established reactive frameworks, such as Project Reactor, RxJava, or Akka Streams, are based on the same fundamental ideas as the Flow API and are closely tied to the official Reactive Streams specification. They offer either direct adapters or interfaces that can be used to integrate publishers, subscribers and, in particular, your own processor implementations. This means that self-developed building blocks can be easily embedded in complex ecosystems without being tied to a specific framework. A central example is the helper class FlowAdapters which, notably, is not part of the JDK itself but ships with the Reactive Streams library (org.reactivestreams); it provides conversion methods between java.util.concurrent.Flow and org.reactivestreams.Publisher. This enables seamless integration of your own JDK-based publishers or processors with Reactor or RxJava operators without breaking the contracts. In practice, this means that a processor implemented with Flow can be integrated directly into a Reactor Flux or fed from an RxJava Observable, allowing existing pipelines to be expanded or migrated step by step.
A simple example shows the use of FlowAdapters. This example can also be secured with a small JUnit5 test:
import static org.junit.jupiter.api.Assertions.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Flow.*;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
public class FlowAdapterTest {
@Test
void testFlowAdapterIntegration() throws Exception {
try (SubmissionPublisher<String> jdkPublisher = new SubmissionPublisher<>()) {
// Flow -> Reactive Streams
org.reactivestreams.Publisher<String> rsPublisher = FlowAdapters.toPublisher(jdkPublisher);
//back to Flow
Publisher<String> flowPublisher = FlowAdapters.toFlowPublisher(rsPublisher);
List<String> results = Collections.synchronizedList(new ArrayList<>());
flowPublisher.subscribe(new Subscriber<>() {
private Subscription s;
@Override public void onSubscribe(Subscription s) { this.s = s; s.request(1); }
@Override public void onNext(String item) { results.add(item); s.request(1); }
@Override public void onError(Throwable t) { fail(t); }
@Override public void onComplete() { }
});
jdkPublisher.submit("Hello World");
jdkPublisher.close();
Thread.sleep(100); //wait for asynchronous delivery
assertEquals(List.of("Hello World"), results);
}
}
}
This test shows that a SubmissionPublisher can be successfully turned into a Reactive Streams publisher and back again via FlowAdapters. The message “Hello World” reaches the subscriber correctly at the end.
9.2 Use in classic applications
The Flow API can also be used sensibly in non-reactive applications. For example, data streams from a message queue or a websocket can be distributed to internal services via a SubmissionPublisher. Legacy systems that have previously relied on polling benefit from distributing and processing events in real-time. This enables modern, reactive architectures to be integrated into existing applications incrementally.
A simple example illustrates the connection with a legacy service:
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.Flow;
//Simulates an existing legacy service that was previously addressed via polling
class LegacyService {
public void process(String msg) {
System.out.println("LegacyService handles: " + msg);
}
}
//Subscriber who passes incoming events to the legacy service
class LegacyServiceSubscriber implements Flow.Subscriber<String> {
private Flow.Subscription s;
private final LegacyService service;
LegacyServiceSubscriber(LegacyService service) { this.service = service; }
@Override public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
@Override public void onNext(String item) { service.process(item); s.request(1); }
@Override public void onError(Throwable t) { t.printStackTrace(); }
@Override public void onComplete() { System.out.println("Stream completed"); }
}
public class LegacyIntegrationDemo {
public static void main(String[] args) {
LegacyService legacy = new LegacyService();
try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
pub.subscribe(new LegacyServiceSubscriber(legacy));
//Example: instead of polling, a MessageQueue delivers new messages
pub.submit("Message 1 from MQ");
pub.submit("Message 2 from MQ");
}
}
}
This small program simulates use in a classic application: instead of polling, messages from an external source are immediately forwarded to the LegacyService, which can continue to work unchanged. The Flow API serves as a bridge between modern event processing and existing logic.
9.3 Bridge to Java Streams
A common question is how the Flow API compares to or even connects to classic Java Streams (the Stream API). While streams typically process finite amounts of data in the pull model, flow works with potentially infinite sequences in the push model. Nevertheless, a combination is possible: A processor could cache incoming data and feed it into a Java stream. Conversely, the results of a stream can be brought back into a reactive context via a publisher. This creates a bridge between batch-oriented and event-driven processing.
A small JUnit5 example demonstrates this bridge: Here, elements are transferred to a stream via a SubmissionPublisher and then summed.
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.*;
import java.util.*;
public class FlowToStreamTest {
@Test
void testFlowToStreamIntegration() throws Exception {
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
List<Integer> buffer = Collections.synchronizedList(new ArrayList<>());
//Subscriber collects data in a list
publisher.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription s;
@Override public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
@Override public void onNext(Integer item) { buffer.add(item); s.request(1); }
@Override public void onError(Throwable t) { t.printStackTrace(); }
@Override public void onComplete() { }
});
//Publish events
publisher.submit(1);
publisher.submit(2);
publisher.submit(3);
publisher.close();
//short pause to wait for asynchronous collection
Thread.sleep(100);
//Transition to Java Stream: Calculate Sum
int sum = buffer.stream().mapToInt(i -> i).sum();
assertEquals(6, sum);
}
}
}
In this example, the list acts as a buffer and allows the transition from the asynchronous push model (flow) to the synchronous pull model (stream). In this way, both worlds can be elegantly combined. However, the topic is far more extensive and will be examined in detail in a separate article.
Conclusion
The Flow API in java.util.concurrent shows how Java provides a clean, standardised basis for reactive data streams. While Java Streams are intended for finite, synchronous pull scenarios, Flow addresses the challenges of continuous, infinite and asynchronous data sources. Core concepts such as backpressure, precise signal semantics (onNext, onError, onComplete), and the dual role of the Flow.Processor form the basis for stable, extensible pipelines.
The patterns shown – from map and filter processors to batching and throttling – make it clear that robust processing chains can be built with little code. Supplemented by retry strategies, idempotency and interoperability with established reactive frameworks, a toolbox is created that is suitable for simple learning examples as well as for upscaled systems.
This makes it clear: If you work with event streams in Java, you can’t get around the Flow API. It bridges the gap between classic streams, modern reactive programming and future developments such as virtual threads. It is crucial to consistently adhere to the principles of backpressure, clean use of resources and clear signal guidance. This results in pipelines that are not only functional, but also durable, high-performance and safe.