java.util.concurrent

https://docs.oracle.com/javase/tutorial/essential/concurrency/index.html

https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#MemoryVisibility

In depth tutorials

Compact summary

Definition

Process

The JVM is run as a single process process scheduled by the OS.

A Java application can also create additional processes (not part of this tutorial).

Thread

Lightweight processes - creating it requires less resources.

Exist within a process and share its resources.

You start with the mainthread .

Thread

Runnable

Runnable is an interface which represents a task that could be executed by either a Thread or Executor or some similar means.

They are passed to the Thread s constructor.

Interface that classes that run within the Thread Object must implement.

public class HelloRunnable implements Runnable {public void run() {
System.out.println("Hello from a thread!");
}public static void main(String args[]) {
(new Thread(new HelloRunnable())).start();
}}

Thread API

.sleep()

suspend execution inside a Thread (not precise).

Make CPU time available for other threads or processes.

Can be interrupted with InterruptedException .

.interrupt()

Usually interruped threads are programmed to terminate ( return from the run method).

When interrupted, a InterruptedException is thrown in the Thread.

If the used methods do not invoke this execution, we must periodically check it ourselves with .interrupted() and then throw the exception ourselves.


For interruptions, we set the interrupt-status-flag of a Thread.

Static method: Thread.interrupt() → sets flag to true

Static method: Thread.interrupted() → sets flag to false

Non-static method: isInterrupted() → doesnt change flag

Throwing InterruptedException and exiting → sets flag to false

t.join()

Called inside a Thread.

Waits for termination of t .

Can be interrupted with InterruptedException .

Example

We have 2 Threads:

main-thread creates a new thread from the Runnable, MessageLoop and waits for it to finish.

If it takes to long, the main-thread will interrupt it.

public class SimpleThreads {// Display message + name of the current thread
public static void threadMessage(String message) {
String threadName = Thread.currentThread().getName();
System.out.format("%s: %s%n", threadName, message);
}public static void main(String args[]) throws InterruptedException {
long patience = 1000 * 60 * 60; // one hour//start Timer
threadMessage("Starting MessageLoop thread");
long startTime = System.currentTimeMillis();
//start MessageLoop
Thread t = new Thread(new MessageLoop());
t.start();threadMessage("Waiting for MessageLoop thread to finish");
while (t.isAlive()) {
threadMessage("Still waiting...");
// Wait max 1 second
t.join(1000);
if (((System.currentTimeMillis() - startTime) > patience) && t.isAlive()) {
threadMessage("Tired of waiting!");
t.interrupt();
t.join(); // wait indefinitely
}
}
threadMessage("Finally!");
}
}
private static class MessageLoop implements Runnable {
public void run() {
String importantInfo[] = { "Mares eat oats", ...};
try {
for (int i = 0; i < importantInfo.length; i++) {
Thread.sleep(4000);
SimpleThreads.threadMessage(importantInfo[i]);
}
} catch (InterruptedException e) {
SimpleThreads.threadMessage("I wasn't done!");
}
}
}

Synchronization

Threads communicate by sharing memory. This might cause:

Thread Interference

Multiple Threads access shared memory

This happens when two operations running on different threads acting on the same data, interleave .

Memory consistency error

Inconsistent views of shared memory.

There is no fixed execution-order.

Solution: Synchronization.

Synchronized Methods: Atomic actions

No interleaving allowed: "Kritischer Abschnitt"

public class SynchronizedCounter {
private int c = 0;public synchronized void increment() {
c++;
}public synchronized void decrement() {
c--;
}public synchronized int value() {
return c;
}
}

Synchronized Statements: making only a part of the method atomic

Built on the intrinsic lock or monitor lock .

Thread that needs exclusive access has to acquire the intrinsic lock, access it, then release the intrinsic lock when it's done.

A thread is said to own the intrinsic lock between the time it has acquired the lock and released the lock.

Synchronized statements must specify the object that provides the intrinsic lock:

public void addName(String name) {
synchronized(this) {
lastName = name;
nameCount++;
}
nameList.add(name);
}

To avoid unneccasry blocking, where interleaving does not affect the memory:

Incrementing c1 and c2 can interleave since they are never read together, but the operations must be synchronized.

public class Example {
private long c1 = 0;
private long c2 = 0;
private Object lock1 = new Object();
private Object lock2 = new Object();public void inc1() {
synchronized(lock1) {
c1++;
}
}public void inc2() {
synchronized(lock2) {
c2++;
}
}
}

Atomic Access

No interruptions: Action happens completely or not at all.

No side effects are visible until the action is complete.

Reads and writes are atomic for all primitive variables - but long and double variables must be declared volatile

Liveliness

Synchronization can cause thread contention like these.

Deadlock

describes a situation where two or more threads are blocked forever, waiting for each other.

Example

Two Friends (Gaston, Alphonse) who bow and remain bowed until the other friend can bow back.

If both bow at the same time, they remain locked forever. Neither block will ever end, because each thread is waiting for the other to exit bow .

public class Deadlock {
static class Friend {
private final String name;public Friend(String name) { ... }
public String getName() { ... }public synchronized void bow(Friend bower) {
System.out.format("%s: %s" + "  has bowed to me!%n", this.name, bower.getName());
bower.bowBack(this);
}public synchronized void bowBack(Friend bower) {
System.out.format("%s: %s" + " has bowed back to me!%n", this.name, bower.getName());
}
}public static void main(String[] args) {
final Friend alphonse = new Friend("Alphonse");
final Friend gaston = new Friend("Gaston");
new Thread(new Runnable() {
public void run() {
alphonse.bow(gaston);
}
}).start();
new Thread(new Runnable() {
public void run() {
gaston.bow(alphonse);
}
}).start();
}
}

Starvation

situation where a thread is unable to gain regular access to shared resources and is unable to make progress.

This happens when shared resources are made unavailable for long periods by "greedy" threads.

Livelock

The threads are not blocked but still make no process- they are simply too busy responding to each other to resume work.

Guarded Blocks

Only proceed if a specified condition is met.

Using loops is wasteful:

public void guardedJoy() {
while(!joy) {}
System.out.println("Joy has been achieved!");
}

Object.wait()

Suspends current Thread until (any) event - then it checks if the condition was met, again.

public synchronized void guardedJoy() {
// This guard only loops once for each special event, which may not
// be the event we're waiting for.
while(!joy) {
try {
wait();
} catch (InterruptedException e) {}
}
System.out.println("Joy and efficiency have been achieved!");
}

Object.notifyAll()

Event

public synchronized notifyJoy() {
joy = true;
notifyAll();
}

Producer-Consumer-Example

Only used to demonstrate guarded blocks. This could be done a lot easier using java.collections.

public class Drop {
// Message producer -> consumer
private String message;
// True: consumer should wait (for producer to put())
// False: producer should wait (for consumer to take())
private boolean empty = true;public synchronized String take() {
while (empty) {
try {
wait();
} catch (InterruptedException e) {}
}
empty = true;
notifyAll(); // Notify producer that status has changed.
return message;
}public synchronized void put(String message) {
while (!empty) {
try {
wait();
} catch (InterruptedException e) {}
}
empty = false;
this.message = message;
notifyAll(); // Notify consumer that status has changed.
}
}

public class Producer implements Runnable {
private Drop drop;public Producer(Drop drop) {
this.drop = drop;
}public void run() {
String importantInfo[] = { "Mares eat oats", ... };for (int i = 0; i < importantInfo.length; i++) {
drop.put(importantInfo[i]);
try {
Thread.sleep(...random time);
} catch (InterruptedException e) {}
}
drop.put("DONE");
}
}
public class Consumer implements Runnable {
private Drop drop;public Consumer(Drop drop) {
this.drop = drop;
}public void run() {
for (String message = drop.take();
! message.equals("DONE");
message = drop.take()) {System.out.format("MESSAGE RECEIVED: %s%n", message);
try {
Thread.sleep(...random time);
} catch (InterruptedException e) {}
}
}
}

public class ProducerConsumerExample {
public static void main(String[] args) {
Drop drop = new Drop();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}

High level Concurrency Objects

Synchronized code is build on top of a simple reentrant lock .

Easy to use but many limitations.

One thread can lock an object at a time (kritischen Abschnitt betreten).

We now encapsulate task creation and management (previously done by threads) and the tasks themselves (runnables).

Lock

Lock Interface in java.util.concurrent.locks

Example: Solve Deadlock

Executor

Previously, we seperated the task being done by the thread (Runnable), and the thread itself, responsible for creating tasks.

We dont want to sperate thread management and creation from the rest of the application in large applications.

Executor

Interface for launching new tasks.

The Executor runs an existing worker thread to run the runnable r or place it in a queue until the worker thread is available (→ see thread pools) .

Runnable r = ...
(new Thread(r)).start();
Runnable r = ...
Executor e = ...
e.execute(r);

ExecutorService

Subinterface of Executor.

Additional features for life cycle.

.submit()

Accepts Runnable and Callable

returns a Future Object

Can handle large collections of Callable objects.

Can handle immediate shutdown of tasks.

ScheduledExecutorService

Subinterface of ExecutorService.

Additional features for future and periodic execution of tasks.

.schedule()

Accepts Runnable and Callable

Calls tasks with specified delay or in intervals with fixed delays.

Thread Pools

These are implementations of the ExecutorService .

One can also implement the ThreadPoolExecutor Interface to implement own thread pools.

Thread pools consist of worker-threads .

Used to execute multiple tasks.

Less overhead for thread-creation.

newFixedThreadPool

Has a maximum of allowed threads runnning.

Works with internal queue.

Useful for servers handling requests.

newCachedThreadPool

Creates Executor with expandable threadpool.

For short-lived tasks.

newSingleThreadExecutor

Thread pool size = 1

ForkJoinPool

Can execute ForkJoinTask Processes.

Helps take advantage of multiple CPU cores.

work-stealing algorithm : workers that run out of things to do can steal tasks from workers that are still busy.

Usage: Writing small portions of code in a ForkJoinTask

if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results

Concurrent Collections

Help avoid Memory Consistency Errors.

Set an Execution Order.

BlockingQueue

Fifo Queue

Blocks when you try to add to a full queue or retrieve from an empty queue.

ConcurrentMap / ConcurrentHashMap

Subinterface of java.util.Map/java.util.Hashmap

Atomic operations

ConcurrentNavigableMap

Analog of TreeMap

Atomic Variables

Atomic operations on a single variable.

All classes have get and set methods that work like reads and writes on volatile variables.

import java.util.concurrent.atomic.AtomicInteger;class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);public void increment() {
c.incrementAndGet();
}public void decrement() {
c.decrementAndGet();
}public int value() {
return c.get();
}}