A Java™ Parallel Calamity
Proposed for the Java™ SE 8 release is support for parallel bulk operations (filter/map/reduce). This is great news for application developers who want Java™ to compete with C# in this area. Unfortunately, the parallel engine that supports this feature is fatally flawed. (3300 words)
Edward Harned (eh at coopsoft dot com)
Senior Developer, Cooperative Software Systems, Inc.
January, 2011 - 2013 [updated October, 2013]
This article is part two of a two-part series on parallel computing in Java™. Part one deals with the problems of an academic-centric Fork/Join framework in a commercial development arena. This article deals with the devastating effects the Fork/Join framework will have on the forthcoming parallel Bulk Data Operations for Collections (JEP 107) in Java™ SE 8.
A little parallel background …
Parallel processing generally factors into two main branches:
- Massively Parallel Processing (MPP) on clusters of computers and
- Symmetric Multiprocessing (SMP, often called shared-memory processing).
Fork/Join — splitting the work into fragments and joining the results together — is one aspect of parallel processing on SMP computers. Fork/Join mostly divides into two main areas:
- Separating the problem into components, placing each component into a queue for processing by a pool of threads, and joining the results. Component processing is a common procedure in the commercial application development community.
- Dividing a problem into two or more sub-problems, solving the sub-problems in parallel, and joining the results. Dynamic decomposition (frequently called divide-and-conquer) is a common procedure in the academic/scientific community.
Recursive decomposition is a derivative of dynamic decomposition. The Fork/Join Framework in Java™ SE 7 uses recursive decomposition optimized for dyadic recursive division.
- The framework divides a problem into two sub-problems, forks them, and joins the results
- Each sub-problem divides itself into two sub-problems, forks them, and joins the results
- This continues until a threshold is reached, at which point the framework solves each sub-problem directly, in parallel, and returns the result up the chain.
(1024)
(512) (512)
(256) (256) (256) (256)
(128) (128) (128) (128) (128) (128) (128) (128)
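The dyadic recursive division above can be sketched with the JDK's own RecursiveTask. This is a minimal, illustrative sketch only; the names SumTask and THRESHOLD are ours, not part of the JDK:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch: summing an int[] by dyadic recursive division.
class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 128;        // below this size, solve directly
    final int[] array;
    final int lo, hi;

    SumTask(int[] array, int lo, int hi) {
        this.array = array; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {           // threshold reached: sequential sum
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += array[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;            // divide into two sub-problems
        SumTask left  = new SumTask(array, lo, mid);
        SumTask right = new SumTask(array, mid, hi);
        left.fork();                          // left half goes onto this thread's deque
        long rightAnswer = right.compute();   // this thread works the right half
        long leftAnswer  = left.join();       // wait for the left half
        return leftAnswer + rightAnswer;
    }
}
```

Invoked as new ForkJoinPool().invoke(new SumTask(data, 0, data.length)), each level forks and joins exactly as the tree above shows.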
Dynamic decomposition has a very narrow performance window:
- It needs a massive volume of easily separable data (aggregate). If you need to sum an array of one million integers, then it is beneficial to decompose the work with Fork/Join.
- It needs a low volume of concurrent requests. If you have N processors and 8N concurrent requests, then using one thread per request is often more efficient for throughput. The logic is simple: if you have N processors available and you split your work accordingly, but there are hundreds of other tasks ahead of you, what is the point of splitting? Try it yourself. (You can download the source code for a F/J vs. Thread Pool demo below.)
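The one-thread-per-request alternative mentioned above can be sketched with a plain fixed pool. The class and method names here are illustrative only, not from any demo code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: with many concurrent requests, give each whole
// request to one pool thread instead of splitting it with Fork/Join.
class PerRequestPool {
    static List<Long> sumAll(List<int[]> requests) {
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int[] request : requests)        // one task per whole request
                futures.add(pool.submit(() -> {
                    long sum = 0;
                    for (int v : request) sum += v;   // no splitting, no stealing
                    return sum;
                }));
            List<Long> results = new ArrayList<>();
            for (Future<Long> f : futures) {
                try { results.add(f.get()); }
                catch (Exception e) { throw new RuntimeException(e); }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

With 8N requests in flight, every processor stays busy doing useful work with no decomposition overhead at all.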
Recursive decomposition has an even narrower performance window. In addition to the dynamic decomposition requirements above, recursive decomposition optimized for dyadic recursive division only works well:
- on balanced tree structures (Directed Acyclic Graphs)
- where there are no cyclic dependencies
- where the computation duration is neither too short nor too long
- where there is no blocking
Parallel Operations in Java™ SE 8
The JDK1.8 engineers are using the recursively decomposing F/J framework, with its very, very narrow performance window, as the engine for parallel programming support simply because they don't have anything else. They also maintain that since the F/J framework is already inside the JDK, they should use what they have.
Using the F/J framework for niche processing in JDK1.7 was bad enough. (A Java™ Fork-Join Calamity) Using this framework as the parallel engine for all Java™ parallel computing is a calamity in the making.
The Java™ SE 7 F/J framework is the academic experiment cited in the 2000 research paper. It was designed to follow prior academic research and to adhere to the principles of academia. It is a great tool for teaching students how to walk down the leaves of a balanced tree. It is not a general-purpose, commercial development tool. The four biggest problems with this framework as a parallel engine are that
- it uses submission queues for new requests,
- it uses an intermediate join() method as the return point for forking methods,
- it has mind-boggling complexity caused by the other two (part one has an extensive write-up on complexity and design failure going forward to JDK1.8), and
- for the coup de grâce — Java™ EE cannot use any of the parallelization for bulk operations. None. Nada. Zilch. Zippo.
The framework puts the initial task into a submission queue. Since the framework is incapable of placing the task into the deque of an underutilized thread, all threads have to wake up and blindly go looking for work somewhere.
Submission queues only exist because the work stealing deques do not permit non-owner task insertion. Only the owner work thread puts tasks into and takes tasks from the bottom of the deque so there is no need for locking by the owner thread. That is a good property only if the forking thread processes the majority of the new tasks itself.
See: A Java™ Fork/Join Framework, section 4.5 Task Locality, “[the framework] is optimized for the case where worker threads locally consume the vast majority of the tasks they create. … As seen in the figure, in most programs, the relative number of stolen tasks is at most a few percent.”
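The owner/thief access discipline described above (owner at the bottom, thieves at the top) can be illustrated with a LinkedBlockingDeque. This is only a sketch of the discipline; the framework's real deques are lock-free array structures, and the class name here is ours:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Sketch of the work-stealing access pattern only: just the owner thread
// inserts, and owner and thieves work opposite ends of the deque.
class WorkStealingSketch {
    final BlockingDeque<Runnable> deque = new LinkedBlockingDeque<>();

    // Only the owner inserts, at the bottom (no non-owner insertion).
    void ownerPush(Runnable task) { deque.addLast(task); }

    // The owner also takes from the bottom, LIFO.
    Runnable ownerPop() { return deque.pollLast(); }

    // Thieves take from the top, FIFO, one task at a time.
    Runnable steal() { return deque.pollFirst(); }
}
```

Because nothing but the owner may insert, new outside requests cannot be handed to an idle thread's deque, which is exactly why the separate submission queues exist.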
But it forces other work threads to continuously scan for work, and it means the load is often unbalanced:
- threads can waste a significant amount of computing time just looking for work, and
- thread starvation is a real problem when the computation is fast, since the spawning thread eats its own sub-tasks, leaving nothing for others.
The performance benefit of this work stealing scheme is primarily for the restricted class of nested-parallel (or fully strict) computations. That is, processing a balanced tree structure (DAG) using dyadic recursive division. Commercial application development is not just about halving balanced trees.
Why is this a problem?
The complexity required to make submission queues function borders on unsoundness. There hasn't been an uglier pile of spaghetti code since the 1960's. It may seem trivial reading this description here, but the coordination required between threads in a lock-free environment searching multiple queues is anything but inconsequential. The proof is in the code. The current version for JDK1.8 is in the concurrency interest list for jsr166e.
Worker threads must wake up and go looking for work:
- when to wake up,
- where to look (other workers' deques or submission queues),
- how many times to scan for work,
- when to wait.
All that scanning cuts into the time the work threads could be computing. Just to gain a little speed when processing a balanced tree structure does not justify such a paradigm. A plain FIFO queue is simpler, works as well for balanced tree structures, and works better for general-purpose, commercial applications.
The framework has severe performance problems on first usage since it must warm up before being effective. From the architect: "... beware that the very first parallel task execution time can be terrible. Really terrible."
The performance problem is simply the result of putting the initial request into a submission queue and making threads wake up and blindly scan for work. An appalling attempt at resolving this enigma is the practice of having submitting threads masquerade as worker threads (below) until the real workers become active, at terrible risk. Submitting work to threads directly is a fast and simple alternative. After all, direct submission of new processes to CPUs is what operating systems do.
Compounding the submission queue quandary is the practice of putting forked tasks back into the forking thread's deque and making other threads go blindly scanning for work.
The start-up/scan/fetch performance degradation of a few microseconds, or even a millisecond, on a personal computer is insignificant. The real problem comes when running this framework on a multi-use server for parallel bulk operations where there are many requests per second.
- When using the common ForkJoinPool, even though the work threads must finish one request before starting another, the new requests keep work threads in scan/fetch mode constantly.
- When using separate ForkJoinPool instances, the scan/fetch CPU overhead degrades other applications' performance by needlessly occupying CPUs. This is known in the developer world as “not playing nice with others.”
Submitters as workers
An additional dilemma with submission queues is that the framework sometimes treats submitting threads as Fork-Join Worker Threads. That is, the submitting thread executes the tasks it just submitted to the framework. Instead of the framework putting the task into a submission queue, waking up worker threads to process the new task and issuing a wait() in the submitting thread, the submitting thread actually executes the work as part of the Fork/Join structure.
Why is this a problem?
- The submitting thread's UncaughtExceptionHandler is in effect, which will adversely affect exception handling should an Error/Exception occur in the framework code.
- The submitting thread's stack is contaminated with work that should be independent of it, which may adversely affect subsequent processing for applications or error-recovery routines that do stack-walking.
- If the submitting thread started a transaction and a task executing in the framework starts another transaction, the transaction manager may think this is a case of nested transactions and kill both transactions, or worse.
- The same scenario can apply to database management systems or other external resources, especially JNI.
- An unpredictable situation faces languages that use the JVM (Clojure, Groovy, Scala, etc.)
- This practice violates a fundamental principle of good programming: separating a caller from its external processing.
Submission queues are a poor choice for general-purpose, commercial application development and even less so for a parallel engine.
Recursive decomposition using dyadic recursive division keeps splitting the problem in two, forking and joining. The only place to hold the intermediate result of the forked task is in the joining task.
left.fork();                        // push the left half onto this thread's deque
rightAnswer = right.compute();      // work the right half directly
leftAnswer = left.join();           // wait for the left half
return leftAnswer + rightAnswer;
Using the simple example of dyadic recursive division above, each of the tasks (1024, 512, 256) requires a join(), which results in seven joins.
Consequently, the use of an intermediate join() requires a context switch to free the thread for other work while the joining task waits for another task's completion. Without a context switch, the program will eventually stall. The framework does not do, and cannot do, a context switch. Part one has an extensive write up on the lack of a context switch with method join().
Why is this a problem?
Just like the submission queue problem above, the complexity for supporting join() also borders on unsoundness.
Employing this framework for a stream of operations with many concurrent users may result in huge stack usage for work threads, work threads stalling, and requests backing up due to the lack of context switching when using join(). A hypothetical multiple operation such as …
int sumOfWeights =
blocks.parallelStream().filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
… can result in multiple calls to the F/J framework. Now add dozens of concurrent users to the mix and the framework is flooded with requests and tasks needing context switches.
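In JDK1.8 every parallel stream shares the single common ForkJoinPool, so the many-concurrent-users scenario can be sketched like this (the class name, caller count, and per-request work here are arbitrary, illustrative choices):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

// Sketch: every caller's parallel stream funnels into the one shared
// common ForkJoinPool, so concurrent requests queue behind each other.
class ConcurrentCallers {
    static List<Integer> run(int callers) {
        ExecutorService front = Executors.newFixedThreadPool(callers);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int u = 0; u < callers; u++)
                futures.add(front.submit(() ->
                    IntStream.range(0, 1_000)   // arbitrary per-request work
                             .parallel()        // decomposed in the common pool
                             .map(e -> e * 5)
                             .sum()));
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                try { results.add(f.get()); }
                catch (Exception e) { throw new RuntimeException(e); }
            }
            return results;
        } finally {
            front.shutdown();
        }
    }
}
```

Each caller's stream is split into many tasks, and every one of those tasks competes for the same small set of common-pool worker threads.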
When pipeline methods like filter() or map() require an outside resource (database, communication, messaging) and a task fails, the back-out may fail as well, since the framework contaminates the stack with many tasks' method calls when “continuation fetching.”
When a work thread exhausts its own deque of tasks it cannot steal tasks from other deques nor fetch new tasks from the submission queue until it finishes the current request completely. Doing so would further contaminate the stack and in the event of any Error/Exception, there would be no way to associate the problem with a particular request. Therefore, the work thread must issue a wait(), stalling the work thread.
Although the stall is not a fatal halt, the threads stall long enough that they delay a request’s completion. The proof is in the profiler. (You can download the source code for a demo that uses recursion to simulate a pipeline, MultiRecurSubmit.java below.)
The stalling thread problem is very similar to the degradation of submission queues, above. However, here it is more damaging. Instead of a few microseconds' or milliseconds' delay, the thread stops working completely.
The temptation to fix the problem is overwhelming for the architect, but the problem cannot be fixed. Every patch, tweak, goose, fudge, dodge, hedge, hack, and massage used to emulate a context switch is only playing whack-a-mole and delaying the ultimate downfall. The only course of action for a task needing to wait is a context switch to free the thread. The architect's attempts using “compensation threads” and “continuation threads” and “continuation fetching” are a failure.
What is currently being done?
The latest tweak to lessen the use of join() is a Class called CountedCompleter. This is an attempt at scatter-gather [dynamic decomposing] logic rather than fork-and-join. In a CountedCompleter, the first thread forks all tasks up front and uses a callback method to gather the results. Unfortunately, it uses the same recursive decomposing structure, not a dynamic decomposing architecture, and it suffers accordingly:
- The first thread dumps all forked tasks into its own deque since deques do not permit non-owner task insertion (above) making this a dump-and-gather technique. Other threads have to continuously fight each other at the top of the deque over each of the dumped tasks completely negating the usefulness of deques.
- The use of this Class for parallel streams results in stack overflows and stalls similar to that found with “continuation fetching.”
- Threads either have to recursively walk down the tree resulting in O(n) stack space or
- they revert to sequential processing (paraquential) of one branch at a time which results in task stalls. (See Nodes::collect — where invoke() must wait for subsequent tasks to finish).
- Paraquential processing makes a simple summation such as
Arrays.stream(array).parallel().map(e -> e * 5).sum(); run about ten times slower than a simple two-thread split. And no matter how many processors are available, it cannot scale. The proof is in the program. (You can download the source code for a demo that compares sequential/parallel summation, IntArraySum.java, below.)
- The use of this class for parallel Stream concatenation results in each operation executing sequentially. The consequence is that F/J limits parallelism to the stream of streams and does not apply parallelism to the streams themselves. For instance, if you have 32 cores and are doing
Stream.of(s1, s2, s3, s4).flatMap(x -> x).reduce(...)
then at most you will only use 4 cores. (Paul Sandoz, Oracle®)
- The use of this class for parallel operations results in Out Of Memory Errors for those operations that do not process pure balanced tree structures.
- Any parallel operation that splits into many more than two new tasks unbalances the tree (such as: java.util.stream.LongStream.iterate()) which can result in huge task and splitting overhead.
- Any parallel operation that needs to buffer the results (such as: java.util.stream.LongStream.limit(n)) can require massive memory until n is reached.
- Both these problems are the direct results of not being able to scatter-gather properly and requiring completed tasks to remain alive, ineligible for garbage collection (forking tasks require a live reference to retrieve the results from forked tasks.) You can download the source code for an OOME demo that uses iterate and limit, OOM.java below.
- The latest whack-a-mole for this problem is to once again switch to paraquential processing. See Nodes::flattenInt — where invoke() must wait for subsequent tasks to finish. (new ToArrayTask.OfInt(node, array, 0).invoke();) When you run method firstNPrimes() of the OOM.java program, the parallel version now takes almost twice as long to complete as the sequential version. Method distinct() takes almost three times as long to complete as the sequential version and may still result in OOME on some 32-bit systems.
- Threads must still deplete their deques completely before helping other threads or starting the next request which results in thread stalls.
- There is still no way to wait for completion of intermediate results without a join(). And using join() can result in the “continuation threads” problem again. The proof is in the program. (You can download the source code for a demo that uses CountedCompleter with a join(), MultiRecurCountedSubmit.java below.)
- There is still no way to wait for completion of outside resources when using a ForkJoinPool.managedBlock(). And using managedBlock() results in a huge number of “compensation threads.” (You can download the source code for a demo that uses CountedCompleter with a managedBlock(), MultiRecurCountedManagedSubmit.java below.)
- The additional complexity is overwhelming. Emulating a dynamic decomposing structure from inside a recursive decomposing entity is turning the framework into a Rube Goldberg machine:
- More spaghetti code since the Class requires a non-public hook into modified ForkJoinTask exception handling mechanics
- The ForkJoinPool requires class-specific code (instanceof CountedCompleter checking) for the Class to function at all
- Using this Class requires a huge effort by application developers because the Class is “less intuitive to program.” (political speak for “it is a nightmare to program”) The brunt of the work usually handled by a decomposing structure implicitly must now be performed by application programmers explicitly — increasing the risk for mistakes.
- Some might call this putting lipstick on a pig.
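For reference, the dump-and-gather shape criticized above looks roughly like the following, which follows the pattern of the class's own documentation. This is a sketch only; ForEach is our name, not a JDK class:

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

// Sketch of the CountedCompleter pattern: fork both halves into the
// forking thread's deque and complete via pending counts, not join().
class ForEach<E> extends CountedCompleter<Void> {
    final E[] array; final Consumer<E> op; final int lo, hi;

    ForEach(CountedCompleter<?> parent, E[] array, Consumer<E> op, int lo, int hi) {
        super(parent);
        this.array = array; this.op = op; this.lo = lo; this.hi = hi;
    }

    @Override
    public void compute() {
        if (hi - lo >= 2) {
            int mid = (lo + hi) >>> 1;
            setPendingCount(2);                              // expect two child completions
            new ForEach<>(this, array, op, mid, hi).fork();  // both halves dumped into
            new ForEach<>(this, array, op, lo, mid).fork();  // this thread's own deque
        } else if (hi > lo) {
            op.accept(array[lo]);                            // leaf: apply the operation
        }
        tryComplete();  // completion propagates upward through the pending counts
    }

    static <E> void forEach(E[] array, Consumer<E> op) {
        ForkJoinPool.commonPool().invoke(new ForEach<>(null, array, op, 0, array.length));
    }
}
```

Note how the callback machinery (pending counts, tryComplete()) that a decomposing structure would handle implicitly must be managed explicitly, and correctly, by the application programmer.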
Why is everything to date a failure?
- Using join() outside of a controlled environment is a failure.
- “Continuation threads” for masking the join() problem is a failure.
- “Continuation fetching” to alleviate the extra threads problem is a failure.
- CountedCompleter to emulate scatter-gather is a failure.
- Threads lack a central management structure so there is constant create/destroy overhead and reversion to sequential processing (paraquential) to alleviate the excessive stack/heap usage.
- Paraquential processing locks thread resources to one task, takes longer to complete than a pure sequential computation, and cannot scale.
- Tasks hold their own results, so threads cannot act independently on each task. Instead of finishing with a task, threads need to traverse through task after task (a recursive technique, not a parallel procedure).
Every successful multi-threading venture since the 1960’s has used a container for the threads. That is, first and foremost, control the threads then build the application on top of it. Operating systems control the processes. It is up to the application process to control its own threads.
You don’t build an automobile and then try to design an engine to go inside. You build an engine and then you can design any style of car to go on top.
Glassfish, JBoss, Tomcat, Cilk, Microsoft’s TPL applications, and countless other major applications all use the thread container approach.
This framework is the academic experiment created for a research paper. It is not, and never will be, a general-purpose thread container. Its primary use is for recursive decomposition and even there, without a join(), it is a failure.
The JDK1.8 engineers keep trying recursion to do stream processing but recursion has no role in streams.
What is the answer for success?
A general purpose container that manages the threads and requests.
Keep track of each request individually in a separate Object to free completed tasks for garbage collection. Scatter the stream tasks to every processor in the session. Let those tasks scatter more tasks as necessary. Gather the results in the request Object. If more processing is necessary (stream of streams), then repeat until the final result is ready.
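A minimal sketch of that scatter-gather shape, using a list of CompletableFutures as a stand-in for the request Object (the class name and chunking scheme are illustrative assumptions, not any particular product's design):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative scatter-gather sketch: one future per scattered chunk,
// gathered by the caller; each finished future is then free for GC.
class ScatterGather {
    static long sum(int[] data, int chunks) {
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<CompletableFuture<Long>> parts = new ArrayList<>();
            int size = (data.length + chunks - 1) / chunks;
            for (int i = 0; i < data.length; i += size) {        // scatter
                final int from = i, to = Math.min(i + size, data.length);
                parts.add(CompletableFuture.supplyAsync(() -> {
                    long s = 0;
                    for (int j = from; j < to; j++) s += data[j];
                    return s;
                }, pool));
            }
            long total = 0;
            for (CompletableFuture<Long> part : parts)           // gather
                total += part.join();
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

The join() here is the caller gathering final results in its own request structure, not an intermediate join() stalling a worker thread inside the pool.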
Event-driven processing is similar to parallel processing:
- handle the event and be gone, ready for the next event.
Simple. Fast. Efficient. Scalable. No call stack overflows. Less overall memory utilization.
A join(), or any other wait() without a context switch, is a failure. Plain and simple.
The current consensus among Java™ EE engineers is that parallelization of bulk operations will revert to sequential processing in the EE container. Since this framework encapsulates the caller/service/application code there is no way to export the multi-threading to a separate JVM. Having an embedded service that:
- creates copious threads without regard for others,
- has a high potential for stack overflows,
- has a high potential for massive memory usage,
- has a very, very narrow performance window,
- is only designed for one request at a time,
- violates the fundamental principles of good programming,
- with horrendous complexity,
- with pre-optimizing code, and
- by not separating a caller from the external processing with all its inherent risks
… is totally unacceptable in a professional, commercial execution environment.
There are other, less catastrophic failures of using this framework as the parallel engine for JDK1.8, but listing them now would be like talking about a bad paint job on the Titanic.
- Work-stealing deques are a deplorable choice for a parallel engine.
- Recursive decomposition outside of a controlled environment is a failure.
- Emulating a dynamic decomposing discipline from inside a recursive decomposing entity is a fool's errand.
- Devising a significant part of core Java™ that cannot execute in an enterprise environment is amateur.
As more and more developers use the Parallel Bulk Data features, the parallel engine driving the functionality will show its true colors: an academic experiment underpinning a research paper, not a general-purpose, commercial development service. Calamity is inevitable.
The pragmatic solution for Oracle® is to scrap the F/J framework as a parallel engine and start over. Because if they don't, the problems will only get worse and it will cost more later to replace the framework.
Paraquential — [a portmanteau word derived by combining parallel with sequential]
The illusion of parallelization. Processing starts out in parallel mode by engaging multiple threads but quickly reverts to sequential computing by restricting further thread commitment.
Download the source code for the article here.
C# Parallel Programming in the .NET Framework
Component Processing, Java™ SE
Component Processing, Android™
Part one article — A Java™ Fork-Join Calamity
A Java Fork/Join Framework — Doug Lea
JDK1.8 Concurrent Hash Map on Concurrency Interest List
JDK1.8 Java™ Extension Proposal 107
JSR166e — JDK1.8 source code for the framework (pre Lambda)
JDK1.8 Java™ Early release (Lambda) — source code for the framework
About the Author
Edward Harned is a software developer with over thirty years industry experience. He first led projects as an employee in major industries and then worked as an independent consultant. Today, Ed is a senior developer at Cooperative Software Systems, Inc., where, for the last fourteen years, he has used Java™ programming to bring fork-join solutions to a wide range of tasks.
© 2011 - 2013 E.P. Harned All rights reserved