Tuesday, April 19, 2011

Learning from JDK Source and Language Specs


Excerpts from the Java Language Specification, the Concurrency API documentation, the following websites, and personal learning :

JSR 133 (Java Memory Model) FAQ :
 Concurrency Simplified :
 
1. Recollecting Basic Concepts :

“Each thread is a different stream of control that can execute its instructions independently, allowing a
multithreaded process to perform numerous tasks concurrently. One thread can run the GUI while
a second thread does some I/O and a third performs calculations.
It is the program state that gets scheduled on a CPU; it is the "thing" that does the work. If a process comprises data, code, kernel state, and a set of CPU registers, then a thread is embodied in the contents of those registers—the program counter, the general registers, the stack pointer, etc., and the stack. A thread, viewed at an instant of time, is the state of the computation.”
 
– Multithreaded Programming with Java Technology, Bil Lewis and Daniel J. Berg
 
A thread is a lightweight entity, comprising the registers, stack, and some other data. The rest of the process structure is shared by all threads: the address space, file descriptors, etc. Much (and sometimes all) of the thread structure is in user space, allowing for very fast access.
 
The main benefits of writing multithreaded programs are:
• Performance gains from multiprocessing hardware (parallelism)
• Increased application throughput
• Increased application responsiveness
• Replacing process-to-process communications
• Efficient use of system resources
• One binary that runs well on both uniprocessors and multiprocessors
 
 
2. Properties of a typical Multithreaded Program :
Independent tasks
A debugger needs to run and monitor a program, keep its GUI active, and display an interactive data inspector, dynamic call grapher, and performance monitor—all in the same address space, all at the same time.

For example, a server needs to handle numerous overlapping requests simultaneously. NFS®, NIS, DBMSs, stock quotation servers, etc., all receive large numbers of requests that require the server to do some I/O, then process the results and return answers. Completing one request at a time would be very slow.

Repetitive tasks
A simulator needs to simulate the interactions of numerous different elements that operate simultaneously. CAD, structural analysis, weather prediction, etc., all model tiny pieces first, then combine the results to produce an overall picture.
--------------------------------------------------------
 
3. How does Synchronization work in Memory ?

“Synchronization has several aspects. The most well-understood is mutual exclusion -- only one thread can hold a monitor at once, so synchronizing on a monitor means that once one thread enters a synchronized block protected by a monitor, no other thread can enter a block protected by that monitor until the first thread exits the synchronized block.
But there is more to synchronization than mutual exclusion. Synchronization ensures that memory writes by a thread before or during a synchronized block are made visible in a predictable manner to other threads which synchronize on the same monitor. After we exit a synchronized block, we release the monitor, which has the effect of flushing the cache to main memory, so that writes made by this thread can be visible to other threads. Before we can enter a synchronized block, we acquire the monitor, which has the effect of invalidating the local processor cache so that variables will be reloaded from main memory. We will then be able to see all of the writes made visible by the previous release.”
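To make this concrete, here is a minimal sketch (the class and counter field are illustrative, not from the FAQ) of how a single monitor makes a write in one thread visible to a later reader in another:

    class SharedCounter {
        private int count = 0; // guarded by the intrinsic lock on 'this'

        public synchronized void increment() { // entering: local cache invalidated, reloaded from main memory
            count++;
        } // exiting: the write is flushed so other threads can see it

        public synchronized int get() { // same monitor: guaranteed to see all prior increments
            return count;
        }
    }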
 
 
4. How can Volatile be used to avoid excessive synchronization and implement non-blocking operations ?
 
Volatile fields are special fields which are used for communicating state between threads.
 
The compiler and runtime are prohibited from allocating them in registers. They must also ensure that after they are written, they are flushed out of the cache to main memory, so they immediately become visible to other threads (this executes a WRITE BARRIER, like the memory writes performed when UNLOCKING a MONITOR).
 
Similarly, before a volatile field is read, the cache must be invalidated so that the value in main memory, not the local processor cache, is the one seen. Even if the reader thread has locally assigned some value to the volatile field, that value will be replaced by the latest value written by the writer thread, i.e. it executes a READ BARRIER to refresh the local value with the one in main memory.
 
This is how each read of a volatile sees the last write to that volatile by any thread; in effect, volatile fields are designated by the programmer as fields for which it is never acceptable to see a "stale" value as a result of caching or reordering.
 
Each read or write of a volatile field acts like "half" a synchronization, for purposes of visibility.
 
The best example is ConcurrentHashMap, which uses volatile reads to decide whether it can return a value from a direct lookup or must acquire a lock on a segment of keys !!
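A simpler illustration – a minimal sketch (the class and flag names are illustrative) of a volatile flag used to stop a worker thread without any locking:

    class Poller implements Runnable {
        private volatile boolean shutdownRequested = false; // never cached in a register

        public void shutdown() { shutdownRequested = true; } // write barrier: flushed to main memory

        public void run() {
            while (!shutdownRequested) { // read barrier: reloaded from main memory on each check
                // ... poll and process one unit of work ...
            }
        }
    }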
 
Now let's look into the usage of Java Synchronizers : modern non-blocking, lock-contention-free, wait-optimized multithreading tools.
 
5. How to signal effectively between Consumer and Worker instead of wasting CPU Cycles ?
Condition – can be used to signal between consumer and worker threads, instead of making the consumer thread wait() in a while() loop. This is especially useful for a BlockingQueue.
    class LoggedService {
        final BlockingQueue<String> msgQ = new LinkedBlockingQueue<String>();

        public void produceWork() throws InterruptedException {
            String message = ...; // get the message from source
            msgQ.put(message);    // blocks only if the queue is at capacity
        }

        public LoggedService() { // start background consumer thread
            Runnable logr = new Runnable() {
                public void run() {
                    try {
                        for (;;)
                            System.out.println(msgQ.take()); // blocks until an element arrives
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // restore interrupt status
                    }
                }
            };
            Executors.newSingleThreadExecutor().execute(logr);
        }
    }
Internally, a BlockingQueue implementation creates two Conditions from its lock:

Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();

msgQ.take() – waits till an element is available in the queue:

   while (count == 0)
        notEmpty.await();

msgQ.poll(timeout, unit) – waits up to the specified time for an element to become available.

msgQ.put(e) – waits till the queue length is less than capacity:

   while (count == items.length)
        notFull.await();

**** Lock.lock() is a replacement for synchronized {…}; Condition.await()/signal()/signalAll() are replacements for wait()/notify()/notifyAll().

This is a wonderful permission system – N threads can hold a lock at any point in time (for example, N readers holding the read lock of a ReadWriteLock) !!
 
6. LOCK – offers a significant advantage over 'synchronizing a block of code', i.e. it does not force block-structured locking/unlocking.

Let's go through the code comments in Lock.java :
  
* {@code Lock} implementations provide more extensive locking
* operations than can be obtained using {@code synchronized} methods
* and statements.  They allow more flexible structuring, may have
* quite different properties, and may support multiple associated
* {@link Condition} objects.
 
The main problem with 'synchronized' methods or statements is that they provide
* access to the implicit monitor lock associated with every object, but
* force all lock acquisition and release to occur in a block-structured way
– a bigger scope.
 
Lock has the advantage of smaller scope -
* acquire the lock of node A, then node B, then release A and acquire
* C, then release B and acquire D and so on. (Hand Over Hand / Chain Lock) Implementations of the
* {@code Lock} interface enable the use of such techniques by
* allowing a lock to be acquired and released in different scopes
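A minimal sketch of such hand-over-hand (chain) traversal – the Node class and list layout here are illustrative, not from Lock.java:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class ChainedList {
        static class Node {
            final Lock lock = new ReentrantLock();
            Object item;
            Node next;
        }

        private final Node head = new Node(); // sentinel node; its item is unused

        // Hold at most two locks at a time while walking the chain:
        // acquire the next node's lock before releasing the current one.
        boolean contains(Object x) {
            Node pred = head;
            pred.lock.lock();
            try {
                Node curr = pred.next;
                while (curr != null) {
                    curr.lock.lock();   // acquire B ...
                    pred.lock.unlock(); // ... then release A
                    pred = curr;
                    if (x.equals(curr.item))
                        return true;
                    curr = pred.next;
                }
                return false;
            } finally {
                pred.lock.unlock(); // release whichever node is still held
            }
        }
    }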
 
Another problem with 'synchronized' –
when multiple locks are acquired they must be released in the opposite
* order, and all locks must be released in the same lexical scope in which
* they were acquired.
 
Advantage of 'Lock' – locks can be released in any order.
 
Problem with Lock             
- With this increased flexibility comes additional responsibility.

* absence of block-structured locking removes the
* automatic release of locks that occurs with {@code synchronized}
* methods and statements. In most cases, the following idiom
* should be used:
    Lock l = ...;
    l.lock();
    try {
        // access the resource protected by this lock
    } finally {
        l.unlock();
    }
 
**** So now the JVM will spend less time in SCHEDULING THREADS and more time EXECUTING them !
**** Lock contention can be profiled in a much better way to spot bottlenecks !!
How to ensure Non-Blocking Behaviour ?
·        an attempt to acquire the lock that can be interrupted ({@link #lockInterruptibly}),
·        an attempt to acquire the lock that can time out ({@link #tryLock(long, TimeUnit)}).
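A minimal sketch of the timed variant – the lock field, the one-second timeout, and the fallback action are illustrative:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    class TimedLocking {
        private final ReentrantLock lock = new ReentrantLock();

        void updateWithTimeout() throws InterruptedException {
            if (lock.tryLock(1, TimeUnit.SECONDS)) { // bounded wait instead of blocking forever
                try {
                    // access the resource protected by this lock
                } finally {
                    lock.unlock();
                }
            } else {
                // could not acquire the lock in time: back off, log, or shed the work
            }
        }
    }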
How to ensure Concurrent Access using Locks ?
some locks may allow concurrent access to
* a shared resource, such as the read lock of a {@link ReadWriteLock}.             
 
Use ReentrantReadWriteLock for enforcing multiple-reader, single-writer access. The write lock can “downgrade” to a read lock (not vice-versa).
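A minimal sketch of lock downgrading, modeled on the CachedData example in the ReentrantReadWriteLock javadoc (the data field and loadData() placeholder are illustrative):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class CachedData {
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private Object data;
        private boolean cacheValid;

        Object readWithDowngrade() {
            rwl.readLock().lock();
            if (!cacheValid) {
                rwl.readLock().unlock(); // must release read lock before acquiring write lock
                rwl.writeLock().lock();
                try {
                    if (!cacheValid) { // recheck: another thread may have refreshed it
                        data = loadData();
                        cacheValid = true;
                    }
                    rwl.readLock().lock(); // downgrade: take read lock while still holding write lock
                } finally {
                    rwl.writeLock().unlock(); // release write lock, still holding read lock
                }
            }
            try {
                return data;
            } finally {
                rwl.readLock().unlock();
            }
        }

        private Object loadData() { return new Object(); } // placeholder for the real load
    }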
             
7. How to signal multiple threads simultaneously ?
 
CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
Example from CountDownLatch.java :
 
    class Driver { // ...
        void main() throws InterruptedException {
            CountDownLatch startSignal = new CountDownLatch(1);
            CountDownLatch doneSignal = new CountDownLatch(N);

            for (int i = 0; i < N; ++i) // create and start threads
                new Thread(new Worker(startSignal, doneSignal)).start();

            doSomethingElse();       // don't let the workers run yet
            startSignal.countDown(); // let all threads proceed
            doSomethingElse();
            doneSignal.await();      // wait for all to finish
        }
    }
**** This also exemplifies how, instead of polling and wasting CPU resources, the thread just receives a signal when it is good to proceed !!
**** No need to call join() multiple times !!
    class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;

        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }

        public void run() {
            try {
                startSignal.await();    // this thread can't proceed till Driver calls countDown()
                doWork();
                doneSignal.countDown(); // report completion back to the Driver
            } catch (InterruptedException ex) {} // return;
        }

        void doWork() { ... }
    }
 
Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await().
 
8. How to implement non-blocking  Optimistic Data Structure ?
    class OptimisticLinkedList { // incomplete
        static class Node {
            volatile Object item;
            final AtomicReference<Node> next;
            Node(Object x, Node n) { item = x; next = new AtomicReference<Node>(n); }
        }

        final AtomicReference<Node> head = new AtomicReference<Node>(null);

        public void prepend(Object x) {
            if (x == null) throw new IllegalArgumentException();
            for (;;) {
                Node h = head.get();
                if (head.compareAndSet(h, new Node(x, h))) return; // retry on CAS failure
            }
        }

        public boolean search(Object x) {
            Node p = head.get();
            while (p != null && x != null && !p.item.equals(x))
                p = p.next.get();
            return p != null && x != null;
        }
    }
 
9.    How final fields can offer thread-safety :
All threads will read the final value so long as it is guaranteed to be assigned before the object could be made visible to other threads.
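A minimal sketch (the class is illustrative): once a Point is published after construction, every thread that sees the reference also sees consistent x and y values with no synchronization, because the fields are final – provided 'this' does not escape during construction:

    public final class Point {
        private final int x;
        private final int y;
        public Point(int x, int y) { this.x = x; this.y = y; } // 'this' must not escape here
        public int getX() { return x; }
        public int getY() { return y; }
    }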
  
10. Perform operations asynchronously using Futures
when time-consuming independent tasks should not hold up the main thread.
    class DocumentRenderer { Document render(byte[] raw) { /* ... */ } }

    class App { // ...
        ExecutorService executor = ...; // any executor service
        DocumentRenderer renderer = new DocumentRenderer();

        public void display(final byte[] rawDoc) {
            try {
                Future<Document> docFuture = executor.submit(new Callable<Document>() {
                    public Document call() {
                        return renderer.render(rawDoc); // runs in a worker thread
                    }
                });
                preparePanel();        // do other things in the main thread ...
                preparePageCaptions(); // ... while the document is rendered in a different thread
                showDocument(docFuture.get()); // block till the rendered document is ready
            } catch (Exception ex) {
                cleanup();
            }
        }
    }
 
 
 
11. How static fields can be used to guarantee thread-safety :
 
use the Initialization On Demand Holder idiom, which is thread-safe and a lot easier to understand:
    private static class LazyModelHolder {
        static final Model model = new Model();
    }

    public static Model getInstance() {
        return LazyModelHolder.model; // first call triggers class initialization, thread-safely
    }
This code is guaranteed to be correct because of the initialization guarantees for static fields; if a field is set in a static initializer, it is guaranteed to be made visible, correctly, to any thread that accesses that class.
 
12. If not sure of fine-grained threading tools, take resort to traditional synchronization

    LinkedList queue = new LinkedList();
    queue.add(object);              // add to end of queue
    Object o = queue.removeFirst(); // get head of queue

    // If the queue is to be used by multiple threads, it must be wrapped
    // so that its methods are synchronized. Note the wrapper is a List
    // (not a LinkedList), so use remove(0) rather than removeFirst():
    List syncQueue = Collections.synchronizedList(queue);
13. Why does Deadlock occur ? How to avoid it ?
It occurs when multiple threads each acquire multiple locks in different orders:
Thread A locks a, then b – transferMoney(me, you, 100); Thread B locks b, then a – transferMoney(you, me, 100).
'synchronized' goes against the OOP principle ! There is a real tension between object-oriented design and lock-based concurrency control !
The solution is to ensure lock ordering, as mentioned in the sections on creating reentrant locks, and to lock the smallest possible set of sequential steps :
atomic { from.debit(amount); to.credit(amount); }
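A minimal sketch of such lock ordering, using identity hash codes to induce a global order (the Account class is illustrative, and tie-breaking for hash collisions is omitted):

    class AccountTransfers {
        static class Account {
            private int balance;
            void debit(int amount)  { balance -= amount; } // caller must hold this account's lock
            void credit(int amount) { balance += amount; } // caller must hold this account's lock
        }

        void transferMoney(Account from, Account to, int amount) {
            // induce a globally consistent lock order, regardless of argument order
            Account first  = System.identityHashCode(from) < System.identityHashCode(to) ? from : to;
            Account second = (first == from) ? to : from;
            synchronized (first) {      // every thread acquires the two account locks
                synchronized (second) { // in the same global order, so no deadlock cycle
                    from.debit(amount);
                    to.credit(amount);
                }
            }
        }
    }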
14. Prefer immutability : prohibit sharing and avoid unnecessary synchronization
CopyOnWriteArrayList and CopyOnWriteArraySet provide thread safety with the added benefit of immutability, for data that changes infrequently.
The CopyOnWriteArrayList behaves much like the ArrayList class, except that when the list is modified, instead of modifying the underlying array, a new array is created and the old array is discarded.
This means that when a caller gets an iterator (copyOnWriteArrayListRef.iterator()), it internally holds a reference to the underlying CopyOnWriteArrayList's array at that moment. Since that array is effectively immutable, traversal requires neither synchronization on copyOnWriteArrayListRef nor a clone() of the list before traversal (there is no risk of concurrent modification).
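A minimal sketch of why traversal is safe – the iterator walks the snapshot taken when it was created (the listener strings are illustrative):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    class SnapshotIteration {
        public static void main(String[] args) {
            List<String> listeners = new CopyOnWriteArrayList<String>();
            listeners.add("first");
            for (String s : listeners) {                 // iterates over the snapshot array
                listeners.add("added-during-iteration"); // safe: the write copies the array
                System.out.println(s);                   // prints only "first"
            }
        }
    }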
15.   Replacing synchronized collections with concurrent collections can offer dramatic scalability improvements
·        We have just seen that CopyOnWriteArrayList is a replacement for synchronized List implementations for cases where traversal is the dominant operation.
·        The ConcurrentMap interface adds support for common compound actions such as put-if-absent, replace, and conditional remove (see the sketch after this list).
·        ConcurrentLinkedQueue is a traditional FIFO queue, and PriorityQueue a (non-concurrent) priority-ordered queue. Queue operations do not block; if the queue is empty, the retrieval operation returns null. You can simulate the behavior of a Queue with a List; in fact, LinkedList also implements Queue.
·        If we use a bounded blocking queue, then when the queue fills up the producers block, giving the consumers time to catch up, because a blocked producer cannot generate more work.
·        Blocking queues also provide an offer method, which returns a failure status if the item cannot be enqueued. This enables you to create more flexible policies for dealing with overload, such as shedding load, serializing excess work items and writing them to disk, or reducing the number of producer threads.
·        LinkedBlockingQueue and ArrayBlockingQueue are FIFO queues, analogous to LinkedList and ArrayList but with better concurrent performance than a synchronized List.
·        PriorityBlockingQueue is a priority-ordered queue, which is useful when you want to process elements in an order other than FIFO. Just like other sorted collections, PriorityBlockingQueue can compare elements according to their natural order (if they implement Comparable) or using a Comparator.
·        ConcurrentHashMap implements a scalable locking strategy.
Instead of synchronizing every method on a common lock, restricting access to a single thread at a time, it uses a finer-grained locking mechanism called lock striping to allow a greater degree of shared access. Arbitrarily many reading threads can access the map concurrently, readers can access the map concurrently with writers, and a limited number of writers can modify the map concurrently.
It also provides iterators that do not throw ConcurrentModificationException, thus eliminating the need to lock the collection during iteration. The iterators returned by ConcurrentHashMap are weakly consistent instead of fail-fast.
·        ConcurrentSkipListMap and ConcurrentSkipListSet are concurrent replacements for a synchronized SortedMap or SortedSet (such as a TreeMap or TreeSet wrapped with synchronizedMap).
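As referenced above, a minimal sketch of the put-if-absent compound action (the Memoizer class and expensiveCompute() placeholder are illustrative):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class Memoizer {
        private final ConcurrentMap<String, String> cache =
            new ConcurrentHashMap<String, String>();

        String getOrCompute(String key) {
            String v = cache.get(key);
            if (v == null) {
                String candidate = expensiveCompute(key);
                String prior = cache.putIfAbsent(key, candidate); // atomic check-then-act
                v = (prior != null) ? prior : candidate;          // another thread may have won the race
            }
            return v;
        }

        private String expensiveCompute(String key) { return key.toUpperCase(); }
    }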
16. Document ThreadSafety
@ThreadSafe public class Account {
  @GuardedBy("this") private int balance;
.....
}
17. Abide by the practice of deep cloning for ensuring thread-safe immutability
Inside an immutable implementation class / array, always deep clone inner collections.
java.util.Arrays comes in handy with deepEquals for comparing nested arrays.
18. Isolate concurrency in concurrent components such as blocking queues
19. Task Completion Notifications
>> Use ExecutorCompletionService with a customized BlockingQueue. ECS will place the completed tasks on the queue so that one can poll with a timeout.
>> Otherwise executorService.invokeAny(allCallables, timeout, unit) is very handy for a quick survey of completion status !
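A minimal sketch of the ExecutorCompletionService approach – taskInputs, process() and handle() are illustrative placeholders, and exception handling is trimmed:

    import java.util.List;
    import java.util.concurrent.*;

    class CompletionPoller {
        static String process(String s) { return s.toUpperCase(); } // placeholder task body
        static void handle(String s)    { System.out.println(s); }  // placeholder result consumer

        static void runAll(List<String> taskInputs) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            CompletionService<String> ecs = new ExecutorCompletionService<String>(pool);

            for (final String input : taskInputs)
                ecs.submit(new Callable<String>() {
                    public String call() { return process(input); }
                });

            for (int i = 0; i < taskInputs.size(); i++) {
                Future<String> done = ecs.poll(5, TimeUnit.SECONDS); // null if none finished in time
                if (done != null)
                    handle(done.get()); // results arrive in completion order, not submission order
            }
            pool.shutdown();
        }
    }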
References :
Merge Sort using Concurrency :
A popular framework
A great comparison and discussion
Quoting the concise list of concurrency choices from the above link :
“  Executors (java.util.concurrent) – put your computations in Runnables or Callables and submit them to an ExecutorService that is backed by a thread pool. Express dependencies by using Futures or other techniques. Two problems here – first, executors generally assume that there is 1 queue and many threads which introduces the queue contention problem mentioned above. Also, it has no way for the work to be scheduled with knowledge of dependencies. It is possible to build that over the top of course, but it’s a lot of work orthogonal to your problem at hand. The queue+threads model can’t scale as is.
·      Fork/join – create your computations as RecursiveTasks or RecursiveActions and submit them to a ForkJoinPool that is backed by a thread pool. Express dependencies directly using the RecursiveTask apis for fork, join, invoke, etc. Fork/join addresses both of the concerns I mention above. Instead of a single work queue, there is one work queue per thread. This means that at the head of the queue there is no contention – there is only 1 thread reading from it. Fork/join also addresses the dependency concern because it knows that one task is waiting for another to complete – the work/stealing algorithm inherently leverages these dependencies. I would urge you to watch Doug Lea’s talk from the JVM Language Summit 2010.
·      Actors – express your computations as the run loop of an actor. Communicate between actors with messages – this makes the dependencies asynchronous AND somewhat invisible to whomever is scheduling actor invocations except by the arrival of messages in a mailbox. Every actor has a mailbox which effectively means there is one queue per-actor, which lets you decide in your problem how finely to cut it up. Something has to actually schedule the computations of the actors though – note that Scala actors are backed by … a fork/join pool. It seems to me that the actor model obscures the dependency information from fork/join – there is nothing captured at the underlying level when an actor has sent a message to another and is waiting for a response. That’s implicitly captured by having a message in one mailbox and nothing in the waiting actor, but it seems impossible to convey the higher-level dependency structure to the underlying scheduler.
·      Continuations – to me, continuations are (at a high level) pretty similar to actors. They have the benefit that can presumably be paused from outside the computation, so an external scheduler might be able to timeslice work in and out in some better way, but it seems like there is a lot of machinery there that adds overhead.
·      Data flow – data flow is a very intriguing model because it lets you explicitly model the data dependencies between tasks. GPars probably has the most interesting implementation of it that I know of. There are a few other variants written for Clojure (that relies on the underlying agent framework) and Scala (that relies on the underlying actor infrastructure). Because those rely on underlying frameworks, I’m pretty sure they don’t optimally leverage the dependency information inherent in data flow tasks. I’d love to see a framework that was optimized to leverage that dependency info though.
·      Clojure – Clojure actually has a bunch of different things that work in concert so it’s hard for me to describe it as any one model. Most state is immutable and persistent via structural sharing. When you want to mutate state, there are a variety of features (refs, atoms, agents) that let you choose whether state changes should happen synchronously or asynchronously, and whether they should or should not be coordinated with others. Clojure has STM which allows you to synchronize multiple state changes in a well-ordered way. MVCC lets you see a consistent view during the change (again leveraging the persistent data structures) and transactions are retried in the case of conflict. Reads are always available, again due to the data structures. Clojure agents are backed ultimately by an executor pool (one that is internal and you have no control over). There is work ongoing in Clojure to create a set of functions over sequences (filter, map, etc) that is backed by parallel execution against a fork/join pool and I think that shows great promise to provide easy benefits for a different kind of problem (where you are working with large chunks of data). … “
