Lesson 8 | From wait/notify to Modern Concurrency |
Objective | Introduce Fork/Join as a modern concurrency model, then preserve wait/notify as a historical note with correct usage. |
The Fork/Join Framework, introduced in Java SE 7, simplifies parallel processing for problems that can be recursively split into independent subtasks (divide-and-conquer). A large task is forked into smaller pieces processed concurrently across CPU cores, and their results are joined into a final answer.
At its core is ForkJoinPool, a specialized ExecutorService for running ForkJoinTask instances using a work-stealing scheduler: idle worker threads “steal” tasks from busier threads to keep CPUs saturated. You typically extend RecursiveTask<R> (returns a result) or RecursiveAction (no result) and implement compute() with logic to split or solve directly.
Below, SumTask splits until a threshold, solves small segments directly, and joins partial sums.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * A RecursiveTask to sum all elements in an array segment.
 */
class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 1000;

    private final long[] array;
    private final int start;
    private final int end;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += array[i];
            return sum;
        }
        int mid = start + (length / 2);
        SumTask leftTask = new SumTask(array, start, mid);
        SumTask rightTask = new SumTask(array, mid, end);
        leftTask.fork();                        // run left asynchronously
        long rightResult = rightTask.compute(); // compute right inline
        long leftResult = leftTask.join();      // wait for left
        return leftResult + rightResult;
    }
}

/**
 * Main class to demonstrate the Fork/Join Framework.
 */
public class ForkJoinSumExample {
    public static void main(String[] args) {
        int size = 2_000_000;
        long[] array = new long[size];
        for (int i = 0; i < size; i++) array[i] = i + 1;

        ForkJoinPool pool = ForkJoinPool.commonPool();
        long totalSum = pool.invoke(new SumTask(array, 0, array.length));

        System.out.println("Fork/Join parallelism: " + pool.getParallelism());
        System.out.println("Total sum: " + totalSum);
        long expected = (long) size * (size + 1) / 2;
        System.out.println("Expected sum: " + expected);
        System.out.println("Correct: " + (totalSum == expected));
    }
}
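The example above returns a value through RecursiveTask. For the no-result case, a minimal RecursiveAction sketch might look like the following. The class name DoubleAction, its threshold, and the in-place doubling are hypothetical choices made only for illustration; they are not part of the lesson's own code.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/** Hypothetical example: doubles every element of an array segment in place. */
class DoubleAction extends RecursiveAction {
    static final int THRESHOLD = 1000; // illustrative cutoff, not tuned

    private final long[] array;
    private final int start;
    private final int end;

    DoubleAction(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: solve directly
            for (int i = start; i < end; i++) array[i] *= 2;
            return;
        }
        int mid = start + (end - start) / 2;
        // invokeAll runs both halves and returns once both have completed
        invokeAll(new DoubleAction(array, start, mid),
                  new DoubleAction(array, mid, end));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        Arrays.fill(data, 21L);
        ForkJoinPool.commonPool().invoke(new DoubleAction(data, 0, data.length));
        System.out.println("First element after doubling: " + data[0]); // prints 42
    }
}
```

Because the two halves touch disjoint index ranges, the subtasks need no locking; that independence is what makes the problem a good fit for divide-and-conquer.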
Also consider: higher-level primitives such as CompletableFuture, ExecutorService, BlockingQueue, Semaphore, Phaser, and concurrent collections. They reduce the need for manual locking.
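As one illustration of how such a primitive removes manual locking, here is a rough producer/consumer sketch built on BlockingQueue. The class name QueueHandoff, the queue capacity of 16, and the -1 sentinel that marks the end of input are all assumptions of this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical producer/consumer hand-off with no explicit locking. */
class QueueHandoff {
    private static final int POISON = -1; // sentinel marking end of input (assumption)

    /** Produces 1..count on a second thread and sums the values on the calling thread. */
    static long produceAndSum(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long sum = 0;
        while (true) {
            int value = queue.take(); // blocks while the queue is empty
            if (value == POISON) break;
            sum += value;
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum: " + produceAndSum(100)); // 1 + 2 + ... + 100 = 5050
    }
}
```

The queue handles the waiting and signalling internally, so neither thread needs a synchronized block or a condition flag of its own.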
Prior to modern executors and Fork/Join, Java concurrency often used intrinsic locks with synchronized plus Object.wait() / Object.notify() / Object.notifyAll(). Use these carefully:
synchronized is a keyword, not a method; it acquires the monitor of the target object.
wait() and notify()/notifyAll() are methods on java.lang.Object, and must be called while holding that object’s monitor (inside a matching synchronized block). Otherwise they throw IllegalMonitorStateException.
wait() atomically releases the monitor and suspends the thread until it is notified, interrupted, or woken spuriously; before wait() returns, the monitor is re-acquired. Because of spurious wakeups, always call wait() in a loop that rechecks the condition.
notify() or notifyAll() does not release the lock immediately; the lock is released when the notifying thread leaves the synchronized block.
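The monitor-ownership rule above can be observed directly. The following small sketch (the class and method names are hypothetical) calls notify() once without holding the monitor and once inside a synchronized block.

```java
/** Shows that notify() requires ownership of the object's monitor. */
class MonitorRuleDemo {
    /** Returns true if notify() is rejected when called without holding the monitor. */
    static boolean notifyWithoutLockFails(Object lock) {
        try {
            lock.notify(); // not inside synchronized (lock): illegal
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Returns true if notify() completes normally while the monitor is held. */
    static boolean notifyWithLockSucceeds(Object lock) {
        synchronized (lock) {
            lock.notify(); // legal; with no waiters it simply does nothing
            return true;
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        System.out.println("Rejected without lock: " + notifyWithoutLockFails(lock));
        System.out.println("Accepted with lock: " + notifyWithLockSucceeds(lock));
    }
}
```

The same IllegalMonitorStateException is thrown by wait() and notifyAll() when the calling thread does not own the monitor.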
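public class WaitNotifyExample {
    private static final Object lock = new Object();
    // A field, not a local: lambdas cannot assign to captured local variables.
    private static boolean ready = false; // guarded by lock

    public static void main(String[] args) throws InterruptedException {
        // Waiter: blocks until ready becomes true
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                while (!ready) { // recheck after every wakeup
                    try {
                        lock.wait();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // proceed
            }
        });
        // Notifier: sets the condition, then wakes all waiters
        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                ready = true;
                lock.notifyAll();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}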
Method or keyword | Where it is defined |
join | java.lang.Thread |
lock | Neither Object nor Thread (use java.util.concurrent.locks.Lock) |
notify / notifyAll | java.lang.Object |
run / sleep / start | java.lang.Thread |
wait | java.lang.Object |
synchronized | Keyword (not a method) |
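Since the table points to java.util.concurrent.locks.Lock for explicit locking, here is a sketch of how the same "wait until ready" pattern could be written with Lock and Condition. The ReadyGate class and its method names are hypothetical; ReentrantLock is used as one standard Lock implementation.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical one-shot "ready" gate built on Lock and Condition. */
class ReadyGate {
    private final Lock lock = new ReentrantLock();
    private final Condition becameReady = lock.newCondition();
    private boolean ready = false; // guarded by lock

    /** Blocks until markReady() has been called. */
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                becameReady.await(); // releases the lock while waiting, like Object.wait()
            }
        } finally {
            lock.unlock();
        }
    }

    /** Sets the flag and wakes every waiting thread. */
    void markReady() {
        lock.lock();
        try {
            ready = true;
            becameReady.signalAll(); // counterpart of Object.notifyAll()
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadyGate gate = new ReadyGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                System.out.println("Waiter proceeded");
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markReady();
        waiter.join();
    }
}
```

Unlike synchronized, an explicit Lock is not released automatically, which is why each unlock() sits in a finally block.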
Once you grasp the legacy model, prefer executors, Fork/Join, and high-level concurrency utilities for new development.