
Rethinking Java Streams: Gatherer for more control and parallelism

Since version 8, Java has offered an elegant, functional approach to processing data sets with the Streams API. The terminal operation collect(…) represents the bridge from the stream to a target aggregation – in the form of lists, maps, strings or more complex data structures. Until recently, this accumulation was governed by Collector instances, which internally consist of a supplier, an accumulator, a combiner and, optionally, a finisher. This model works well for simple accumulations but has noticeable limitations, particularly for complex, stateful, or conditional aggregations.

Java 22 introduced the new interface java.util.stream.Gatherer as a preview feature (finalised in Java 24), significantly expanding the semantics and control over the accumulation process. Where a Collector passively collects data, a Gatherer actively responds to incoming elements, comparable to a specialised transducer in functional programming languages. Gatherers are particularly useful where procedural or stateful aggregation is necessary, and they also allow element insertion, filtering, skipping, and explicit termination of the gathering process – all within the framework of a functional, composable architecture.

The semantics of gatherers

A Gatherer<T, A, R> describes the transformation of a Stream<T> into a Stream<R>, using an intermediate state of type A, under close control over the accumulation process. In contrast to a Collector, which is in a sense a container for aggregation logic, the gatherer allows rule-based, state-dependent processing of inputs – including the ability to skip elements (drop), insert additional ones (inject) or end processing early (finish early).

To make this possible, a gatherer is built around the idea of a downstream sink, which is invoked in the context of the stream pipeline. This sink receives every input element, can react to it and thus actively influences the flow of processing. The actual processing is driven by the integrator function, which manages the transitions between the aggregation states.

Why Gatherers are more than just a “better collector”

While the conventional Collector serves primarily as a final accumulation tool – i.e. to transfer the elements contained in the stream into a target structure such as a list, a map or an aggregate – the Gatherer goes conceptually far beyond this role. It is not just an optimised or syntactically varied form of the Collector, but rather an independent mechanism that opens up new forms of expression for stream processing, both semantically and functionally.

The central difference lies in the expanded scope for action during the transformation: a Gatherer can not only accumulate elements but also feed new, previously non-existent elements into the data stream. This opens up the possibility, for example, of introducing initialisation values at the beginning of a stream or placing control elements such as headers and footers specifically at the beginning or end – without artificially expanding the original data stream.

This creative freedom becomes particularly clear when dealing with conditions. Where a Collector usually operates with a simple accumulator whose state leads to a final result, a Gatherer can work state-based – and allow this state to be influenced across several elements. This opens up new semantic horizons: for example, window operations can be formulated in which temporal or sequential logic is applied – such as aggregating data up to a designated “end” marker, or combining groups of elements that can only be identified by a particular order or content structure.

Even complex decision structures, such as those required in multi-stage parsing processes or when traversing decision trees, can be implemented elegantly and declaratively using stateful Gatherers. The interface remains in the spirit of functional programming: transformation and aggregation can still be described separately, but the Gatherer connects them in a way that was previously only possible through imperative or difficult-to-maintain stream hacks.

Another advantage is the controlled influence of past elements on current behaviour. A Gatherer can, for example, decide to discard an element because a previous element set a certain context. This context sensitivity is particularly relevant in situations where data streams are structurally “not clean” – such as log files, inconsistent data exports, or natural language analysis.

A Gatherer is not just a “better Collector”, but a fundamentally new tool for stream-based modelling of complex transformation logic. It opens up a design space in which state, transformation, context and accumulation can interact in a way that was previously only possible with considerable effort – or outside of the stream model. Anyone who has ever worked with stateful Gatherer constructions will notice how much this expands the expressiveness of functional stream architectures.

A concrete example: grouping with filter logic

Let’s imagine that we want to collect from a stream of strings only those elements that have a particular property and then group them – for example, all words longer than 5 characters, grouped by their first letter. This requirement can be formulated with a Collector, but requires a combination of preprocessing (e.g. filter(…)) and downstream grouping. With a Gatherer, on the other hand, this combined process can be expressed elegantly and in a single step:
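A possible implementation could look like this – a sketch with illustrative names; gatherers require a recent JDK (preview in Java 22, finalised in Java 24):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GroupingGatherer {

    // Filters and groups in a single step; the grouped map is emitted once at the end.
    static Gatherer<String, Map<Character, List<String>>, Map<Character, List<String>>> longWordsByFirstLetter() {
        return Gatherer.ofSequential(
                HashMap::new,                                    // state: the grouping map
                (state, word, downstream) -> {
                    if (word.length() > 5) {                     // filter logic, embedded directly
                        state.computeIfAbsent(word.charAt(0), k -> new ArrayList<>()).add(word);
                    }
                    return true;                                 // true: continue processing
                },
                (state, downstream) -> downstream.push(state)    // finisher: emit the grouped result
        );
    }

    public static void main(String[] args) {
        var grouped = Stream.of("banana", "kiwi", "cherry", "blueberry")
                .gather(longWordsByFirstLetter())
                .findFirst()
                .orElseThrow();
        System.out.println(grouped); // {b=[banana, blueberry], c=[cherry]}
    }
}
```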

In this example, a decision is made for each element as to whether it will be included in the result. The logic is embedded directly into the Gatherer. The return value true signals that processing should continue. If you returned false at this point instead, the stream would end prematurely – a behaviour that is not achievable with conventional Collectors.

Integration with Streams API

The interface Gatherer<T, A, R> explicitly distinguishes between sequential and parallel processing. The central distinction arises from the factory methods:

Gatherer.ofSequential(…) // Can only be used sequentially

Gatherer.of(…) // May also be evaluated in parallel streams

A gatherer created with Gatherer.of(…) may be used in parallel streams, but must meet certain requirements: it must be thread-safe or rely on thread-isolated accumulators, and it must provide an associative combiner. This is similar to the logic of parallel collectors, where internal state management allows different elements to be processed simultaneously in independent threads.

Sequential Gatherer

Especially in sequential processing – i.e., when there is no parallelisation – the Gatherer develops its full expressiveness while remaining simple, type-safe, and deterministic.

The functionality of a sequential gatherer can be divided into three main phases: initialisation, accumulation and emission. Each of these phases is described in detail below, with particular attention to the unique features of sequential processing.

Initialisation: The state of the gatherer

Each gatherer has an internal state that is recreated per stream execution. This state is defined via a Supplier<S>, where S represents the type of the state. In sequential processing, this state is reserved exclusively for a single thread; therefore, no thread safety requirements exist. This means that simple objects like ArrayList, StringBuilder, counter arrays, and custom records can be used without problems.

A typical state could look like this:

record ChunkState<T>(List<T> buffer, int chunkSize) {}

The associated supplier:

() -> new ChunkState<>(new ArrayList<>(chunkSize), chunkSize)

The state lives for the entire stream run and serves as context for all further processing steps. Initialisation lays the foundation for state-based logic, such as buffering, counting, aggregation or tracking previous elements.

Accumulation: Processing the input elements

The accumulation function is the heart of every gatherer. It is called for each element of the input stream. The signature of this function is:

(state, input, downstream) -> { … }

This is where the actual logic happens: the input element is brought into the state, and – depending on the current state – a decision can be made as to whether (and if necessary, how many) output values are produced. The decision as to whether an item is passed downstream rests entirely with the gatherer.

Example: Every third element of a stream should be emitted.
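A minimal sketch of such a gatherer (names are illustrative):

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class EveryThird {

    // Emits only every third element; the state is a simple mutable counter.
    static <T> Gatherer<T, int[], T> everyThird() {
        return Gatherer.ofSequential(
                () -> new int[1],                        // counter as one-element array
                (state, element, downstream) -> {
                    if (++state[0] % 3 == 0) {
                        return downstream.push(element); // emit and honour downstream demand
                    }
                    return true;                         // continue without emitting
                }
        );
    }

    public static void main(String[] args) {
        var out = Stream.of(1, 2, 3, 4, 5, 6, 7).gather(everyThird()).toList();
        System.out.println(out); // [3, 6]
    }
}
```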

In contrast to classic filter or map operations, this logic is conditional and stateful: the gatherer remembers how many elements have already been processed and only emits a result for every third. The accumulation logic is therefore comparable to the accept() method of a specially defined consumer, supplemented by downstream control.

Since there is no threading in sequential processing, all operations can be performed directly without synchronisation. The state can be arbitrarily complex and dynamic as long as it is updated correctly within the stream.

Emission: The control over the output

Elements are output via the downstream object provided to the Gatherer upon each accumulation. With its method push(R element), elements can be passed downstream in a targeted and controlled manner. Unlike map or flatMap, where each input is automatically transformed into one or more outputs, the gatherer itself decides whether, when, and what it emits.

For example, a gatherer can:

  • push individual output values,
  • push multiple values at once (e.g. with chunking or tokenisation),
  • completely suppress the emission (e.g. under preconditions),
  • generate values with a delay (e.g., at the stream’s end or after accumulation thresholds).

Example: Combining three consecutive elements into a string:
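A possible sketch (names are illustrative; note that, without a finisher, an incomplete final group would simply be discarded):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TripleJoiner {

    // Buffers three consecutive elements, then pushes them as one joined string.
    static Gatherer<String, List<String>, String> joinTriples() {
        return Gatherer.ofSequential(
                ArrayList::new,
                (buffer, element, downstream) -> {
                    buffer.add(element);
                    if (buffer.size() == 3) {
                        boolean more = downstream.push(String.join("-", buffer));
                        buffer.clear();              // the state is emptied after emission
                        return more;
                    }
                    return true;
                }
        );
    }

    public static void main(String[] args) {
        var out = Stream.of("a", "b", "c", "d", "e", "f").gather(joinTriples()).toList();
        System.out.println(out); // [a-b-c, d-e-f]
    }
}
```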

The emission here only occurs when three elements have been collected. These are merged, pushed and the state is then emptied.

An often overlooked but essential component of a sequential gatherer is the finisher – i.e. the closing function that runs after the last input element. This phase is crucial because, during regular accumulation, items are often retained in the state that can only be emitted later – or can no longer be emitted through regular accumulation at all. The finisher ensures that such remaining elements or aggregated partial results are not lost but are correctly passed downstream.

Signature and purpose of the finisher

The closing function has the signature:

BiConsumer<A, Downstream<? super R>>

It is called by the stream framework exactly once, after all input values have been processed. In this function, the final state can be accessed and, based on this state, a decision can be made as to whether (additional) output values should be created.

The finisher is particularly suitable for:

  • Partially filled buffers, for example in chunking operations when the last block does not reach full size,
  • Final aggregations, e.g. averaging, summation, hash calculation or protocol completion,
  • Finalisation of state machines, e.g. when an open state still needs to be completed,
  • Cleaning or logging, e.g. statistical outputs or final indicators.

Concrete example: chunking with remainder

Let’s look again at the example of a gatherer that groups elements into groups of three. Without a finisher, if the number of elements is not a multiple of three, the last one or two values would be lost:
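A sketch with a finisher that flushes the remainder (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ChunkWithRemainder {

    // Groups elements into chunks of three; the finisher flushes the
    // partially filled buffer so trailing elements are not lost.
    static <T> Gatherer<T, List<T>, List<T>> chunksOfThree() {
        return Gatherer.ofSequential(
                ArrayList::new,
                (buffer, element, downstream) -> {
                    buffer.add(element);
                    if (buffer.size() == 3) {
                        boolean more = downstream.push(List.copyOf(buffer));
                        buffer.clear();
                        return more;
                    }
                    return true;
                },
                (buffer, downstream) -> {
                    if (!buffer.isEmpty()) {         // the incomplete last chunk
                        downstream.push(List.copyOf(buffer));
                    }
                }
        );
    }

    public static void main(String[] args) {
        var out = Stream.of(1, 2, 3, 4, 5).gather(chunksOfThree()).toList();
        System.out.println(out); // [[1, 2, 3], [4, 5]]
    }
}
```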

In the closing function (the finisher), it is explicitly checked whether there are still elements in the state—i.e., whether the buffer is incomplete. These residual values are then combined into an aggregate and pushed.

Without the finisher, the gatherer would be functionally incomplete: for input sets whose size is not a multiple of three, the last chunk would simply be discarded – a classic off-by-one error.

Interaction with accumulation

The finisher is semantically separated from the accumulation logic, but accesses the same state. This means that, depending on the specific application, it can use the same auxiliary functions or serialisation routines as the accumulation itself. In practice, it is advisable to define auxiliary methods for common logic such as “combine and empty the list” in order to avoid redundancy.

No return – but effect through push()

The finisher returns no value but – like the accumulation function – works via the provided downstream object. There are no return semantics; instead, processing is completed in a controlled manner via push().

The finisher of a sequential gatherer is the binding conclusion of the processing model. It guarantees that all information remaining in the state is processed and, if necessary, emitted. Especially in data stream-based applications where incomplete blocks, open ends, or residual states are typical, the finisher is essential to avoid data loss and ensure semantic correctness. In a clean gatherer design, the finisher is therefore not optional but rather an integral part of a well-defined stream processing step.

A sequential gatherer combines:

  • the state handling of an aggregator,
  • the control flow options of a parser,
  • the expressiveness of an imperative processor,
  • and the clarity of functional APIs.

By foregoing parallelisation logic and concurrency, the sequential variant allows a gatherer to be developed with minimal overhead and maximum expressiveness – a tool that combines both the flexibility of imperative logic and the composition of functional programming.

Parallel Gatherer

A parallel gatherer is designed for parallel data processing pipelines, in which the four phases – initialiser, integrator, combiner and finisher – can be explicitly separated and controlled independently of one another.

Initializer – The creation of the accumulator

The initialiser defines how a new accumulator (internal state) is created. This is the first step in processing each substream, in sequential and parallel pipelines alike.

The signature is also: Supplier<A> initializer();

In parallel processing, this initialiser is called several times – once per substream, i.e. per thread that takes over a split of the data. This ensures that no synchronisation within the accumulator is necessary: ​​each thread operates in isolation with its own state.

Integrator – The processing of elements

The integrator is the central function for inserting stream elements into the accumulator. It is a tri-function that receives the accumulator A, the current element T and the downstream object, may push results, and returns a boolean indicating whether processing should continue.

The signature reads: Integrator<A, ? super T, R> integrator();

In parallel streams, this integrator is also applied to partial accumulators. What is important here is that this function may only change its own accumulator locally and must not affect any global state.

Combiner – The combination of partial accumulators

The combiner’s task is to combine several independently processed accumulators into one overall accumulator. This phase is only relevant in parallel stream executions. The combiner receives two partial results—typically from two threads—and has to combine them into a common result.

The signature is: BinaryOperator<A> combiner();

The correct implementation of the combiner is essential for the correctness of the parallel execution. It must be associative. This is the only way the JVM can freely distribute the operation across multiple threads.

Finisher – The transformation of the accumulator into the final result

The finisher function transforms the final accumulator A into the desired result R and emits it downstream. While A is used internally to work efficiently during aggregation, R is the actual result of the entire operation – such as an immutable collection, a merged string, an optional, a report object, etc.

The signature reads: BiConsumer<A, Downstream<? super R>> finisher();

Unlike the integrator and combiner, the finisher is called exactly once, at the end of the entire processing chain. It therefore serves as a bridge between the internal aggregation mechanism and the external representation.

Interaction in parallel gatherers

In a parallel stream with a gatherer, the following happens:

  1. An initialiser is called for each partial stream (split) to create a local accumulator.
  2. The integrator processes all elements in the respective substream one after the other and modifies the local accumulator.
  3. The combiner phase is called as soon as two partial accumulators need to be combined. This happens either through ForkJoin merging or in the final reduction step.
  4. After all partial accumulators have been combined, the finisher is called to calculate the final result.

This explicit separation and the ability to control each phase make Gatherer a powerful tool for complex, stateful, or stateless aggregations—especially when performance through parallelisation is crucial or custom aggregation logic is required.

An example implementation  

Let’s first define the gatherer in general and put it in a stream.
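A compact overall sketch could look like this (all names, including the constant END_INCLUSIVE, are illustrative; the individual components are discussed below):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class BlockGroupingGatherer {

    static final int END_INCLUSIVE = 299; // hypothetical limit for early termination

    // Groups integers into blocks of 100 (0–99, 100–199, …) in a parallel-capable gatherer.
    static Gatherer<Integer, ConcurrentMap<Integer, List<Integer>>, ConcurrentMap<Integer, List<Integer>>> byBlocksOf100() {
        return Gatherer.of(
                ConcurrentHashMap::new,                           // initialiser: one map per substream
                (state, element, downstream) -> {
                    if (element > END_INCLUSIVE) return false;    // early termination
                    int blockStart = (element / 100) * 100;
                    state.computeIfAbsent(blockStart,
                                    k -> Collections.synchronizedList(new ArrayList<>()))
                            .add(element);
                    return true;
                },
                (state1, state2) -> {                             // combiner: merge partial maps
                    var merged = new ConcurrentHashMap<Integer, List<Integer>>();
                    state1.forEach((k, v) ->
                            merged.computeIfAbsent(k, k1 -> new ArrayList<>()).addAll(v));
                    state2.forEach((k, v) ->
                            merged.merge(k, v, (v1, v2) -> { v1.addAll(v2); return v1; }));
                    return merged;
                },
                (state, downstream) -> downstream.push(state)     // finisher: emit the aggregate once
        );
    }

    public static void main(String[] args) {
        var result = IntStream.rangeClosed(0, 250).boxed()
                .parallel()
                .gather(byBlocksOf100())
                .findFirst()
                .orElseThrow();
        System.out.println(result.keySet()); // the blocks 0, 100 and 200
    }
}
```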

Now we define the respective subcomponents:

Initialiser:

var initializer = (Supplier<ConcurrentMap<Integer, List<Integer>>>) ConcurrentHashMap::new;

Integrator:

The integrator is responsible for processing individual stream elements (here: Integer) and inserting them into a shared state (ConcurrentMap). Each element is sorted according to a specific grouping criterion: all elements in the same block of 100 (e.g. 0–99, 100–199, …) are entered into the same list.

There is a special feature in this implementation:

if (element > END_INCLUSIVE) return false;

This condition serves as an abort signal: as soon as an element is above a specified limit (END_INCLUSIVE), processing is terminated by returning false. This is a special feature of the Gatherer model: the return value false signals that no further elements should be processed – a form of early termination.

The actual grouping works as follows: if no list already exists under the key blockStart, a new, synchronised ArrayList is created.

The current item is then added to this list.

Using Collections.synchronizedList(…) ensures that list accesses are thread-safe even within a parallel gatherer context – the ConcurrentMap itself is only responsible for map access, not for the values it contains.

The integrator therefore defines the following processing semantics:

  • Elements are grouped by blocks of 100 (0–99, 100–199, etc.).
  • The assignment is done via a ConcurrentMap, where each block contains a list.
  • The lists themselves are synchronised to allow concurrency within the list operations.
  • By returning false, processing can be ended early – e.g. when a limit value is reached.
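Put together, the integrator described above could be sketched like this (END_INCLUSIVE is a hypothetical constant):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Gatherer;

public class BlockIntegrator {

    static final int END_INCLUSIVE = 299; // hypothetical limit from the text

    // Sorts each element into its block of 100 and aborts once the limit is exceeded.
    static Gatherer.Integrator<ConcurrentMap<Integer, List<Integer>>, Integer, ConcurrentMap<Integer, List<Integer>>> integrator() {
        return (state, element, downstream) -> {
            if (element > END_INCLUSIVE) return false;   // abort signal: early termination
            int blockStart = (element / 100) * 100;      // 0, 100, 200, ...
            state.computeIfAbsent(blockStart,
                            k -> Collections.synchronizedList(new ArrayList<>())) // thread-safe lists
                    .add(element);
            return true;                                 // continue processing
        };
    }
}
```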

Combiner:

First, a new, empty, thread-safe map is prepared, into which all entries from state1 and state2 will be inserted. This new map is deliberately fresh because neither state1 nor state2 should be modified – this protects against unwanted side effects and keeps the function referentially transparent.

var mergedMap = new ConcurrentHashMap<Integer, List<Integer>>();

Inserting the entries from state1

The method computeIfAbsent checks whether an entry for the key k already exists in the target map mergedMap. If not, the lambda k1 -> new ArrayList<>() creates a new entry and inserts it. The method guarantees that an existing, modifiable list is returned afterwards – regardless of whether it was just created or already existed.

The call addAll(…) appends all elements of the list v from state1 to the corresponding list in mergedMap. This extends the aggregate state for this key in the target map.

Inserting the entries from state2

The same process is then repeated for state2:

Here, every entry from state2 is transferred into mergedMap. If the key does not yet exist, the value (a List<Integer>) is taken over directly. If the key already exists in mergedMap, merge(…) applies a custom merge strategy: the list contents of both maps are combined via v1.addAll(v2). What is important here is that v1 is the existing list and v2 the newly added one.

In the end, the newly created, combined map is returned—it represents the complete aggregate state, which contains the contents of both partial states.
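The two merge phases described above could be sketched as follows:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

public class BlockCombiner {

    // Combines two partial accumulators into a fresh map, without modifying either input map.
    static BinaryOperator<ConcurrentMap<Integer, List<Integer>>> combiner() {
        return (state1, state2) -> {
            var mergedMap = new ConcurrentHashMap<Integer, List<Integer>>();
            // Phase 1: copy all entries from state1 into fresh lists.
            state1.forEach((k, v) ->
                    mergedMap.computeIfAbsent(k, k1 -> new ArrayList<>()).addAll(v));
            // Phase 2: merge the entries from state2; lists of existing keys are concatenated.
            state2.forEach((k, v) ->
                    mergedMap.merge(k, v, (v1, v2) -> { v1.addAll(v2); return v1; }));
            return mergedMap;
        };
    }
}
```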

Finisher:

This implementation of the finisher is minimalistic but functionally correct: it takes the final state (state) of the accumulator – a ConcurrentMap<Integer, List<Integer>> – and hands it directly to the downstream, i.e. the next stage of the processing chain in the stream.

The parameter state is the aggregated result of the previous steps (initialiser, integrator, combiner). In this case, it is a map that maps blocks of 100 (Integer) to a list of the associated values (List<Integer>).

The parameter downstream is a push receiver that consumes the end result. It abstracts the next processing stage, such as a downstream map, flatMap, or terminal collection process.
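As a sketch, such a finisher is essentially a one-liner:

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.stream.Gatherer;

public class BlockFinisher {

    // Minimal "push-forward" finisher: hands the aggregated state downstream exactly once.
    static BiConsumer<ConcurrentMap<Integer, List<Integer>>,
                      Gatherer.Downstream<? super ConcurrentMap<Integer, List<Integer>>>> finisher() {
        return (state, downstream) -> downstream.push(state);
    }
}
```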

The call push(…) on the downstream object explicitly forwards the finished result to the next processing stage. This is fundamentally different from conventional Collector concepts, where the end result is simply returned.

This type of handover makes it possible, within a stateful context defined in the Gatherer, to deliver multiple or even conditional results – for example:

  • Streaming of intermediate results (e.g. after a specific batch)
  • Quitting early after the first valid result
  • Multiple emissions during partitioning

In this specific case, however, exactly one result was passed on—the fully aggregated map. This classic “push-forward finisher” emits the state as the result.

We have now examined the Gatherer in detail and highlighted the differences between sequential and parallel processing. With that, everything should be in place for your first experiments.

Happy Coding

Sven

The History of Parallel Processing in Java: From Threads to Virtual Threads

Since the early days of computer science, parallel processing has represented one of the greatest challenges and opportunities. Since its inception in 1995, Java has undergone a significant journey in the world of parallel programming to provide developers with ever-better tools. This story is a fascinating journey through threads, the executor framework, fork/join, parallel streams, CompletableFuture and the latest developments in Project Loom. In this blog post, we take a detailed look at the evolution of parallel processing in Java and the innovations that came with it.

Locking mechanisms in thread programming in Java

To make parallel processing safe and efficient, Java provides various locking mechanisms that help coordinate access to shared resources and avoid data corruption. These mechanisms are critical to ensure data consistency and integrity, especially when multiple threads access the same resources simultaneously. The most critical locking mechanisms in Java are described below:

`synchronized` keyword

The `synchronized` keyword is the most basic mechanism for synchronising threads in Java. It ensures that only one thread can access a synchronised method or code block at a time. This mechanism uses a so-called monitor, which is automatically locked and unlocked.

Advantages:

– Easy to use and integrate into code.

– Automatic management of locks.

Disadvantages:

– Can lead to performance issues if many threads want to access the synchronised resource simultaneously.

– Limited flexibility, e.g. no ability to set timeouts or conditional locks.
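As an illustration, a minimal synchronised counter (the class name is illustrative):

```java
public class SynchronizedCounter {

    private int count = 0;

    // Only one thread at a time can execute these methods on the same instance.
    public synchronized void increment() { count++; }

    public synchronized int get() { return count; }

    public static void main(String[] args) throws InterruptedException {
        var counter = new SynchronizedCounter();
        Runnable task = () -> { for (int i = 0; i < 10_000; i++) counter.increment(); };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(counter.get()); // always 20000 – no lost updates
    }
}
```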

`ReentrantLock`

`ReentrantLock` is a class from the `java.util.concurrent.locks` package that provides greater flexibility than the `synchronized` keyword. As the name suggests, `ReentrantLock` is a “reentrant” lock, meaning that a thread that already holds the lock can reacquire it without getting into a deadlock.

Advantages:

– Provides more control over the locking process, e.g. the ability to try to acquire locks with timeouts (`tryLock`).

– Supports fair locking, which ensures that threads are granted access in the requested order (`new ReentrantLock(true)`).

Disadvantages:

– Requires manual release of locks, increasing the risk of deadlocks if `unlock` is not performed correctly.
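A sketch of the typical usage pattern – `tryLock` with a timeout and `unlock` in a `finally` block (the account scenario is illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Account {

    private final ReentrantLock lock = new ReentrantLock(true); // true = fair ordering
    private int balance = 100;

    // Tries to withdraw; gives up if the lock is not available within the timeout.
    public boolean withdraw(int amount) {
        try {
            if (!lock.tryLock(50, TimeUnit.MILLISECONDS)) {
                return false;                    // could not acquire the lock in time
            }
            try {
                if (balance < amount) return false;
                balance -= amount;
                return true;
            } finally {
                lock.unlock();                   // always release, even on exceptions
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public int balance() { return balance; }
}
```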

`ReadWriteLock`

`ReadWriteLock` is a unique locking mechanism that distinguishes read and write locks. It consists of two locks: a read lock and a write lock. Multiple threads can acquire a read lock simultaneously as long as no write operation is performed. However, only one thread can acquire the write lock, keeping writes exclusive.

Advantages:

– Increases concurrency as multiple threads can read simultaneously as long as there are no writes.

– Reduces the likelihood of blocking read access.

Disadvantages:

– More complex to use than `synchronized` or `ReentrantLock`.

– Requires careful design to ensure no deadlocks or race conditions occur.
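A sketch of a read-mostly cache protected by a `ReadWriteLock` (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SettingsCache {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> settings = new HashMap<>();

    // Many threads may read concurrently, as long as no writer holds the lock.
    public String get(String key) {
        lock.readLock().lock();
        try {
            return settings.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Writers get exclusive access.
    public void put(String key, String value) {
        lock.writeLock().lock();
        try {
            settings.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```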

`StampedLock`

`StampedLock` is another variant of `ReadWriteLock` introduced in Java 8. Unlike `ReadWriteLock`, `StampedLock` provides optimistic read locks that enable even higher concurrency. A stamp is returned each time the lock is acquired to ensure the data remains valid.

Advantages:

– Provides optimistic read locks that enable high concurrency as long as no writes occur.

– Lower read operation overhead compared to traditional locks.

Disadvantages:

– Complex to use as developers need to ensure that the stamp is valid.

– Not reentrant, which may limit usage in some scenarios.
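The optimistic-read idiom, closely following the pattern from the JDK documentation, looks like this:

```java
import java.util.concurrent.locks.StampedLock;

public class Point {

    private final StampedLock lock = new StampedLock();
    private double x, y;

    public void move(double dx, double dy) {
        long stamp = lock.writeLock();
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Optimistic read: no lock is taken; the stamp is validated afterwards.
    public double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();
        double cx = x, cy = y;
        if (!lock.validate(stamp)) {       // a write happened in between
            stamp = lock.readLock();       // fall back to a full read lock
            try {
                cx = x;
                cy = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.hypot(cx, cy);
    }
}
```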

`Semaphore`

`Semaphore` is another synchronisation mechanism that allows several threads to access a resource simultaneously. It is often used to control simultaneous access to limited resources.

Advantages:

– Allows you to limit the number of threads that are allowed to access a resource at the same time.

– Flexible and applicable to various scenarios, e.g. to implement pooling mechanisms.

Disadvantages:

– Can become complex when using multiple semaphores in an application.

– Requires careful management to ensure that `acquire` and `release` are called correctly.
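A sketch of a simple pooling mechanism in which the semaphore limits the number of concurrent users (names are illustrative):

```java
import java.util.concurrent.Semaphore;

public class BoundedResource {

    private final Semaphore permits;

    public BoundedResource(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Blocks until a permit is free; every acquire is paired with a release.
    public void use(Runnable work) {
        permits.acquireUninterruptibly();
        try {
            work.run();
        } finally {
            permits.release();
        }
    }

    public int available() {
        return permits.availablePermits();
    }
}
```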

Summary of the locking mechanisms

Java offers a variety of locking mechanisms, each suitable for different scenarios. The `synchronized` keyword is easy to use, but less flexible for complex scenarios. `ReentrantLock` and `ReadWriteLock` provide more control and enable higher parallelism, while `StampedLock` is suitable for particularly demanding read operations. `Semaphore`, on the other hand, is ideal for controlling concurrent access to limited resources. Choosing the proper mechanism depends on the application’s requirements, particularly regarding concurrency, resource contention, and maintainability.

The early years: Threads and `Runnable`

Initially, Java’s concept of threads was the cornerstone of parallel processing. Java was developed as a platform-independent language intended for networked, distributed systems. With a focus on multithreading, Java offered a built-in `Thread` class and the `Runnable` interface, making it possible to execute multiple tasks simultaneously.

In the early versions of Java, threads were the only option for parallel processing. These were supported by operating system threads, which meant managing threads could be heavy and resource-intensive. Developers had to take care of details such as synchronizing shared resources and avoiding deadlocks themselves, which made developing parallel applications challenging.

Using threads: advantages and disadvantages

Advantages of threads

Simple modelling of parallel tasks: Threads allow developers to divide parallel tasks into separate units that can run concurrently.

Direct control: Threads give developers fine control over parallel execution, which is particularly useful when specific thread management requirements exist.

Good support from the operating system: Threads are supported directly by the operating system, meaning they can access all system resources and benefit from operating system optimisations.

Disadvantages of threads

Complexity of synchronisation: When multiple threads access shared resources, developers must use synchronisation mechanisms such as `synchronized` blocks or `locks` to avoid data corruption. Such mechanisms ensure that only one thread can access a critical resource at a time, ensuring data consistency. However, this often results in complicated and error-prone code, as developers must carefully ensure that all necessary sections are correctly synchronised. An incorrectly set or forgotten synchronisation block can lead to severe errors, such as race conditions, in which the program’s output depends on the timing of thread executions. Additionally, synchronisation mechanisms such as `locks` or `synchronized` blocks can lead to a performance penalty as threads often have to wait until a resource is released, which limits parallelism. These wait times can cause bottlenecks in more complex applications, mainly when multiple threads compete for different resources. Therefore, correctly applying synchronisation techniques requires a deep understanding of thread interactions and careful design to ensure data integrity and maximise performance.

Resource intensive: Each thread requires space for its stack and additional resources such as thread handling and context switching. These resource requirements can quickly add up with many threads and lead to system overload. In particular, the memory consumption for the thread stacks and the management of the threads by the operating system lead to increased resource requirements. With a large number of threads, the frequency of context switches also increases, which can lead to a significant reduction in overall performance. This often makes threads inefficient and difficult to manage at a large scale.

The danger of deadlocks: When using synchronisation mechanisms, there is a risk of deadlocks when two or more threads are blocked in a cyclic dependency on each other. Deadlocks often arise when multiple threads hold different locks in a mismatched order, waiting for other threads to release the needed resources. This leads to a situation where none of the threads can continue working because they are all waiting for the others to release resources. Deadlocks are often difficult to reproduce and debug because they only occur under certain runtime conditions. Strategies to avoid deadlocks include using timeout mechanisms, avoiding nested locks, and implementing lock ordering schemes to ensure all threads acquire locks in the same order.

Difficult scalability: Manual management of threads makes it difficult to scale applications, especially on systems with many processor cores. One of the main reasons for this is the challenge of determining the optimal number of threads to use system resources efficiently. Too many threads can cause system overload because management and context switching between threads consume significant CPU resources. On the other hand, too few threads can result in under-utilization of available processor resources, degrading overall performance. Additionally, it is challenging to adapt thread management to the dynamic needs of an application, especially when the load is variable. Developers often have to resort to complex heuristics or dynamic thread pools to control the number of active threads, which significantly complicates the implementation and maintenance of the application. These challenges make it complicated to efficiently scale applications to modern multicore processors because the balance between parallelism and overhead is challenging.

Java 5: The Executor Framework and `java.util.concurrent`

The release of Java 5 in 2004 brought the `java.util.concurrent` package, intended to address many of the problems of early parallel programming in Java. The Executor framework enabled a higher abstraction over threads. Instead of starting and managing threads manually, developers could now rely on a task-based architecture and pass tasks to an `ExecutorService`.

The Executor framework introduced classes like `ThreadPoolExecutor` and `ScheduledExecutorService`, synchronisation helpers like `Semaphore` and `CountDownLatch`, and concurrent collections like `ConcurrentHashMap`. This not only made thread management easier but also led to more efficient use of system resources.
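The task-based style can be sketched roughly like this; the pool size and the task itself are arbitrary choices for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {
    public static void main(String[] args) throws Exception {
        // A fixed pool of worker threads; the caller never starts a thread directly.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Submit a task and obtain its eventual result via a Future.
            Future<Integer> future = executor.submit(() -> 21 * 2);
            System.out.println(future.get()); // prints 42
        } finally {
            executor.shutdown(); // release the pool's threads
        }
    }
}
```

The infrastructure decides which pool thread runs the task; the developer only describes the work to be done.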

The Executor Framework changed the way developers modelled parallel tasks. Instead of focusing on thread creation, they could define tasks and let the infrastructure handle execution.

Java 7: Fork/Join Framework

Java 7 introduced the Fork/Join framework in 2011, explicitly designed for computationally intensive tasks that could be broken down into smaller subtasks. The fork/join framework provided a powerful recursion and divide-and-conquer infrastructure, allowing complex problems to be broken down into smaller, more manageable sub-problems. This division enabled the efficient use of modern multi-core processors.

The fork/join framework was particularly useful for problems that can be broken down into independent subproblems, such as calculating Fibonacci numbers, sorting large arrays (e.g. with merge sort), or processing large amounts of data in parallel. The central component of the framework is the `ForkJoinPool`, which manages the distribution of tasks among its worker threads. The `ForkJoinPool` uses so-called work stealing, in which less busy threads take over work from busier ones. This ensures better load balancing and increases the efficiency of parallel processing.

Another advantage of the fork/join framework is the ability to handle recursive tasks efficiently. Developers can use classes like `RecursiveTask` or `RecursiveAction` to define tasks that either produce a return value (`RecursiveTask`) or do not (`RecursiveAction`). The fork/join approach makes it possible to recursively split the tasks (`fork`) and then combine the results again (`join`).
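A possible sketch of this recursive splitting, assuming a simple array-sum task; the threshold value is an arbitrary choice for the example:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums an array by splitting it in half (fork) and combining
// the partial results (join).
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this, sum sequentially
    private final long[] numbers;
    private final int from, to;

    public SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += numbers[i];
            return sum;
        }
        int mid = (from + to) / 2;
        SumTask left = new SumTask(numbers, from, mid);
        SumTask right = new SumTask(numbers, mid, to);
        left.fork();                        // run the left half asynchronously
        long rightResult = right.compute(); // compute the right half directly
        return left.join() + rightResult;   // combine both partial sums
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 50005000
    }
}
```

Forking one half and computing the other directly keeps the current worker thread busy instead of parking it, which is the idiomatic fork/join pattern.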

The fork/join framework delivered significant performance improvements for CPU-intensive workloads, particularly for tasks easily broken down into smaller, independent pieces. It made it easier to write parallel algorithms without the developer worrying about thread distribution details. The `ForkJoinPool` handles the distribution of tasks and uses work stealing to ensure that the processor resources are used optimally. This significantly increases performance compared to manual thread management, especially for computationally intensive and highly parallelisable tasks.

Java 8: Parallel Streams

Java 8, released in 2014, marked a milestone in the evolution of Java, particularly with the introduction of lambda expressions, streams, and functional programming. These new features made the language more flexible and easier to use, especially for parallel operations. One of the most significant new features for parallel processing was Parallel Streams.

Parallel Streams allowed developers to effortlessly parallelise operations across collections without explicitly dealing with threads or synchronisation. This was achieved by integrating the fork/join framework behind the scenes. Parallel streams use the `ForkJoinPool` internally to distribute tasks efficiently across multiple processor cores. This approach is based on the divide-and-conquer design principle, in which an enormous task is broken down into smaller subtasks that can be executed in parallel.

The developer uses the `parallelStream()` method to convert a collection into a parallel stream. The collection’s elements are then processed concurrently, with parts of the work automatically distributed across the available CPU cores. In contrast to manual thread management, this approach offers a high level of abstraction and relieves the developer of the complex management of threads and synchronisation.

An example of using Parallel Streams is processing a list of numbers in parallel. Here, the operations applied to each element are carried out simultaneously, which can achieve a significant increase in performance on large data sets:
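One possible minimal form of such an example, summing the squares of a list of numbers in parallel:

```java
import java.util.List;
import java.util.stream.IntStream;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 1_000).boxed().toList();

        // parallelStream() splits the work across the common ForkJoinPool;
        // the per-element squaring runs concurrently.
        long sumOfSquares = numbers.parallelStream()
                .mapToLong(n -> (long) n * n)
                .sum();

        System.out.println(sumOfSquares); // 333833500
    }
}
```

Replacing `parallelStream()` with `stream()` yields the same result sequentially, which illustrates how little the code changes between the two modes.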

The design of Parallel Streams aims to simplify the development of parallel applications by using a declarative syntax that allows the developer to focus on the logic of data processing rather than on the details of thread management and task distribution. This higher level of abstraction makes parallel processing more accessible, which leads to better performance, especially on multi-core systems.

Java 8: CompletableFuture

The `CompletableFuture` API was also introduced in Java 8 and significantly expanded the possibilities of asynchronous programming. `CompletableFuture` allows creating, chaining, and combining asynchronous tasks, making it a handy tool for developing event-driven applications. Using methods like `thenApply`, `thenAccept` and `thenCombine` makes it easy to define a sequence of asynchronous operations that should be executed sequentially or whose results can be combined.

A `CompletableFuture` represents a future calculation and allows the different steps of the workflow to be defined declaratively. For example, an asynchronous calculation can be started, and the result can be processed further without worrying about explicitly managing the threads. This significantly simplifies the code and makes it more readable.

An example of using `CompletableFuture` shows how to execute multiple asynchronous operations one after the other:
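A minimal sketch of such a chain:

```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")     // start an async computation
                .thenApply(result -> result + " World")  // transform its result
                .thenAccept(System.out::println)         // consume it: prints "Hello World"
                .join();                                 // wait so the JVM does not exit early
    }
}
```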

In this example, an asynchronous calculation is first started that returns the string “Hello”. Then, the result of this calculation is modified by the `thenApply` method by adding ” World”. Finally, `thenAccept` prints the result on the console. The entire process occurs asynchronously, without the developer worrying about explicitly managing threads.

The architecture of `CompletableFuture` is based on the concept of so-called completion stages, which allow the creation of asynchronous pipelines. Each `CompletionStage` can either trigger a new calculation or process the result of a previous one. This enables the modelling of complex workflows, such as executing multiple tasks in parallel and then combining the results (`thenCombine`), or defining actions to be carried out in the event of an error (`exceptionally`).

Another significant advantage of `CompletableFuture` is the ability to combine asynchronous tasks. For example, two independent asynchronous calculations can be performed in parallel, and their results can then be merged:
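A minimal sketch of such a combination; the numeric values are arbitrary:

```java
import java.util.concurrent.CompletableFuture;

public class CombineDemo {
    public static void main(String[] args) {
        // Two independent asynchronous calculations.
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 22);

        first.thenCombine(second, Integer::sum)  // add both results once ready
             .thenAccept(System.out::println)    // prints 42
             .join();
    }
}
```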

In this example, two calculations are performed in parallel, and their results are combined. The `thenCombine` method allows the results of the two futures to be added, and `thenAccept` prints the combined result.

This model allows complex workflows to be created without explicitly relying on threads or callbacks, making the code cleaner, more modular, and easier to maintain. `CompletableFuture` also provides methods such as `allOf()` and `anyOf()` that allow multiple futures to be monitored and processed simultaneously. This is particularly useful for scenarios where numerous independent tasks must be executed in parallel.
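A short sketch of `allOf()` in use; the task values are illustrative:

```java
import java.util.concurrent.CompletableFuture;

public class AllOfDemo {
    public static void main(String[] args) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "task A");
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "task B");

        // allOf() completes once every given future has completed;
        // the individual results are then read from the original futures.
        CompletableFuture.allOf(a, b).join();
        System.out.println(a.join() + ", " + b.join()); // prints "task A, task B"
    }
}
```

`anyOf()` works analogously but completes as soon as the first of the given futures does.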

Overall, the `CompletableFuture` API makes asynchronous programming in Java much more accessible and allows developers to develop reactive and non-blocking applications with relatively little effort.

Java 9 to Java 19: Improvements and Project Loom

After Java 8, the parallel programming models continued to improve. Java 9 brought improvements to the fork/join framework and introduced the `Flow` API, which supports a reactive streaming model. Java 9 to 17 focused primarily on performance improvements, security, and the modularisation of the JDK (Project Jigsaw).

However, one of the most significant recent innovations is Project Loom. Since Java 19, virtual threads have been available as a preview feature, set to revolutionise parallel programming in Java. Virtual threads are lightweight threads that enable many concurrent tasks to run without the typical overhead of traditional operating system threads. While traditional threads are limited by resource costs (such as memory for stacks and context switches), virtual threads are designed to be much more resource-efficient. This means developers can create millions of virtual threads that work independently without overloading the system.

Virtual threads are handy for server-side applications that handle many concurrent connections, such as web servers or microservices. In traditional approaches, handling each request in its own thread often leads to scaling problems because hardware resources limit the number of possible threads. Virtual threads, on the other hand, allow each incoming request to be handled in its own virtual thread, significantly increasing parallelism.

Virtual threads achieve this because they are managed efficiently by the Java runtime rather than directly by the operating system, as traditional threads are. This makes context switching significantly faster and managing millions of threads realistic. Virtual threads are programmed like traditional threads, so existing code can easily be adapted to take advantage of this new technology.

A simple example of using virtual threads shows how an `ExecutorService` can be used to execute a task in a virtual thread:
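One way such an example might look; it requires Java 21, or Java 19+ with preview features enabled:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadDemo {
    public static void main(String[] args) {
        // The executor starts a fresh virtual thread for every submitted task.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                // Each task runs in its own lightweight virtual thread.
                System.out.println("Running in: " + Thread.currentThread());
            });
        } // close() waits for submitted tasks to finish
    }
}
```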

This example creates an executor that uses a new virtual thread for each task. The `submit` method starts the task in a virtual thread, which requires significantly fewer resources than a traditional thread.

Project Loom can potentially make parallel programming in Java much more accessible by eliminating the need for developers to worry about thread scaling. Virtual threads are significantly more efficient and offer much higher parallelism without the programmer having to work with thread pools or complex synchronisation mechanisms explicitly. This increased scalability is particularly valuable in applications where concurrent operations must be dynamically scaled, as in many modern web applications and cloud environments. The introduction of virtual threads makes Java an even stronger choice for developing highly scalable, parallel applications by dramatically reducing the complexity of thread management.

The Evolution of Parallel Processing in Java: Conclusion

The journey of parallel processing in Java reflects the language’s evolutionary nature. From the early days of threads, when developers had to rely on low-level APIs, to the highly abstract paradigms such as the Executor framework, Fork/Join, and Parallel Streams, Java has continually introduced improvements to make parallel application development easier.

With recent developments such as `CompletableFuture` and Project Loom, Java can meet the needs of modern software development, especially in scalability and performance. Parallel processing in Java is now simpler, safer and more efficient than ever before, providing developers with powerful tools to exploit the full potential of modern multicore systems.

Looking into the future

With Project Loom on the path to stability, we could see a further shift in focus towards even simpler and more performant parallel processing techniques. Virtual threads will likely pave the way for new frameworks and libraries that benefit from this lightweight parallelism. Developers will continue to have access to the best parallel processing tools without having to worry about the complexities of thread management.

Java has proven that it can keep changing with the times as a language for parallel programming, rising to the challenges and adapting to the needs of modern developers. Java’s history of parallel processing is one of progress and continuous innovation. With the developments ahead, it will remain exciting to see what new possibilities the future brings.