Synchronisation
Mutex
Mutual exclusion (often shortened to "mutex") ensures that only one thread can execute a block of code at a given time.
In Java, the synchronized keyword provides a basic mechanism for achieving mutual exclusion.
Problem:
Consider a BankAccount with a method to withdraw money. If two threads try to withdraw money at the same time, they might both check the balance and try to withdraw the amount even if there's only enough for one withdrawal, leading to a potential overdraft. This is a classic race condition.
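The interleaving described above can be reproduced deterministically. The sketch below is a hypothetical demo class (CheckThenActRace is not part of the original example) in which the balance check and the debit are each atomic, but not atomic together. A CyclicBarrier is used purely as an artificial device to make both threads pass the check before either one debits; real code would hit this ordering only occasionally.

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; the barrier exists only to force the bad interleaving.
class CheckThenActRace {
    private int balance = 100;
    private final CyclicBarrier bothChecked = new CyclicBarrier(2);

    private synchronized int read() {
        return balance;
    }

    private synchronized void debit(int amount) {
        balance -= amount;
    }

    // Broken on purpose: the check and the debit are not one atomic step.
    void withdraw(int amount) {
        if (read() >= amount) {
            awaitOtherThread(); // both threads pass the check before either debits
            debit(amount);
        }
    }

    private void awaitOtherThread() {
        try {
            bothChecked.await();
        } catch (Exception e) { // InterruptedException or BrokenBarrierException
            throw new IllegalStateException(e);
        }
    }

    // Runs two concurrent withdrawals of 80 and returns the final balance.
    static int run() {
        CheckThenActRace account = new CheckThenActRace();
        Thread t1 = new Thread(() -> account.withdraw(80));
        Thread t2 = new Thread(() -> account.withdraw(80));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return account.read();
    }

    public static void main(String[] args) {
        System.out.println("Final balance: " + run()); // prints "Final balance: -60"
    }
}
```

Both threads see a balance of 100, so both withdrawals go through and the account ends up overdrawn. The check and the update have to be protected by the same lock as a single unit.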
Solution:
We'll use the synchronized keyword to ensure mutual exclusion on the withdraw operation.
Synchronized Methods:
public class BankAccount {
private double balance;
public BankAccount(double initialBalance) {
this.balance = initialBalance;
}
public synchronized void deposit(double amount) {
if (amount > 0) {
balance += amount;
}
}
public synchronized void withdraw(double amount) {
if (amount > 0 && balance >= amount) {
balance -= amount;
} else {
System.out.println("Insufficient funds!");
}
}
public synchronized double getBalance() {
return balance;
}
}
With the synchronized keyword in place, if one thread is inside a synchronized method (deposit, withdraw, or getBalance), no other thread can enter any of the synchronized methods on the same BankAccount object until the first thread exits.
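A synchronized instance method locks on the object it is called on, so it behaves like wrapping the whole method body in synchronized (this). The sketch below shows that equivalent block form. The class name BlockSyncAccount and the boolean return value of withdraw are assumptions made for illustration, not part of the original BankAccount.

```java
// Hypothetical variant of BankAccount that uses synchronized blocks instead of synchronized methods.
class BlockSyncAccount {
    private double balance;

    BlockSyncAccount(double initialBalance) {
        this.balance = initialBalance;
    }

    // Same locking as a synchronized method: the monitor is the account object itself.
    boolean withdraw(double amount) {
        synchronized (this) {
            if (amount > 0 && balance >= amount) {
                balance -= amount;
                return true;
            }
            return false;
        }
    }

    double getBalance() {
        synchronized (this) {
            return balance;
        }
    }

    public static void main(String[] args) {
        BlockSyncAccount account = new BlockSyncAccount(100);
        System.out.println(account.withdraw(80)); // true
        System.out.println(account.withdraw(80)); // false
        System.out.println(account.getBalance()); // 20.0
    }
}
```

The block form is useful when only part of a method needs the lock, since the synchronized region can be made smaller than the method body.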
Using Reentrant Lock:
The ReentrantLock class in the java.util.concurrent.locks package is a more flexible mutex than synchronized methods and blocks. It supports non-blocking and timed acquisition (tryLock()), interruptible acquisition (lockInterruptibly()), an optional fairness policy, and multiple Condition objects per lock. It may also scale better in certain high-contention situations.
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BankAccount {
private double balance;
private final Lock lock = new ReentrantLock();
public BankAccount(double initialBalance) {
this.balance = initialBalance;
}
public void deposit(double amount) {
lock.lock();
try {
if (amount > 0) {
balance += amount;
}
} finally {
lock.unlock();
}
}
public void withdraw(double amount) {
lock.lock();
try {
if (amount > 0 && balance >= amount) {
balance -= amount;
} else {
System.out.println("Insufficient funds!");
}
} finally {
lock.unlock();
}
}
public double getBalance() {
lock.lock();
try {
return balance;
} finally {
lock.unlock();
}
}
}
The key things to note:
- lock.lock(): Acquires the lock. If the lock is not available, the thread will block until it can acquire it.
- Call lock() immediately before the try block and call unlock() inside the finally block. This ensures that the lock is released even if an exception occurs.
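One of the extra operations ReentrantLock offers is timed acquisition through tryLock(long, TimeUnit). The following is a minimal sketch, assuming a hypothetical TryLockAccount class and a tryWithdraw method that gives up instead of blocking indefinitely. Neither name comes from the original example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical account that refuses to wait forever for the lock.
class TryLockAccount {
    private final ReentrantLock lock = new ReentrantLock();
    private double balance;

    TryLockAccount(double initialBalance) {
        this.balance = initialBalance;
    }

    // Returns true only if the lock was obtained in time and the funds were sufficient.
    boolean tryWithdraw(double amount, long timeoutMillis) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!acquired) {
                return false; // another thread held the lock for the whole timeout
            }
            if (amount > 0 && balance >= amount) {
                balance -= amount;
                return true;
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            if (acquired) {
                lock.unlock(); // unlock only if this thread actually holds the lock
            }
        }
    }

    double getBalance() {
        lock.lock();
        try {
            return balance;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TryLockAccount account = new TryLockAccount(100);
        System.out.println(account.tryWithdraw(80, 100)); // true
        System.out.println(account.tryWithdraw(80, 100)); // false
    }
}
```

The acquired flag matters here: calling unlock() on a ReentrantLock that the current thread does not hold throws IllegalMonitorStateException.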
Test:
public class BankAccountTest {
public static void main(String[] args) {
BankAccount account = new BankAccount(100);
// Create two threads trying to withdraw money
Thread t1 = new Thread(() -> {
account.withdraw(80);
});
Thread t2 = new Thread(() -> {
account.withdraw(80);
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final balance: " + account.getBalance());
}
}
In the test, we're starting two threads that both try to withdraw $80 from an account with a balance of $100. Without synchronization, both might be successful, which is not desired. With synchronization, one thread will succeed and the other will get an "Insufficient funds!" message, ensuring the balance never goes negative.
Producer Consumer
The Producer-Consumer problem is a classic synchronization problem. One common way to solve this in Java is by using the wait() and notify() methods available in the Object class.
Here's an example using a shared Queue:
Shared Resource:
import java.util.LinkedList;
import java.util.Queue;
public class SharedQueue {
private Queue<Integer> queue = new LinkedList<>();
private final int LIMIT = 5; // Maximum items the queue can have
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (this) {
// Wait if the queue is full
while (queue.size() == LIMIT) {
wait();
}
// Produce an item and notify consumer
queue.add(value);
System.out.println("Produced " + value);
value++;
notifyAll();
}
// Just for demo purposes, slow down the production. Sleeping outside the synchronized block lets the consumer take the lock in the meantime
Thread.sleep(1000);
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (this) {
// Wait if the queue is empty
while (queue.isEmpty()) {
wait();
}
// Consume an item and notify producer
int value = queue.poll();
System.out.println("Consumed " + value);
notifyAll();
}
// Just for demo purposes, slow down the consumption. Sleeping outside the synchronized block lets the producer take the lock in the meantime
Thread.sleep(1000);
}
}
}
Producer and Consumer Threads:
public class ProducerConsumerDemo {
public static void main(String[] args) {
SharedQueue sharedQueue = new SharedQueue();
Thread producerThread = new Thread(() -> {
try {
sharedQueue.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
sharedQueue.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
Here's a brief overview of what's happening:
- SharedQueue: This class holds the shared queue, which has a LIMIT on its size.
- produce(): This method continuously produces items. If the queue has reached its limit, it calls wait() until the consumer has removed an item. After producing an item, it wakes the waiting threads using notifyAll().
- consume(): This method continuously consumes items. If the queue is empty, it calls wait() until the producer has added an item. After consuming an item, it wakes the waiting threads using notifyAll().
The producer and consumer communicate and synchronize using wait() and notifyAll(). The wait() method releases the lock and suspends the thread until another thread calls notify() or notifyAll() on the same object. notify() wakes one waiting thread, while notifyAll() wakes all of them. A woken thread must reacquire the lock before it continues, and the condition may have changed by then, which is why each wait() call sits inside a while loop that rechecks the condition.
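Because the example above loops forever, it is hard to check its result. The sketch below is a finite variant under stated assumptions: BoundedBuffer, put, take and run are hypothetical names, and the demo produces the numbers 1 to count on one thread while the calling thread consumes them and returns their sum.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical finite variant of the shared queue, so that the run terminates.
class BoundedBuffer {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int limit;

    BoundedBuffer(int limit) {
        this.limit = limit;
    }

    synchronized void put(int value) throws InterruptedException {
        while (queue.size() == limit) {
            wait(); // releases the monitor while waiting for space
        }
        queue.add(value);
        notifyAll(); // wake any thread waiting in take()
    }

    synchronized int take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // releases the monitor while waiting for an item
        }
        int value = queue.remove();
        notifyAll(); // wake any thread waiting in put()
        return value;
    }

    // Produces 1..count on a second thread, consumes on the calling thread, returns the sum.
    static int run(int count) {
        BoundedBuffer buffer = new BoundedBuffer(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + run(100)); // prints "Sum: 5050"
    }
}
```

Every item is consumed exactly once, so the sum is the same on every run even though the order in which the two threads alternate is not.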
Blocking Queue
The BlockingQueue from the java.util.concurrent package offers a thread-safe implementation for a queue and is often used in a producer-consumer scenario.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class SharedResource {
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1); // Size of 1 for simplicity
private int value = 0;
public void produce() throws InterruptedException {
while (true) {
// No synchronized block here: holding a monitor while put() blocks would stop the consumer from ever reaching take()
value++; // Assumes a single producer thread, so value needs no extra locking
queue.put(value); // Will block if the queue is full
System.out.println("Produced value: " + value);
// Just for demo purposes, slow down the production
Thread.sleep(1000);
}
}
public void consume() throws InterruptedException {
while (true) {
int val = queue.take(); // Will block if the queue is empty
System.out.println("Consumed value: " + val);
// Just for demo purposes, slow down the consumption
Thread.sleep(1000);
}
}
}
- BlockingQueue.put(): This method inserts the specified element into the queue, waiting if necessary for space to become available.
- BlockingQueue.take(): This method retrieves and removes the head of the queue, waiting if necessary until an element becomes available.
Using a BlockingQueue simplifies the producer-consumer implementation since it internally handles all the waiting and notification logic, eliminating the need for explicit wait() and notify() calls.
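To make that concrete, here is a minimal finite sketch, assuming a hypothetical BlockingQueueDemo class and a fixed number of items so that the run ends. It uses ArrayBlockingQueue, another BlockingQueue implementation from the same package, and calls put() and take() with no synchronized block, wait() or notifyAll() of its own.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical finite producer-consumer run built only on put() and take().
class BlockingQueueDemo {
    // Produces 1..count on a second thread, consumes on the calling thread, returns the sum.
    static int run(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); // capacity of 1
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + run(10)); // prints "Sum: 55"
    }
}
```

Calling put() or take() while holding some other lock that the opposite thread also needs would risk a deadlock, so the queue calls are deliberately left outside any synchronized region.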
Exercises
Synchronized Counter
Objective: Practice synchronization with multiple threads.
Tasks:
- Create a shared counter object.
- Increment the counter from multiple threads.
- Use the synchronized keyword to ensure that the counter increments are atomic.
- Print the final counter value in the main thread.
Solutions
Synchronized Counter
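import java.util.ArrayList;
import java.util.List;
// A top-level class cannot be private or static, so Counter is package-private
class Counter {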
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
public class Exercise {
public static void main(String[] args) throws InterruptedException {
final Counter counter = new Counter();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("Final counter value: " + counter.getCount());
}
}
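For comparison only, the same counter can be written without the synchronized keyword by using AtomicInteger from java.util.concurrent.atomic. This is an alternative technique, not what the exercise asks for, and the class and method names below are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical alternative to the synchronized Counter, based on an atomic variable.
class AtomicCounterDemo {
    // Starts threadCount threads that each increment the counter incrementsPerThread times.
    static int run(int threadCount, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Final counter value: " + run(10, 1000)); // prints "Final counter value: 10000"
    }
}
```

An atomic variable fits a single counter well. Once several fields have to change together, a lock such as synchronized is usually the simpler choice.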