Sunday, 16 December 2007

Concurrent fun

Somebody asked a question on Experts Exchange about how to lock an object so that writes are exclusive but concurrent reads are allowed. Locking on a data-holding object using synchronized methods of that object is an all-or-nothing affair since a thread acquiring the monitor, whether to read or write, will lock that object to any other access. The java.util.concurrent package, present in the Java JDK since 1.5 and based on the work of Doug Lea, makes this granularity of lock easy to achieve. I thought I'd write something to acquaint myself more with this package, since, I'm glad to say, I rarely am forced to write multi-threaded code and the little game of chance below is the result. The log4j classes should be in your classpath and your log4j.properties should be something like this one. The executable jar is here
package net.proteanit.demo;

import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

enum AccessType {
   READER,
   WRITER;
}

/**
* This is a simple game of chance to demonstrate ReadWrite locks in Java: a
* writer acquires a write lock and rewrites a List by shuffling it.
* If this ends up being in the correct order, the next reading thread to read
* the data wins the game
*
* @author Charles Johnson
* @version 1.0
 */
public class LockGame {
   private final ReentrantReadWriteLock rwl;
   private final List<String> data;
   private final int correctOrder;
   private final Lock r;
   private final Lock w;
   private final Logger log;
   private ExecutorService threads;
   private volatile boolean playerHasWon;

   public LockGame() {
       log = Logger.getLogger(LockGame.class);
       rwl = new ReentrantReadWriteLock();
       data = new ArrayList<String>();
       data.add("alpha");
       data.add("beta");
       data.add("gamma");
       correctOrder = data.hashCode();
       r = rwl.readLock();
       w = rwl.writeLock();
   }

   private void start() {
       final int SIZE = 16;
       List<Callable<Void>> workers = new ArrayList<Callable<Void>>(SIZE);
       for (int i = 0; i < SIZE; i++) {
           workers.add(new ReaderWriter());
       }
       threads = Executors.newFixedThreadPool(SIZE, new PrettyPrintThreadFactory(99, "thread-"));
       try {
           if (log.isDebugEnabled()) {
               log.debug("Invoking all players...");
           }
           threads.invokeAll(workers);
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (RejectedExecutionException e) {
           // Ignore this - some thread has won
       } finally {
           // Another thread might have already called this
           stop();
       }
   }

   private void stop() {
       if (!threads.isShutdown()) {
           threads.shutdownNow();
       }
       if (!playerHasWon && log.isDebugEnabled()) {
           log.debug("There was no winner on that run");
       }
   }

   private void changeData() {
       try {
           w.lock();
           if (log.isDebugEnabled()) {
               log.debug(String.format("Acquired lock in changeData() at %d", System.currentTimeMillis()));
           }
           Collections.shuffle(data);
           if (log.isDebugEnabled()) {
               log.debug(String.format("Data changed - now %s", data.toString()));
           }
           try { Thread.sleep((int)(Math.random() * 1000)); } catch(InterruptedException e) { /* ignore it */ }
       } finally {
           w.unlock();
           if (log.isDebugEnabled()) {
               log.debug(String.format("Released lock in changeData() at %d", System.currentTimeMillis()));
           }
       }
   }

   private void readData() {
       try {
           // We can't be sure that all threads can be closed down
           // after a single winner emerges so we make the boolean check
           if (playerHasWon) {
               return;
           }
           r.lock();
           if (log.isDebugEnabled()) {
               log.debug(String.format("Acquired lock in readData() at %d", System.currentTimeMillis()));
           }
           if (log.isDebugEnabled()) {
               log.debug(String.format("Reading data - now %s", data.toString()));
           }
           if (data.hashCode() == correctOrder) {
               if (log.isDebugEnabled()) {
                   log.debug("I'VE WON! Data now correct");
               }
               playerHasWon = true;
               stop();
           }
           else {
               try { Thread.sleep((int)(Math.random() * 1000)); } catch(InterruptedException e) { /* ignore it */ }
           }
       } finally {
           r.unlock();
           if (log.isDebugEnabled()) {
               log.debug(String.format("Released lock in readData() at %d", System.currentTimeMillis()));
           }
       }
   }

   public static void main(String[] args) {
       LockGame lg = new LockGame();
       lg.changeData();
       lg.start();
   }

   private class ReaderWriter implements Callable<Void> {
       private AccessType access;

       public ReaderWriter() {
           // Roughly 1 in 5 chance of being a writer, else reader
           access = (Math.random() > 0.8) ? AccessType.WRITER : AccessType.READER;
       }

       public Void call() {
           if (log.isDebugEnabled()) {
               log.debug(String.format("This ReaderWriter is of type %s",
                       access));
           }
           switch (access) {
           case WRITER:
               changeData();
               return null;
           case READER:
               readData();
               return null;
           default:
               return null;
           }
       }
   }
}

The log file produced gives us some interesting information, but first we'll do a little processing on it:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 14:43:50,957 [main] Acquired lock in changeData() at 1197902630929 14:43:51,486 [main] Released lock in changeData() at 1197902631485 14:43:51,567 [thread-00] Acquired lock in changeData() at 1197902631566 14:43:51,869 [thread-00] Released lock in changeData() at 1197902631869 14:43:51,872 [thread-01] Acquired lock in readData() at 1197902631871 14:43:51,876 [thread-03] Acquired lock in readData() at 1197902631875 14:43:52,070 [thread-03] Released lock in readData() at 1197902632069 14:43:52,636 [thread-01] Released lock in readData() at 1197902632633 14:43:52,640 [thread-04] Acquired lock in changeData() at 1197902632637 14:43:53,626 [thread-04] Released lock in changeData() at 1197902633621 14:43:53,628 [thread-06] Acquired lock in readData() at 1197902633627 14:43:53,631 [thread-08] Acquired lock in readData() at 1197902633630 14:43:53,749 [thread-06] Released lock in readData() at 1197902633749 14:43:54,322 [thread-08] Released lock in readData() at 1197902634321 14:43:54,323 [thread-07] Acquired lock in changeData() at 1197902634323 14:43:55,310 [thread-07] Released lock in changeData() at 1197902635305 14:43:55,312 [thread-09] Acquired lock in readData() at 1197902635311 14:43:55,315 [thread-02] Acquired lock in readData() at 1197902635314 14:43:55,493 [thread-09] Released lock in readData() at 1197902635493 14:43:55,514 [thread-02] Released lock in readData() at 1197902635509 14:43:55,516 [thread-10] Acquired lock in changeData() at 1197902635516 14:43:55,554 [thread-10] Released lock in changeData() at 1197902635553 14:43:55,556 [thread-05] Acquired lock in readData() at 1197902635555 14:43:55,561 [thread-12] Acquired lock in readData() at 1197902635559 14:43:55,918 [thread-05] Released lock in readData() at 1197902635917 14:43:56,546 [thread-12] Released lock in readData() at 1197902636545 14:43:56,547 [thread-13] Acquired lock in changeData() at 1197902636547 14:43:57,118 [thread-13] Released lock in changeData() at 1197902637117 14:43:57,119 [thread-14] Acquired lock in readData() at 1197902637119 14:43:57,122 [thread-11] Acquired lock in readData() at 1197902637121 14:43:57,124 [thread-15] Acquired lock in readData() at 1197902637123 14:43:57,394 [thread-14] Released lock in readData() at 1197902637393 14:43:57,534 [thread-15] Released lock in readData() at 1197902637533 14:43:58,010 [thread-11] Released lock in readData() at 1197902638009
Lines 1 and 2 show the main application thread scrambling the data from its intial ordered state. Of course there's a one in six-ish chance the shuffle could leave it unchanged, but let's live dangerously.
The important thing to note is that in places like lines 9-10 and 21-22 the write thread enters and exits in an orderly fashion with no competition.
In contrast, in place like lines 5-8, concurrent threads co-exist happily when only reading is being done. 'thread-01' gets a read lock in line 5, followed by 'thread-03' before thread-01 has released its lock.

No comments: