Due thread condividono un buffer limitato (array di N celle): il Produttore inserisce elementi, il Consumatore li preleva. Senza sincronizzazione possono verificarsi due problemi:
Per il buffer limitato servono tre semafori, ognuno con uno scopo preciso:
empty = N
Conta i posti liberi. Il produttore fa acquire() prima di inserire: se Γ¨ 0 (buffer pieno) si blocca.
full = 0
Conta gli elementi presenti. Il consumatore fa acquire() prima di prelevare: se Γ¨ 0 (buffer vuoto) si blocca.
mutex = 1
Esclusione mutua sull'accesso al buffer. Impedisce che produttore e consumatore tocchino il buffer contemporaneamente.
// ProduttoreConsumatore.java import java.util.concurrent.Semaphore; public class ProduttoreConsumatore { static final int N = 5; static int[] buffer = new int[N]; static int in = 0, out = 0; static Semaphore empty = new Semaphore(N); // posti liberi static Semaphore full = new Semaphore(0); // elementi presenti static Semaphore mutex = new Semaphore(1); // accesso esclusivo al buffer static class Produttore extends Thread { public void run() { for (int i = 0; i < 10; i++) { try { empty.acquire(); // c'Γ¨ posto? se 0 β si blocca mutex.acquire(); // accesso esclusivo al buffer buffer[in] = i; in = (in + 1) % N; System.out.println("Prodotto: " + i); mutex.release(); full.release(); // segnala: elemento disponibile } catch (InterruptedException e) {} } } } static class Consumatore extends Thread { public void run() { for (int i = 0; i < 10; i++) { try { full.acquire(); // c'Γ¨ qualcosa? se 0 β si blocca mutex.acquire(); // accesso esclusivo al buffer int item = buffer[out]; out = (out + 1) % N; System.out.println("Consumato: " + item); mutex.release(); empty.release(); // segnala: posto liberato } catch (InterruptedException e) {} } } } public static void main(String[] args) throws InterruptedException { Thread p = new Produttore(); Thread c = new Consumatore(); p.start(); c.start(); p.join(); c.join(); } }
Una risorsa condivisa (es. un database, un file) viene acceduta da N lettori e M scrittori. Le regole fondamentali sono:
I lettori non aspettano se la risorsa Γ¨ giΓ letta. Rischio: gli scrittori vanno in starvation.
Se uno scrittore aspetta, i nuovi lettori vengono bloccati. Rischio: i lettori vanno in starvation.
Variabili usate: read_count (numero di lettori attivi),
mutex (protegge read_count),
wrt (esclusione scrittore).
Il primo lettore che arriva acquisisce wrt; l'ultimo che esce la rilascia.
// LettoriScrittori.java import java.util.concurrent.Semaphore; public class LettoriScrittori { static int readCount = 0; // mutex: protegge la variabile readCount static Semaphore mutex = new Semaphore(1); // wrt: accesso esclusivo allo scrittore static Semaphore wrt = new Semaphore(1); static class Lettore extends Thread { int id; Lettore(int id) { this.id = id; } public void run() { try { mutex.acquire(); readCount++; if (readCount == 1) // primo lettore: blocca scrittori wrt.acquire(); mutex.release(); System.out.println("Lettore " + id + " legge"); // lettura mutex.acquire(); readCount--; if (readCount == 0) // ultimo lettore: sblocca scrittori wrt.release(); mutex.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } static class Scrittore extends Thread { int id; Scrittore(int id) { this.id = id; } public void run() { try { wrt.acquire(); // accesso esclusivo System.out.println("Scrittore " + id + " scrive"); wrt.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { Thread[] threads = { new Lettore(1), new Lettore(2), new Lettore(3), new Scrittore(1) }; for (Thread t : threads) t.start(); for (Thread t : threads) t.join(); } }
5 filosofi siedono a un tavolo rotondo. Tra ogni coppia di filosofi c'Γ¨ una sola forchetta (5 in totale). Per mangiare un filosofo deve prendere entrambe le forchette (sinistra e destra). Quando non mangia, pensa. Il problema modella la contesa su risorse condivise.
π
// Filosofi.java β soluzione asimmetrica (no deadlock) import java.util.concurrent.Semaphore; public class Filosofi { static final int N = 5; // un semaforo per ogni forchetta, inizializzato a 1 (disponibile) static Semaphore[] forchette = new Semaphore[N]; static { for (int i=0; i<N; i++) forchette[i] = new Semaphore(1); } static class Filosofo extends Thread { int id; Filosofo(int id) { this.id = id; } void pensa() throws InterruptedException { System.out.println("F"+id+": penso..."); Thread.sleep((long)(Math.random()*1000)); } void mangia() throws InterruptedException { System.out.println("F"+id+": MANGIO π"); Thread.sleep((long)(Math.random()*1000)); } public void run() { try { while (true) { pensa(); if (id % 2 == 0) { // filosofo PARI forchette[id].acquire(); // sinistra prima forchette[(id+1)%N].acquire(); // poi destra } else { // filosofo DISPARI forchette[(id+1)%N].acquire(); // destra prima forchette[id].acquire(); // poi sinistra } mangia(); forchette[id].release(); forchette[(id+1)%N].release(); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { Thread[] f = new Thread[N]; for (int i=0; i<N; i++) { f[i] = new Filosofo(i); f[i].start(); } for (Thread t : f) t.join(); } }
// SENZA asimmetria: tutti prendono sinistra β nessuno prende destra β DEADLOCK forchette[id].acquire(); // tutti acquisiscono sinistra forchette[(id+1)%N].acquire(); // nessuno riesce ad acquisire destra // β tutti i thread bloccati qui per sempre