java.util.concurrent
https://docs.oracle.com/javase/tutorial/essential/concurrency/index.html
https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#MemoryVisibility
In depth tutorials
Compact summary
Definition
Process
The JVM runs as a single process scheduled by the OS.
A Java application can also create additional processes (not part of this tutorial).
Thread
Lightweight processes: creating a thread requires fewer resources than creating a process.
Threads exist within a process and share its resources.
Every application starts with one thread, the main thread.
Thread
Runnable
Runnable is an interface that represents a task which can be executed by a Thread, an Executor, or similar means.
A Runnable is passed to the Thread constructor.
Any class whose code should run within a Thread object must implement this interface.
public class HelloRunnable implements Runnable {
    public void run() {
        System.out.println("Hello from a thread!");
    }

    public static void main(String args[]) {
        (new Thread(new HelloRunnable())).start();
    }
}
Thread API
.sleep()
Suspends execution of the current thread for a given time (the duration is not precise).
Makes CPU time available for other threads or processes.
Throws InterruptedException if the thread is interrupted while sleeping.
.interrupt()
Interrupted threads are usually programmed to terminate (return from the run method).
If the thread is blocked in a method such as sleep(), wait() or join(), that method throws an InterruptedException.
If the methods in use do not throw this exception, we must periodically check with Thread.interrupted() ourselves and then return or throw the exception ourselves.
Interrupting a thread sets its interrupt status flag.
Non-static method: interrupt() → sets the flag to true
Static method: Thread.interrupted() → returns the flag of the current thread and resets it to false
Non-static method: isInterrupted() → returns the flag without changing it
By convention, a method that exits by throwing InterruptedException resets the flag to false.
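The effect of the three flag operations can be observed on the current thread. The following is a minimal sketch; the class name InterruptFlagDemo and the helper observe() are made up for illustration.

```java
import java.util.Arrays;

class InterruptFlagDemo {
    /** Sets the interrupt flag of the current thread and records what each query reports. */
    static boolean[] observe() {
        Thread self = Thread.currentThread();
        self.interrupt();                                // instance method: sets the flag
        boolean afterInterrupt = self.isInterrupted();   // reads the flag, leaves it set
        boolean firstCheck = Thread.interrupted();       // reads the flag and clears it
        boolean secondCheck = Thread.interrupted();      // flag was already cleared
        return new boolean[] { afterInterrupt, firstCheck, secondCheck };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [true, true, false]
    }
}
```

Because Thread.interrupted() clears the flag, code that only wants to inspect the status without consuming it should use isInterrupted() instead.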
t.join()
Makes the current thread wait until thread t terminates.
Throws InterruptedException if the waiting thread is interrupted.
Example
There are two threads:
The main thread creates a new thread from the Runnable MessageLoop and waits for it to finish.
If the MessageLoop thread takes too long, the main thread interrupts it.
public class SimpleThreads {
    // Display a message, preceded by the name of the current thread
    public static void threadMessage(String message) {
        String threadName = Thread.currentThread().getName();
        System.out.format("%s: %s%n", threadName, message);
    }

    public static void main(String args[]) throws InterruptedException {
        // Maximum time to wait before interrupting the MessageLoop thread
        long patience = 1000 * 60 * 60; // one hour

        // Start timer
        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        // Start MessageLoop
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait max 1 second
            t.join(1000);
            if (((System.currentTimeMillis() - startTime) > patience) && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                t.join(); // wait indefinitely
            }
        }
        threadMessage("Finally!");
    }

    // Nested inside SimpleThreads: a private static class cannot be declared at top level
    private static class MessageLoop implements Runnable {
        public void run() {
            String importantInfo[] = { "Mares eat oats", ... };
            try {
                for (int i = 0; i < importantInfo.length; i++) {
                    // Pause for 4 seconds; throws InterruptedException when interrupted
                    Thread.sleep(4000);
                    SimpleThreads.threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                SimpleThreads.threadMessage("I wasn't done!");
            }
        }
    }
}
Synchronization
Threads communicate by sharing memory. This can cause two kinds of errors:
Thread Interference
Multiple threads access the same shared memory.
It happens when two operations that run in different threads and act on the same data interleave.
Memory consistency error
Different threads have inconsistent views of the same shared memory.
Without synchronization, there is no guaranteed order in which one thread's writes become visible to other threads.
Solution: Synchronization.
Synchronized Methods: Atomic actions
No interleaving allowed: "critical section"
public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}
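To see that synchronized methods prevent lost updates, two threads can increment the same counter. This is a small sketch under the assumption of two worker threads; CounterDemo and runTwoThreads are illustrative names, and the nested counter is a shortened copy of the class above.

```java
class CounterDemo {
    static class SynchronizedCounter {
        private int c = 0;

        public synchronized void increment() {
            c++;
        }

        public synchronized int value() {
            return c;
        }
    }

    /** Lets two threads call increment() perThread times each and returns the final value. */
    static int runTwoThreads(int perThread) {
        SynchronizedCounter counter = new SynchronizedCounter();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                counter.increment();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join(); // wait until both threads are done before reading the result
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.value();
    }

    public static void main(String[] args) {
        System.out.println(runTwoThreads(100_000)); // prints 200000
    }
}
```

Without the synchronized keyword on increment(), the two c++ operations could interleave and the final value would often be lower than expected.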
Synchronized Statements: making only a part of the method atomic
Built on the intrinsic lock (also called monitor lock).
A thread that needs exclusive access to an object's fields has to acquire the object's intrinsic lock before accessing them, and release it when it is done.
A thread is said to own the intrinsic lock between the time it acquires the lock and the time it releases it.
Synchronized statements must specify the object that provides the intrinsic lock:
public void addName(String name) {
    synchronized(this) {
        lastName = name;
        nameCount++;
    }
    nameList.add(name);
}
To avoid unnecessary blocking, use separate locks where interleaving does not affect shared state:
Updates of c1 and c2 may interleave with each other because the two fields are never used together, but each update on its own must still be synchronized.
public class Example {
    private long c1 = 0;
    private long c2 = 0;
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void inc1() {
        synchronized(lock1) {
            c1++;
        }
    }

    public void inc2() {
        synchronized(lock2) {
            c2++;
        }
    }
}
Atomic Access
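An atomic action cannot be interrupted midway: it happens completely or not at all.
No side effects are visible until the action is complete.
Reads and writes are atomic for reference variables and for most primitive variables (all types except long and double).
Reads and writes are atomic for all variables declared volatile (including long and double).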
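Besides atomicity, volatile also guarantees visibility: a write to a volatile variable is seen by later reads in other threads. A typical use is a stop flag. The sketch below assumes a busy-spinning worker; VolatileFlagDemo and its method names are invented for this example.

```java
class VolatileFlagDemo {
    // volatile guarantees that the write in stop() becomes visible to the worker thread
    private volatile boolean running = true;

    void stop() {
        running = false;
    }

    /** Starts a spinning worker, clears the flag, and reports whether the worker terminated. */
    static boolean workerStopsAfterFlagIsCleared() {
        VolatileFlagDemo demo = new VolatileFlagDemo();
        Thread worker = new Thread(() -> {
            while (demo.running) {
                // spin until another thread clears the flag
            }
        });
        worker.start();
        try {
            Thread.sleep(50);   // let the worker spin for a moment
            demo.stop();
            worker.join(5_000); // wait up to 5 seconds for the worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(workerStopsAfterFlagIsCleared()); // prints true
    }
}
```

If running were a plain (non-volatile) field, the Java memory model would not guarantee that the worker ever sees the update, so the loop might never end.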
Liveness
Synchronization can cause thread contention, which leads to liveness problems such as the following.
Deadlock
A situation where two or more threads are blocked forever, waiting for each other.
Example
Two friends (Gaston and Alphonse) bow to each other; each remains bowed until the other has bowed back.
If both bow at the same time, they remain locked forever. Both threads block when they try to invoke bowBack. Neither block will ever end, because each thread is waiting for the other to exit bow.
public class Deadlock {
    static class Friend {
        private final String name;

        public Friend(String name) { ... }
        public String getName() { ... }

        public synchronized void bow(Friend bower) {
            System.out.format("%s: %s" + " has bowed to me!%n", this.name, bower.getName());
            bower.bowBack(this);
        }

        public synchronized void bowBack(Friend bower) {
            System.out.format("%s: %s" + " has bowed back to me!%n", this.name, bower.getName());
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(new Runnable() {
            public void run() {
                alphonse.bow(gaston);
            }
        }).start();
        new Thread(new Runnable() {
            public void run() {
                gaston.bow(alphonse);
            }
        }).start();
    }
}
Starvation
A situation where a thread is unable to gain regular access to shared resources and therefore cannot make progress.
This happens when shared resources are made unavailable for long periods by "greedy" threads.
Livelock
The threads are not blocked but still make no progress: they are too busy responding to each other to resume work.
Guarded Blocks
Only proceed if a specified condition is met.
Polling the condition in a busy loop is wasteful, because the loop keeps executing while it waits:
public void guardedJoy() {
    while(!joy) {}
    System.out.println("Joy has been achieved!");
}
Object.wait()
Suspends the current thread until another thread signals an event (any event); the loop then checks the condition again.
wait() must be called while holding the object's intrinsic lock, which is why the method is synchronized.
public synchronized void guardedJoy() {
    // This guard only loops once for each special event, which may not
    // be the event we're waiting for.
    while(!joy) {
        try {
            wait();
        } catch (InterruptedException e) {}
    }
    System.out.println("Joy and efficiency have been achieved!");
}
Object.notifyAll()
Signals the event: wakes up all threads that are waiting on this object's lock.
public synchronized void notifyJoy() {
    joy = true;
    notifyAll();
}
Producer-Consumer-Example
Only used to demonstrate guarded blocks. This could be done a lot more easily with the Java Collections Framework (e.g. a BlockingQueue from java.util.concurrent).
public class Drop {
    // Message producer -> consumer
    private String message;
    // True: consumer should wait (for producer to put())
    // False: producer should wait (for consumer to take())
    private boolean empty = true;

    public synchronized String take() {
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        empty = true;
        notifyAll(); // Notify producer that status has changed.
        return message;
    }

    public synchronized void put(String message) {
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        empty = false;
        this.message = message;
        notifyAll(); // Notify consumer that status has changed.
    }
}
public class Producer implements Runnable {
    private Drop drop;

    public Producer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        String importantInfo[] = { "Mares eat oats", ... };

        for (int i = 0; i < importantInfo.length; i++) {
            drop.put(importantInfo[i]);
            try {
                Thread.sleep(...random time);
            } catch (InterruptedException e) {}
        }
        drop.put("DONE");
    }
}
public class Consumer implements Runnable {
    private Drop drop;

    public Consumer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        for (String message = drop.take();
             !message.equals("DONE");
             message = drop.take()) {
            System.out.format("MESSAGE RECEIVED: %s%n", message);
            try {
                Thread.sleep(...random time);
            } catch (InterruptedException e) {}
        }
    }
}
public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}
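For comparison, the same hand-off can be written with a BlockingQueue, which does the waiting and notifying internally. This is only a sketch: QueueDropDemo is a made-up name, and an ArrayBlockingQueue with capacity 1 is assumed as a stand-in for the single-slot Drop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueDropDemo {
    /** Sends the messages through a one-slot queue and returns what the consumer received. */
    static List<String> run(String... messages) {
        BlockingQueue<String> drop = new ArrayBlockingQueue<>(1);
        List<String> received = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) {
                    drop.put(m);      // blocks while the slot is occupied
                }
                drop.put("DONE");     // same end marker as in the Drop example
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (String m = drop.take(); !m.equals("DONE"); m = drop.take()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), reading 'received' from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("Mares eat oats", "Does eat oats"));
        // prints [Mares eat oats, Does eat oats]
    }
}
```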
High level Concurrency Objects
Synchronized code is built on top of a simple reentrant lock.
It is easy to use but has many limitations.
Only one thread at a time can hold the lock of an object (enter the critical section).
The high-level API encapsulates thread creation and management (previously done by hand with Thread objects) and keeps it separate from the tasks themselves (Runnables).
Lock
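Lock is an interface in java.util.concurrent.locks.
Unlike with an intrinsic lock, an attempt to acquire a Lock can be backed out of: tryLock() returns immediately (or after a timeout) if the lock is not available.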
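The basic idiom is to acquire the lock and release it in a finally block. The sketch below also shows tryLock() failing without blocking while another thread holds the lock; TryLockDemo and otherThreadCanAcquire are hypothetical names.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    /** Returns whether a second thread could acquire a lock that the calling thread holds. */
    static boolean otherThreadCanAcquire() {
        Lock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock(); // returns immediately instead of blocking
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join(); // the lock is still held by this thread while 'other' runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock(); // always release in finally
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(otherThreadCanAcquire()); // prints false
    }
}
```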
Example: Solve Deadlock
public static void main(String[] args) {
    final Friend alphonse = new Friend("Alphonse");
    final Friend gaston = new Friend("Gaston");
    new Thread(new BowLoop(alphonse, gaston)).start();
    new Thread(new BowLoop(gaston, alphonse)).start();
}

static class BowLoop implements Runnable {
    private Friend bower;
    private Friend bowee;

    public BowLoop(Friend bower, Friend bowee) { ... }

    public void run() {
        while (true) {
            try {
                Thread.sleep(...random time);
            } catch (InterruptedException e) {}
            bowee.bow(bower);
        }
    }
}

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

static class Friend {
    private final String name;
    private final Lock lock = new ReentrantLock();

    public Friend(String name) { ... }
    public String getName() { ... }

    // Returns false if both friends are bowing at the same time
    public boolean impendingBow(Friend bower) {
        boolean myLock = false;
        boolean yourLock = false;
        try {
            // tryLock() returns true if the lock was free and was acquired by the current thread
            myLock = lock.tryLock();
            yourLock = bower.lock.tryLock();
        } finally {
            if (!(myLock && yourLock)) {
                // One of the locks was not free: release whichever one we got
                if (myLock) {
                    lock.unlock();
                }
                if (yourLock) {
                    bower.lock.unlock();
                }
            }
        }
        return myLock && yourLock;
    }

    public void bow(Friend bower) {
        if (impendingBow(bower)) {
            try {
                System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName());
                bower.bowBack(this);
            } finally {
                lock.unlock();
                bower.lock.unlock();
            }
        } else {
            System.out.format("%s: %s started" + " to bow to me, but saw that I was already bowing to him.%n", this.name, bower.getName());
        }
    }

    public void bowBack(Friend bower) {
        System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName());
    }
}
Executor
Previously, there was a close connection between the task being done (the Runnable) and the thread itself (the Thread object) that runs it.
In large applications, we want to separate thread management and creation from the rest of the application.
Executor
Interface for launching new tasks.
Depending on the implementation, the Executor may use an existing worker thread to run the Runnable r immediately, or place r in a queue until a worker thread becomes available (→ see thread pools).
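// Thread idiom: create and start a thread for the task
Runnable r = ...
(new Thread(r)).start();
// Executor idiom: hand the task to an Executor instead
Runnable r = ...
Executor e = ...
e.execute(r);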
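How the task is run depends entirely on the Executor implementation. The sketch below contrasts two tiny hand-written executors (a direct one and a thread-per-task one); ExecutorDemo, threadUsedBy and the thread name "worker" are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

class ExecutorDemo {
    /** Hands a task to the executor, waits until it has run, and returns the executing thread's name. */
    static String threadUsedBy(Executor executor) {
        CountDownLatch done = new CountDownLatch(1);
        String[] name = new String[1];
        executor.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(); // countDown()/await() also make name[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;                                // runs the task in the caller's thread
        Executor threadPerTask = r -> new Thread(r, "worker").start(); // starts a new thread for every task
        System.out.println(threadUsedBy(direct));        // name of the calling thread
        System.out.println(threadUsedBy(threadPerTask)); // prints worker
    }
}
```

The calling code stays the same in both cases, which is the point of the interface: the threading policy can be swapped without touching the tasks.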
ExecutorService
Subinterface of Executor.
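Additional features for managing the life cycle of the individual tasks and of the executor itself.
.submit()
Accepts Runnable and Callable objects and returns a Future object, which can be used to retrieve the Callable's return value and to manage the status of the task.
Provides methods for submitting large collections of Callable objects (invokeAll, invokeAny).
Provides methods for shutting down the executor (shutdown, shutdownNow).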
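A short sketch of submit() with Callable tasks and Future results follows. The class ExecutorServiceDemo, the method sumOfSquares and the pool size of 2 are arbitrary choices for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceDemo {
    /** Squares each value in its own task and adds up the results. */
    static int sumOfSquares(int... values) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int v : values) {
                Callable<Integer> task = () -> v * v;
                futures.add(pool.submit(task)); // returns immediately with a Future
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until this task's result is available
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // no new tasks are accepted; worker threads end once idle
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(1, 2, 3)); // prints 14
    }
}
```

Calling shutdown() matters in practice: the worker threads of a fixed pool are non-daemon threads, so a pool that is never shut down keeps the JVM alive.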
ScheduledExecutorService
Subinterface of ExecutorService.
Additional features for delayed and periodic execution of tasks.
.schedule()
Accepts Runnable and Callable objects.
Executes a task after a specified delay. scheduleAtFixedRate and scheduleWithFixedDelay execute a task repeatedly at defined intervals.
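The delayed variant can be sketched as follows. ScheduleDemo and elapsedMillisForDelay are illustrative names; the task simply reports the time at which it ran.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    /** Schedules a task after delayMillis and returns how many milliseconds passed before it ran. */
    static long elapsedMillisForDelay(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            Callable<Long> stamp = () -> System.nanoTime(); // the task: report when it ran
            ScheduledFuture<Long> future = scheduler.schedule(stamp, delayMillis, TimeUnit.MILLISECONDS);
            long ranAt = future.get(); // blocks until the delayed task has completed
            return TimeUnit.NANOSECONDS.toMillis(ranAt - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        // The task never runs before the delay has passed, but it may run later.
        System.out.println(elapsedMillisForDelay(100) + " ms");
    }
}
```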
Thread Pools
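Most implementations of ExecutorService in java.util.concurrent are thread pools.
One can also instantiate or extend the ThreadPoolExecutor class to build custom thread pools.
Thread pools consist of worker threads.
Worker threads exist separately from the tasks they execute and are often used to execute multiple tasks.
This minimizes the overhead of thread creation.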
newFixedThreadPool
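Always has a fixed number of threads running.
Tasks are submitted via an internal queue, which holds extra tasks while all threads are busy.
Useful for servers handling requests.
newCachedThreadPool
Creates an executor with an expandable thread pool.
For short-lived tasks.
newSingleThreadExecutor
Creates an executor that executes a single task at a time (thread pool size = 1).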
ForkJoinPool
Executes ForkJoinTask objects.
Helps take advantage of multiple CPU cores.
Uses a work-stealing algorithm: worker threads that run out of things to do can steal tasks from other threads that are still busy.
Usage: wrap the code that performs a portion of the work in a ForkJoinTask subclass (typically RecursiveTask or RecursiveAction):
if (my portion of the work is small enough)
    do the work directly
else
    split my work into two pieces
    invoke the two pieces and wait for the results
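The pattern above maps directly onto a RecursiveTask. The following sketch sums an int array; the class name SumTask and the THRESHOLD value are arbitrary assumptions, and a real cut-off would have to be tuned for the workload.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off for "small enough"
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Otherwise split the range into two pieces
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        invokeAll(left, right);            // run both pieces and wait for them
        return left.join() + right.join(); // combine the results
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 50005000
    }
}
```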
Concurrent Collections
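Help avoid memory consistency errors.
They define a happens-before relationship between an operation that adds an object to the collection and subsequent operations that access or remove that object.
BlockingQueue
A first-in-first-out (FIFO) queue.
Blocks or times out when you try to add to a full queue or retrieve from an empty queue.
ConcurrentMap / ConcurrentHashMap
ConcurrentMap is a subinterface of java.util.Map; ConcurrentHashMap is its standard implementation and the concurrent analog of java.util.HashMap.
Defines atomic operations (for example, adding a key-value pair only if the key is absent).
ConcurrentNavigableMap
A subinterface of ConcurrentMap that supports approximate matches; its standard implementation ConcurrentSkipListMap is the concurrent analog of TreeMap.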
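As an illustration of the atomic map operations, several threads can update the same key without any explicit locking. This is a sketch; ConcurrentCountDemo, countConcurrently and the key "key" are invented for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentCountDemo {
    /** Lets several threads increment the same map entry and returns the final count. */
    static int countConcurrently(int threads, int incrementsPerThread) {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    hits.merge("key", 1, Integer::sum); // atomic read-modify-write
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return hits.getOrDefault("key", 0);
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1_000)); // prints 4000
    }
}
```

A separate get() followed by put() would not be safe here, because another thread could update the entry between the two calls; merge() performs both steps as one atomic operation.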
Atomic Variables
Atomic operations on a single variable.
All classes have get and set methods that work like reads and writes on volatile variables.
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}