Fork-Join Development in Java SE

Fork-Join for everyday, multi-core Java™ applications

Forking, or splitting, the workload into multiple tasks for parallel processing and then joining the results together is a technique used in countless scientific, number-crunching applications. Many other applications could benefit from fork-join processing, but using the scientific approach may not be in their best interest.

This article presents an “embarrassingly parallel” fork-join approach that works well for everyday, multi-core applications in Java™ SE, ME as well as Android™. (3000 words)

Edward Harned (eh at coopsoft dot com)
Senior Developer, Cooperative Software Systems, Inc.
February, 2010  [updated June, 2013]

A Serbo-Croatian translation was added in June 2013.

What is Fork-Join?

Think of a fork in the road where each path eventually comes back together — joins.

Fork-Join breaks an application into several parts for parallel processing and joins the results at the end.

Figure 1: Fork-Join Structure


Let’s say we have an array of one thousand numbers. We need to run a procedure on each of these numbers and add up the totals.

Listing 1: Array Processing

 
 
for (int i = 0; i < 1000; i++) {
    total += doProcedure(array[i]);
}

If the procedure takes one second (wall-clock time) to complete, then it is going to take one thousand seconds (over 16½ minutes) to complete this task.

Fork-Join could split the work into ten parts, each part processing one hundred numbers simultaneously on its own CPU.

That would take one hundred seconds (just over 1½ minutes), one tenth of the original time. The more CPUs available, the faster the result.

This is what scientific computing is all about — simultaneously processing humongous amounts of data on as many CPUs as are available. This abstraction closely resembles the standard scientific model of Divide-and-Conquer.

Divide-and-Conquer is a natural paradigm for parallel algorithms. After dividing a problem into two or more sub-problems, the method solves the sub-problems in parallel. Typically, the sub-problems are solved recursively and thus the next divide step yields even more sub-problems for solving in parallel.

Figure 2: Divide-and-Conquer

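Since Java™ SE 7, the standard library ships this Divide-and-Conquer model as java.util.concurrent.ForkJoinPool. Below is a minimal sketch of the array example from Listing 1, recursively dividing until a segment is small enough to sum directly; the threshold of 100 and the stand-in doProcedure() are arbitrary choices for illustration only.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide-and-Conquer over the array from Listing 1 using the standard
// ForkJoinPool (Java SE 7+). The threshold of 100 elements is arbitrary.
class SumTask extends RecursiveTask<Long> {

    private final int[] array;
    private final int lo, hi;

    SumTask(int[] array, int lo, int hi) {
        this.array = array; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= 100) {                     // small enough: solve directly
            long total = 0;
            for (int i = lo; i < hi; i++)
                total += doProcedure(array[i]);
            return total;
        }
        int mid = (lo + hi) >>> 1;                // otherwise, divide in two
        SumTask left  = new SumTask(array, lo, mid);
        SumTask right = new SumTask(array, mid, hi);
        left.fork();                              // solve the sub-problems in parallel
        return right.compute() + left.join();
    }

    private static long doProcedure(int n) { return n; } // stand-in for the real work
}

// usage: long total = new ForkJoinPool().invoke(new SumTask(array, 0, array.length));

Each fork() schedules a sub-problem for another pool thread and join() waits for its result, which is exactly the recursive divide step shown in Figure 2.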

Problems using the Fork-Join scientific model for everyday applications

Creating tasks is not the problem; they’re only objects. The problem is a high number of threads processing the tasks when those tasks need:

Connections
Accessing remote services (DBMS, messaging, and many others) requires a connection to the remote service. Generally, the remote service uses a thread to handle the connection, and that requires memory, context switching, synchronization, and coordination. The more connections to the service, the more resources the service needs, and the fewer connections are available for other tasks in the JVM. That affects every user.

Locks
Locks are a killer for high performance. Dead/live locks, priority inversion, starvation, convoying, and overhead (which goes up exponentially with the length of the list of waiting tasks) are some of the problems of using locks.

Semaphores
The more threads that want a permit concurrently, the more threads that must wait for permit availability. This brings us back to all the problems of using locks.

Cache coherency
When multiple processors access or update the same variable inside a cache line (a block of data copied from main memory containing many fields), the memory unit may invalidate the cache line. That not only slows down an application, it may affect other applications as well.

Extensive memory
The more objects, or the bigger the objects, the more memory is needed. The more active threads handling the tasks, the more memory is in use. Naturally, it follows that large-memory tasks need throttling.

The need to play nice
Your application may not be the only application running on the computer. When one application hogs the resources, everyone feels the pain. Playing nice with others goes back to what we all learned in childhood. The same holds true when developing software that does not run as a standalone application.

The theme of multi-core development is to keep contention (tasks competing for the same resources) to a minimum.

If the dynamic decomposition paradigm of Divide-and-Conquer suits your needs, then read this article about the high performance DSE version of Tymeac. Otherwise a Functional Forking Framework may better suit your needs.

Functional Forking Framework

Java™ SE / ME multi-core applications, as well as Android™ applications, that cannot use the Divide-and-Conquer model, do not process large arrays of numbers, or do not have a compute-intensive structure need a functional forking framework for parallelizing applications. Specifically, they need to fork the work into its functional components rather than decompose an array into identical subtasks.

Figure 3: Functional Forking Framework


A functional forking framework has two essential attributes. It must:

Limit Contention

Keeping the number of active, competing threads to an absolute minimum is paramount. The easiest way to limit thread contention is to use thresholds for each thread pool servicing a queue of tasks. See this article on High Performance Priority Queues in Java SE for an example of how using Wait Lists can conserve threading resources.
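As a rough sketch of such a threshold using only the standard library, a small bounded pool caps how many threads can ever compete for one queue's resource; the pool size of two and the queue capacity of one hundred are arbitrary values for illustration, not recommendations.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueuePools {
    // At most two threads ever service this queue's resource; excess requests
    // wait in the bounded queue (and are rejected once it fills) instead of
    // spawning more threads and more contention.
    static final ExecutorService PRICE_POOL = new ThreadPoolExecutor(
            2, 2,                                     // core and maximum pool size
            0L, TimeUnit.MILLISECONDS,                // no idle time-out needed
            new ArrayBlockingQueue<Runnable>(100));   // threshold on waiting work
}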

Reuse Resources

Reusing resources rather than acquiring new copies of resources is a winner all-around. We need to consider not only the task code, but the resource management code as well.

Take the example of a task that needs to access a database requiring a [java.sql.]Statement. By using a queue of requests, the task code can share the same statement for many accesses rather than acquiring a new statement for each access. Sharing a statement is a huge savings in overhead and limits contention within the management code.
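Here is a minimal sketch of that idea; the PriceRequest class, the SQL, and the table name are illustrative only. One worker thread owns the connection and reuses a single PreparedStatement for every request it takes from its queue.

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.BlockingQueue;

// Hypothetical request object: an item id in, a price out.
class PriceRequest {
    long itemId;
    BigDecimal result;
}

// One worker thread, one connection, one reusable statement.
class PriceWorker implements Runnable {

    private final Connection con;
    private final BlockingQueue<PriceRequest> queue;

    PriceWorker(Connection con, BlockingQueue<PriceRequest> queue) {
        this.con = con;
        this.queue = queue;
    }

    public void run() {
        try (PreparedStatement ps =
                 con.prepareStatement("SELECT base_price FROM items WHERE id = ?")) {
            while (!Thread.currentThread().isInterrupted()) {
                PriceRequest req = queue.take();       // next request in the queue
                ps.setLong(1, req.itemId);             // reuse the same statement
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) req.result = rs.getBigDecimal(1);
                }
            }
        } catch (Exception e) {
            // a real framework would disable the Queue and notify an administrator
        }
    }
}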

What is embarrassingly parallel?

Embarrassingly parallel algorithms are those that can solve many similar but independent tasks simultaneously with little to no need for coordination between the tasks. These kinds of problems have such easy parallelization that one is almost "embarrassed" to talk about how simple it is to get many processors working efficiently.

An embarrassingly parallel solution may easily fork into a number of completely independent components, each executing on a separate processor.

Figure 4: Embarrassingly Parallel


For example:
A business might need an automated price quote system. To develop a quote, the system needs the item’s base price (price database), the customer’s discount for items and shipping (customer database), and basic shipping costs (shipper database.)

Traditionally, the program accesses each database serially, waiting for each access to complete before moving to the next access.

In a parallel system, the program Forks() the request into three queues, each serviced by a thread pool, waits until the last access finishes and Joins() the results together.

Figure 5: Price Quote

The above price quote is an example of a synchronous request, where the caller waits for completion. It is only a small step forward to add support for the asynchronous or autonomous request, where the caller does not wait for completion.
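Outside of any particular framework, the synchronous case can be sketched with the standard java.util.concurrent classes. The three lookups below are stubs standing in for the price, customer, and shipper databases; the caller forks the three accesses onto a pool and then blocks on the three get() calls, the join.

import java.math.BigDecimal;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PriceQuote {

    static final ExecutorService POOL = Executors.newFixedThreadPool(3);

    // Fork the three lookups onto the pool, then join the results.
    public static BigDecimal quote(final long itemId, final long customerId)
            throws Exception {

        Future<BigDecimal> base = POOL.submit(new Callable<BigDecimal>() {
            public BigDecimal call() { return lookupBasePrice(itemId); }
        });
        Future<BigDecimal> discount = POOL.submit(new Callable<BigDecimal>() {
            public BigDecimal call() { return lookupDiscount(customerId); }
        });
        Future<BigDecimal> shipping = POOL.submit(new Callable<BigDecimal>() {
            public BigDecimal call() { return lookupShipping(customerId); }
        });

        // join: wait for all three, then combine
        return base.get().subtract(discount.get()).add(shipping.get());
    }

    // stand-ins for the price, customer, and shipper database accesses
    static BigDecimal lookupBasePrice(long itemId)    { return new BigDecimal("100.00"); }
    static BigDecimal lookupDiscount(long customerId) { return new BigDecimal("10.00"); }
    static BigDecimal lookupShipping(long customerId) { return new BigDecimal("7.50"); }
}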

There are many, many situations where forking the work into its components is desirable.

Indeed, it is a challenge to find an application that cannot use parallelization with a functional forking framework.

How would this framework look in a Java™ application?

A framework for forking the request into its functional components needs to:

Know the components (Queues) for each request operation (Function.) A simple Class containing a String Function name and a list of the associated Queues is basic Java™ programming.

Listing 2: Function Class

 
 
public class Function {

    private String  name; // Function name
    private Queue[] que;  // Queues for this Function
}
 

Place the request (containing the input objects) into each of the queues, returning an object array to the caller or ignoring the returned objects.

Listing 3: Put in Queue

 
 
public Object[] fork(Queue[] que, Object request) {

    // one result slot for each Queue in the Function
    Object[] return_obj = new Object[que.length];

    for (int i = 0; i < que.length; i++) {
        // the thread servicing que[i] fills in return_obj[i] when it finishes
        putInQueue(que[i], return_obj, i, request);
    }

    return return_obj;
}
 

Wait for completion/timeout or do not wait.

Listing 4: Wait/noWait

 
 
public boolean join(Object[] obj) {

    // when all elements are non-null, return true;
    // after the wait interval expires, return false
    long deadline = System.currentTimeMillis() + wait_time; // max wait for a sync request

    while (System.currentTimeMillis() < deadline) {
        boolean done = true;
        for (Object o : obj)
            if (o == null) { done = false; break; }
        if (done) return true;
        try { Thread.sleep(10); }                  // simple poll; a real framework
        catch (InterruptedException e) { break; }  // would use wait/notify
    }
    return false;
}
 

Return the results to the caller or ignore the objects.

Figure 6: Return to Caller

To build this framework we would need to maintain:

  1. The actual task code that does the work
  2. A list of Queues and Functions
  3. A “start up” class that loads the Queues and Functions into memory

(1) The code that does the work should look like this:

Listing 5: Work Code

 
 
public static Object main(Object obj) {}
 

A main() method that accepts an object (the input from the caller) and returns an object (the result of the work.)
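For instance, a hypothetical task handling the price-database access from the earlier quote example might look like the following; the class name and the stubbed lookup are illustrative only.

import java.math.BigDecimal;

// Hypothetical task code following the convention above: one static
// main(Object) that takes the caller's input and returns the result.
public class PriceLookupTask {

    public static Object main(Object obj) {
        long itemId = ((Number) obj).longValue();  // input from the caller
        return lookupBasePrice(itemId);            // result of the work
    }

    private static BigDecimal lookupBasePrice(long itemId) {
        return new BigDecimal("100.00");           // stand-in for the real database access
    }
}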

(2) We could maintain the Queues and Functions as objects within a simple List Class.

(3) Start up could simply load the List Classes into memory with a new (List Class) and start the threads for each Queue.
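Put together, a start up for the price quote Function could look roughly like the sketch below. The Queue and Function constructors, startThreads(), and fw.add() are assumptions for illustration, not a fixed API.

// A minimal, hypothetical start-up sketch. Constructors and method names
// beyond those shown in the listings are illustrative assumptions.
public class Startup {

    public static Framework startUp() {
        // (2) load the Queues and Functions into memory
        Queue price    = new Queue("PriceDB",    2);   // queue name, threads to start
        Queue customer = new Queue("CustomerDB", 2);
        Queue shipper  = new Queue("ShipperDB",  1);

        Function quote = new Function("Quote",
                             new Queue[] { price, customer, shipper });

        // (3) start the threads that service each Queue
        price.startThreads();
        customer.startThreads();
        shipper.startThreads();

        Framework fw = new Framework();
        fw.add(quote);                               // make the Function known by name
        return fw;
    }
}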

How a simple call could look:

Listing 6: Simple Call

 
// Start up:
Framework fw = new Framework();

// For each call:
Function func = fw.getFunction(name);
Object[] back = func.fork(request);
 

This framework is simple to use, embarrassingly simple.

Summary

So far, we’ve seen how forking a request into its functional components can work as an embedded part of a single application (within the same JVM.) To be practical, we also need to make the framework accessible from other JVMs. Simply put, it must support many concurrent user calls as a server.

Making it a Server

What changes must we make to forge this simple framework into a Server?

  1. We must separate the caller from the work.
  2. We must provide error recovery.
  3. We must support the framework as a remote object.
  4. We must provide security.
  5. We must provide administrative functionality for controlling the server.


Separation
The first change is separating the request brokering (that’s the above fork() method) from the actual processing. We need to separate, but keep track of, each request in a unique object.

Listing 7: Request Object

 
private long          unique_id;     // unique identification of this request
private Object        input;         // input reference, if any
private boolean       type;          // type of request: true=sync, false=async
private Object[]      output;        // the output objects from the tasks
private AtomicInteger next_output;   // subscript to add an element to output
private Queue[]       que_names;     // list of all the Queues in this Function
private Queue         agent;         // future queue, if used
private AtomicInteger nbr_remaining; // Queues remaining to be processed
private int           wait_time;     // max wait time for a sync request
 

What is the “agent” field?

Agent
A synchronous request returns the Object array from the processing to the caller. What should the framework do with the Object array returned from an asynchronous request? The framework may optionally put the Object array (as an input) into a new Queue for processing by an agent task. This way the agent task can take action based on the completion status of the prior processing.

For example:
A Function is to generate a price quote and email it to a user as an asynchronous request.

  1. The caller uses the asynchronous fork() method.
  2. The framework forks the request into its respective Queues.
  3. When the last Queue finishes, the framework passes the returned object array to the agent task by putting the request into the Queue specified as “agent.”
  4. The agent task sends the email returning nothing.
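Internally, the hand-off in step 3 can be sketched with the fields from Listing 7. The Request and Queue types are trimmed down here, and the method name queueFinished() is an assumption rather than part of any released framework.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical framework internals: each Queue's worker calls queueFinished()
// after storing its result in the request's output array.
class Completion {

    static void queueFinished(Request req, int slot, Object result) {
        req.output[slot] = result;
        // the last Queue to finish triggers the hand-off
        if (req.nbr_remaining.decrementAndGet() == 0 && req.agent != null) {
            req.agent.offer(req.output);   // the agent task e-mails the quote, returns nothing
        }
    }

    // trimmed-down version of the request object in Listing 7
    static class Request {
        Object[] output;
        AtomicInteger nbr_remaining;
        BlockingQueue<Object[]> agent;     // future queue, if used
    }
}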

Error recovery
The second change is adding error recovery, the mark of professionalism.

What can go wrong here? "Anything that can go wrong will go wrong." Murphy’s Law.

Back out
We could have a forking error. A bounded Queue could be full or the Queue could be disabled (more on this below.) Error recovery should back out all the Queues that forked successfully and inform the caller of the problem. For example:

  • We have three Queues (A, B, C) in the Function.
  • Queues A and B successfully receive the request.
  • Queue C fails to receive the request because the Queue is full.
  • Now we go backwards, pulling the request out of all the Queues that forked successfully, so we do not waste any more processing time on a faulty request (a sketch follows below.)
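The back-out sketch below assumes a variant of putInQueue() that returns false when a Queue is full or disabled, and a hypothetical removeFromQueue() as its counterpart.

public Object[] fork(Queue[] que, Object request) {

    Object[] return_obj = new Object[que.length];

    for (int i = 0; i < que.length; i++) {
        if (!putInQueue(que[i], return_obj, i, request)) {
            // Queue i rejected the request: back out of the Queues that
            // already accepted it, in reverse order, and tell the caller.
            for (int j = i - 1; j >= 0; j--) {
                removeFromQueue(que[j], request);
            }
            return null;   // or throw an exception describing the failure
        }
    }
    return return_obj;
}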

Exception/Error
We could have an exception/error in the actual task code that does the work. If it failed once, it probably will fail again. Therefore, it is advisable to disable the Queue until a developer fixes the problem. Once the task code is fixed, we don’t want to take down the server; we want to inform the server that we have a clean, new copy of the task code and that we want the Queue enabled again.

Stall
We could have the above happen in an asynchronous request, called a stall (synchronous requests time out and can be purged from the system.) Since Functions cannot complete until all the Queues finish, we need to place stalled requests into a Stalled List. When the Queue is again serviceable, we can re-start the processing from the Stalled List.

Thread quandary
We could have a thread block forever on an outside resource or go into a never-ending loop. Either way, by timing events in the life of a thread, the framework may recognize this situation and may expunge the thread, replacing it with a new thread.

Expunging is a subject unto itself and requires thread containment. This article introduces the subject: Managing Threads in Java SE

Canceling
We could have a caller want to cancel a previously submitted request. The cancel is similar to a time-out error but it is applicable to both synchronous and asynchronous requests. Although canceling a request is most desirable, the logic for handling a cancellation in a multi-component request is not for the faint of heart.

Monitoring
Timing is useless unless a daemon thread monitors the timed events looking for actual or potential problems.
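One possible sketch of such a monitor is a scheduled daemon thread that periodically scans a registry of request start times and flags anything that has been in flight too long; the registry, the 30-second limit, and notifyAdministrator() are all illustrative assumptions.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class Monitor {

    // request id -> time the request was forked (a hypothetical registry)
    static final Map<Long, Long> START_TIMES = new ConcurrentHashMap<Long, Long>();

    public static void start() {
        ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "framework-monitor");
                    t.setDaemon(true);                   // a daemon thread, as above
                    return t;
                }
            });

        timer.scheduleAtFixedRate(new Runnable() {
            public void run() {
                long now = System.currentTimeMillis();
                for (Map.Entry<Long, Long> e : START_TIMES.entrySet()) {
                    if (now - e.getValue() > 30000) {    // 30 seconds is arbitrary
                        notifyAdministrator(e.getKey()); // stall? thread quandary?
                    }
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    static void notifyAdministrator(long requestId) {
        System.err.println("Request " + requestId + " appears stalled");
    }
}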

Notification
No framework can handle every situation; sometimes human intervention is necessary. We should notify administrators by sending a message through whatever means the organization uses (Instant Messaging, email, or any homegrown method.)

Remote object
The third change is supporting the framework as a remote object with optional activation/deactivation to conserve resources.

Remote Method Invocation comes in many flavors:
        Basic
        Custom Socket Factory
        IIOP
        Portable Object Adapter
        Jini
        Inter Process Communication

Your environment may consist of a cloud with separate processors at many different locations. Making the framework flexible makes perfect sense.

Security
The fourth change is adding security.

Java™ security technology is part of the SE/ME platforms; for flexibility, the server still needs to be front-ended with security classes.

Administrator functions
The fifth change is adding administrator functions.

Logging is boring and mainly useless, until something goes wrong.

Statistics are the basis for performance analysis and tuning.

We need to provide interfaces to the internal structures so users can monitor and control functionality. The server isn’t much good if no one knows what it is doing, and once people know what it is doing, they probably will want to change it.

Writing a Fork-Join framework that is simple to use and efficient for local calls is difficult. Doing the same for network access is a major undertaking.

How long would it take to build such a Server?

About 5-6 seconds. Just long enough to unzip one file.

Happily, general-purpose fork-join frameworks supporting the properties mentioned above are available today for everyday, multi-core applications in Java™ SE, ME and Android™. And since the framework can run as an RMI Server (standard/activatable, IIOP and POA), it is available for Java™ EE applications.

Tymeac for the Java™ SE / ME / Android™ platforms is an Open Source Software project maintained on SourceForge.net, and you can download the latest editions there.

Conclusion

Using a Fork-Join framework developed for the compute-intensive communities may not work well for everyday applications.

The bulk of Java™ multi-core applications need to fork the work into its functional components with a professional grade framework that is easy to use, efficient, and open-source.

References

Downloads:

Download the latest SE edition of Tymeac here. With all the documentation, scripts, classes and source.

Download the latest ME edition of Tymeac here. With all the documentation and source.

Download the latest AND edition of Tymeac here. With all the documentation and full eclipse projects.

Download the latest DSE edition of Tymeac here. The Divide-and-Conquer version.

Articles:

The high performance Divide-and-Conquer version of Tymeac A Java Fork-Join Conqueror

The high performance Android™ version of Tymeac Managing Threads in Android

Using Wait Lists for Efficiency High Performance Priority Queues in Java SE

The Java™ SE Thread Container Managing Threads in Java SE

Other:

Fork-join queue wiki http://en.wikipedia.org/wiki/Fork-join_queue

Murphy’s Law http://en.wikipedia.org/wiki/Murphy%27s_law

CPU cache wiki http://en.wikipedia.org/wiki/CPU_cache

Cache coherence wiki http://en.wikipedia.org/wiki/Cache_coherence

Embarrassingly parallel wiki http://en.wikipedia.org/wiki/Embarrassingly_parallel

About the Author

Edward Harned is a software developer with over thirty years industry experience. He first led projects as an employee in major industries and then worked as an independent consultant. Today, Ed is a senior developer at Cooperative Software Systems, Inc., where, for the last twelve years, he has used Java™ programming to bring fork-join solutions to a wide range of tasks.

© 2010 - 2011  E.P. Harned  All rights reserved