/* This program demonstrates the implementation and the use of a concurrent producer-consumer queue. * The main program creates: * - one producer thread that generates numbers from 0 to nrNumbers-1 and writes them in a queue * - a specified number of adders that get the numbers from the producers and add them up. When the * producer closes the queue, the adders write their computed sums into a second queue towards * the printer thread. Note that the adders compete over the numbers from the producer, and it is * unpredictable which adder gets which number. * - finally, one printer thread that gets the partial sums from the adders, adds them all, and prints * the result. */ import java.util.concurrent.locks.*; import java.util.*; public class ProducerConsumer { private static class ProducerConsumerQueue { public void enqueue(T val) { m_mtx.lock(); m_queue.add(val); m_cv.signal(); m_mtx.unlock(); } /* Gets the next element from the queue. If the queue is empty but not closed, this function blocks. If the queue is empty and closed, returns null.*/ public T dequeue() { T ret; m_mtx.lock(); while (true) { if (!m_queue.isEmpty()) { ret = m_queue.remove(); m_mtx.unlock(); return ret; } if (m_isEnd) { m_mtx.unlock(); return null; } try { m_cv.await(); } catch(InterruptedException e) { System.err.println("Exception: " + e); } } } /* Closes the queue.*/ public void close() { m_mtx.lock(); m_isEnd = true; m_cv.signalAll(); m_mtx.unlock(); } private final Lock m_mtx = new ReentrantLock(); Queue m_queue = new ArrayDeque(); private final Condition m_cv = m_mtx.newCondition(); boolean m_isEnd = false; }; private static class Producer implements Runnable { public Producer(ProducerConsumerQueue queue, long nrNumbers) { this.queue = queue; this.nrNumbers = nrNumbers; } @Override public void run() { for (long i = 0; i < nrNumbers; ++i) { queue.enqueue((Integer)(int)i); } queue.close(); } private final ProducerConsumerQueue queue; private final long nrNumbers; } private static class Adder implements Runnable { public Adder(ProducerConsumerQueue inQueue, ProducerConsumerQueue outQueue) { this.inQueue = inQueue; this.outQueue = outQueue; } @Override public void run() { int sum = 0; while (true) { Integer x = inQueue.dequeue(); if (x == null) { break; } sum += x; } outQueue.enqueue(sum); } private final ProducerConsumerQueue inQueue; private final ProducerConsumerQueue outQueue; } private static class Printer implements Runnable { public Printer(ProducerConsumerQueue inQueue, int nrAdders) { this.inQueue = inQueue; this.nrAdders = nrAdders; } @Override public void run() { int sum = 0; for (int i = 0; i < nrAdders; ++i) { Integer x = inQueue.dequeue(); if (x == null) { System.err.println("Unexpected end of second queue."); return; } sum += x; } System.out.println("Sum=" + sum); } private final ProducerConsumerQueue inQueue; private final int nrAdders; } public static void main(String[] args) { int nrAdders; long nrNumbers; if (args.length != 2) { System.err.println("Usage: vector_sum_multithread nrAdders nrNumbers"); return; } try { nrAdders = Integer.parseInt(args[0]); nrNumbers = Integer.parseInt(args[1]); } catch(Exception e) { System.err.println("Exception: " + e); return; } List threads = new ArrayList(); ProducerConsumerQueue firstQueue = new ProducerConsumerQueue(); ProducerConsumerQueue secondQueue = new ProducerConsumerQueue(); threads.add(new Thread(new Printer(secondQueue, nrAdders))); threads.get(0).start(); for(int i=0 ; i