IT_Programming/Java

[펌] Java 병렬 프로그래밍 Summary

JJun ™ 2013. 9. 14. 03:19


 출처:  http://ismydream.tistory.com/category/프로그래밍/병렬프로그래밍


 

 책 내용들의 일부지만 나름 잘 정리되어 있는 자료다.

 시간이 날 때 내용 좀 보충하면...ㅎ (과연...;;)

 


 

 

 

 

5. 구성 단위

 

5.1 동기화된 컬렉션 클래스

Vector, Hashtable

Collections.synchronizedXxx 메소드를 사용하여 동기화된 클래스를 만들어 사용할 수 있었다.

 

public static Object getLast(Vector list){

           synchronized(list){

                     int lastIndex = list.size() - 1;

                     return list.get(lastIndex);

           }

}

 

public static void deleteLast(Vector list){

           synchronized(list){

                     int lastIndex = list.size() - 1;

                     list.remove(lastIndex);

           }

}

 

synchronized(vector){

           for(int i=0; i<vector.size(); i++)

                     doSomething(vector.get(i));

}

 

5.1.2 Iterator ConcurrentModificationException

List<Widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>());

 

// ConcurrentModificationException 이 발생할 수 있다.

for(Widget w : widgetList)

           doSomething(w);

          

for-each 구문은 compile  Iterator 를 사용한 구문으로 변경된다.

for-each 구문이 동기화 되어 있지 않을 경우에는 for-each 구문 실행시 다른 스레드에 의해 추가 삭제가 일어날 수 있다.

 

반복문을 실행하는 동안 컬렉션 클래스에 들어 있는 내용에 락을 걸어둔 것과 비슷한 효과를 내려면 clone 메소드로

복사본을 만들어 복사본을 대상으로 반복문을 사용할 수 있다. 최소한 clone 메소드를 실행하는 동안에는 컬렉션의 내용을

변경할 수 없도록 동기화시켜야 한다.

 

5.1.3 숨겨진 Iterator

public class HiddenIterator{

           @GuardedBy("this")

           private final Set<Integer> set = new HashSet<Integer>();

          

           public synchronized void add(Integer i){ set.add(i); }

           public synchronized void remove(Integer i){ set.remove(i); }

          

           public void addTenThings(){

                     Random r = new Random();

                     for( int i=0; i < 10; i++)

                                add(r.nextInt());

                     System.out.println( "DEBUG: added ten elements to " + set);

           }

}

set 변수는 사용시에 동기화되어 있어야 한다.

System.out.println 에서 set 변수는 내부적으로 toString 메소드를 호출하게된다.

toString 메소드내에서는 Iterator 를 사용하여 Set 내의 Element 에 접근한다.

따라서 해당 부분은 동기화 되어 있지 않다.

toString 뿐만 아니라 hashCode, equals 메소드도 내부적으로 iterator 를 사용한다. 뿐만 아니라

containsAll, removeAll, retainAll 의 메소드도 내부적으로 iterator 를 사용한다.

 

5.2 병렬 컬렉션

자바 5.0 에서는 HashMap 을 대체할 수 있는 ConcurrentHashMap 을 제공한다.

CopyOnWriteArrayList  Element 의 열람 성능을 최우선으로 구현한 List 객체이다.

ConcurrentMap 인터페이스에는 put-if-absent, replace, conditional-remove 연산이 추가됬다.

 

기종의 동기화 컬렉션을 병렬 컬렉션으로 교체하는 것만으로 별다른 위험 요소 없이 전체적인 성능을 상당히 끌어 올릴 수 있다.

 

Queue 인터페이스(자바 5.0)

ConcurrentLinkedQueue, PriorityQueue

BlockingQueue

 

자바 6.0 에는 ConcurrentSkipListMap, ConcurrentSkipListSet 을 제공한다.

SortedMap, SortedSet 클래스의 병렬성을 높이도록 발전된 형태이다.

 

5.2.1 ConcurrentHashMap

ConcurrentHashMap  lock striping 동기화 방법을 사용한다.

ConcurrentHashMap  Iterator  ConcurrentModificationException 을 발생하지 않는다.

따라서 반복문 실행시 동기화할 필요가 없다.

Iterator 를 만들었던 시점의 상황대로 반복을 처리한다.

size, isEmpty 의 결과가 정확하지 않다.

 

5.2.2 Map 기반의 또 다른 단일 연산

 

5.2.3 CopyOnWriteArrayList

CopyOnWriteArrayList 클래스는 동기화된 List 클래스보다 병렬성을 훨씬 높이고자 만들어졌다.

CopyOnWriteArrayList 는 컬렉션의 내용이 변경될 때마다 복사본을 새로 만들어 내는 전략을 취한다.

Iterator 사용시 변경되는 항목은 복사본에 반영된다.

 

컬렉션의 데이터가 변경될 때마다 복사본을 만들어내는 방법은 컬렉션에 많은 양의 자료가 들어 있다면 성능상의 손실이 크다.

따라서 변경 작업보다는 반복문으로 읽어내는 일이 훨씬 빈번한 경우에 효과적이다.

 

public interface ConcurrentMap<K,V> extends Map<K,V>{

           V putIfAbsent(K Key, V value);

          

           boolean remove(K Key, V value);

          

           boolean replace(K Key, V oldValue, V newValue);

          

           V replace(K Key, V newValue);

}

 

5.3 블로킹 큐와 프로듀서-컨슈머 패턴

BlockingQueue  put, take 라는 핵심 메소드를 가지고 있다.

put 메소드 실행시 큐가 가득차 있을 경우에는 필요한 공간이 생길때까지 대기한다.

take 메소드 실행시 큐가 비었을 경우에는 값이 들어올때까지 대기한다.

offer 메소드는 큐에 값을 넣으려고 할때 공간이 없으면 추가할 수 없다는 오류를 알려준다.

 

블로킹 큐는 애플리케이션이 안정적으로 동작하도록 만들고자 할 때 요긴하게 사용할 수 있는 도구이다.

블로킹 큐를 사용하면 처리할 수 있는 양보다 훨씬 많은 작업이 생겨 부하가 걸리는 상황에서 작업량을 조절해

애플리케이션이 안정적으로 동작하도록 유도할 수 있다.

 

BlockingQueue 의 구현체

LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue

 

5.3.1 예제 : 데스크탑 검색

 

public class FileClawler implements Runnable{

           private final BlockingQueue<File> fileQueue;

           private final FileFilter fileFilter;

           private final File root;

          

           public void run(){

                     try{

                                crawl(root);

                     }catch(InterruptedException e){

                                Thread.currentThread().interrupt();

                     }

           }

          

           private void crawl(File root) throws InterruptedException{

                     File[] entries = root.listFiles(fileFileter);

                     if( entries != null){

                                for( File entry : entries)

                                          if(entry.isDirectory())

                                                     crawl(entry);

                                          else if( !alreadyIndexed(entry))

                                                     fileQueue.put(entry);

                     }

           }

}

 

public class Indexer implements Runnable{

           private final BlockingQueue<File> queue;

          

           public Indexer(BlockingQueue<File> queue){

                     this.queue = queue;

           }

          

           public void run(){

                     try{

                                while(true)

                                          indexFile(queue.take());

                     }catch(InterruptedException e){

                                Thread.currentThread().interrupt();

                     }

           }

}

 

public static void startIndexing(File[] roots){

           BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);

           FileFilter filter = new FileFilter(){

                     public boolean accept(File file){ return true; }

           };

          

           for( File root : roots)

                     new Thread(new FileCrawler(queue, filter, root)).start();

          

           for( int i=0; i<N_CONSUMERS; i++)

                     new Thread(new Indexer(queue)).start();

}

 

5.3.3 , 작업 가로채기

자바 6.0에서는 Deque, BlockingDeque 가 추가됐다. 둘은 Queue  BlockingQueue 를 상속받은 인터페이스이다.

작업 가로채기 패턴에서는 모든 컨슈머가 각자의 덱을 갖는다. 만약 특정 컨슈머가 자신의 덱에 들어 있던 작업을 모두 처리하고 나면

다른 컨슈머의 덱에 쌓여있는 작업 가운데 맨 뒤에 추가된 작업을 가로채 가져올 수 있다.

작업 가로채기 패턴은 컨슈머가 프로듀서의 역할도 갖고 있는 경우에 적용하기 좋다.

 

5.4 블로킹 메소드, 인터럽터블 메소드

스레드가 블락되면 다음 블록된 상태(BLOCKED, WAITING, TIMED_WAITING)가운데 하나를 갖게 된다.

블락된 스레드는 특정 신호를 받게 되면 RUNNABLE 상태가 되어 CPU를 사용할 수 있게 된다.

BlockingQueue  put, take 메소드는 블락킹 메소드이다.

 

InterruptedException 에 대한 대처 방안

- InterruptedException 을 전달 : 받아낸 InterruptedException을 그대로 호출한 메소드에게 넘겨버리는 방법

- 인터럽트를 무시하고 복구 :

InterruptedExceptino throw 할 수 없는 경우에는(Runnable 인터페이스를 구현한 경우) InterruptedException catch 한 다음

현재 스레드의 interrupt 메소드를 호출해 인터럽트 상태를 설정해 상위 호출 메소드가 인터럽트 상황이 발생했음을 알 수 있도록 해야 한다.

 

public class TaskRunnale implemetns Runnable {

           public void run(){

                     try{

                                processTask(queue.take());

                     }catch(InterruptedException e){

                                Thread.currentThread().interrupt();

                     }

           }

}

 

5.5 동기화 클래스

상태 정보를 사용해 스레드 간의 작업 흐름을 조절할 수 있도록 만들어진 모든 클래스를 동기화 클래스라고 한다.

- BlockingQueue, 세마포어 semaphore, 배리어 barrier, 래치 latch

 

5.5.1 래치

래치는 스스로가 터미널 terminal상태 에 이를 떄까지 스레드가 동작하는 과정을 늦출 수 있도록 해주는 동기화 클래스이다.

래치는 일종의 관문과 같다. terminal 상태에 이르면 관문이 열리고 그 이전 상태로 되돌릴 수 없다.

- 특정 자원을 확보하기 전에는 작업을 시작하지 말아야 하는 경우에 사용할 수 있다.

- 의존성을 갖고 있는 다른 서비스가 시작하기 전에는 특정 서비스가 실행되지 않도록 막아야 하는 경우에 사용할 수 있다.

- 특정 작업에 필요한 모든 객체가 실행할 준비를 갖출 때까지 기다리는 경우에도 사용할 수 있다.

CountDownLatch

하나 또는 둘 이상의 스레드가 여러 개의 이벤트가 일어날 때까지 대기할 수 있도록 되어 있다.

래치의 상태를 나타내는 숫자는 대기하는 동안 발생해야 하는 이벤트의 건수를 의미한다.

public class TestHarness {

           public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{

                     final CountDownLatch startGate = new CountDownLatch(1);

                     final CountDownLatch endGate = new CountDownLatch(nThreads);

                    

                     for( int i=0; i<nThreads; i++){

                                Thread t = new Thread(){

                                          public void run(){

                                                     try{

                                                                startGate.await();

                                                                try{

                                                                          task.run();

                                                                }finally{

                                                                          endGate.countDown();

                                                                }

                                                     }catch(InterruptedException ignored){}

                                          }

                                };

                                t.start();

                     }

                    

                     long start = System.nanoTime();

                     startGate.countDown();

                     endGate.await();

                     long end = System.nanoTime();

                     return end-start;

           }

}

CountDownLatch 를 사용하여 n 개의 스레드가 동시에 동작할때 전체 실행 시간을 확인할 수 있다.

 

5.5.2 FutureTask

FutureTask 는 래치와 비슷한 형태로 동작한다.

Future 인터페이스를 구현하며 FutureTask 가 나타내는 연산적업은 Callable 인터페이스를 구현한다.

FutureTask 는 실제로 연산을 실행했던 스레드에서 만들어 낸 결과 객체를 실행시킨 스레드에게 넘겨준다.

public class Preloader{

           private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>( new Callable<ProductInfo>(){

                     public ProductInfo call() throws DataLoadException {

                                return loadProductInfo();

                     }

           });

          

           private final Thread thread = new Thread(future);

          

           public void start() { thread.start(); }

          

           public ProductInfo get() throws DataLoadException, InterruptedException {

                     try{

                                return future.get();

                     }catch(ExecutionException e){

                                Throwable cause = e.getCause();

                                if( cause instanceof DataLoadException)

                                          throw (DataLoadException) cause;

                                else

                                          throw launderThrowable(cause);

                     }

           }

}

Future.get 메소드에서 ExecutionException 이 발생하는 경우

- Callable 이 던지는 예외

- RuntimeException

- Error

 

5.5.3 세마포어 semaphore

카운팅 세마포어는 특정 자원이나 특정 연산을 동시에 사용하거나 호출할 수 있는 스레드의 수를 제한하고자 할 때 사용한다.

세마포어는 가상의 퍼밋을 만들어 내부 상태를 관리하며 acquire 메소드를 사용해 파밋을 획득할 수 있으며 release 메소드로 퍼밋을 반납한다. acquire 메소드로 취득할 퍼밋이 없을 경우에는 여유 퍼밋이 생길때까지 대기한다.

이진 세마포어는 퍼밋 값이 1로 지정된 세마포어이며 락의 역할을 하는 뮤텍스로 활용할 수 있다.

 

public class BoundHashSet<T>{

           private final Set<T> set;

           private final Semaphore sem;

          

           public BoundedHashSet(int bound){

                     this.set = Collections.synchronizedSet( new HashSet<T>);

                     sem = new Semaphore(bound);

           }

          

           public boolean add(T o) throws InterruptedException{

                     sem.acquire();

                     boolean wasAdded = false;

                     try{

                                wasAdded = set.add(o);

                                return wasAdded;

                     }finally{

                                if(!wasAdded)

                                          sem.release();

                     }

           }

          

           public boolean remove(Object o){

                     boolean wasRemoved = set.remove(o);

                     if(wasRemoved)

                                sem.release();

                     return wasRemoved;

           }

}

[세마포어를 사용해서 컬렉션의 크기 제한하기]

 

5.5.4 배리어 barrier

배리어는 특정 이벤트가 발생할 때까지 여러 개의 스레드를 대기 상태로 잡아둘 수 있다는 측면에서 래치와 비슷하다. 래치는 이벤트를 기다리기 위한 동기화 클래스이고, 배리어는 다른 스레드를 기다리기 위한 동기화 클래스이다.

배리어는 실제 작업은 여러 스레드에서 병렬로 처리하고, 다음 단계로 넘어가기 전에 스레드의 처리결과를 취합할 때 많이 사용된다.

 

barrier 포인트에 도달한 스레드는 await 메소드를 실행한다.

CyclicBarrier 는 배리어 작업을 정의할 수 있다. 배리어 작업은 배리어 통과후 스레드를 놓아주기 전에 실행된다.

public class CellularAutomata{

           private final Board mainBoard;

           private final CyclicBarrier barrier;

           private final Worker[] workers;

          

           public CellularAutomata(Board board){

                     this.mainBoard = board;

                     int count = Runtime.getRuntime().availableProcessors();

                     this.barrier = new CyclicBarrier(count,

                                new Runnable(){

                                          public void run(){

                                                     mainBoard.commitNewValues();

                                          }

                                });

                     this.workers = new Worker[count];

                     for(int i=0; i<count; i++)

                                workers[i] = new Worker(mainBoard.getSubBoard(count, i));

           }

          

           private class Worker implements Runnable{

                     private final Board board;

                    

                     public Worker(Board board){ this.board = board; }

                     public void run(){

                                while(!board.hasConverged()){

                                          for( int x=0; x<board.getMaxX(); x++)

                                                     for( int y=0; y<board.getMaxY(); y++)

                                                                board.setNewValue(x, y, computeValue(x,y));

                                          try{

                                                     barrier.await();

                                          }catch(InterruptedException ex){

                                                     return;

                                          }catch(BrokenBarrierException ex){

                                                     return;

                                          }

                                }

                     }

           }

          

           public void start(){

                     for(int i=0; i<workers.length; i++)

                                new Thread(workers[i]).start();

                     mainBoard.waitForConvergence();

           }

}

5.6 효율적이고 확장성 있는 결과 캐시 구현

캐시로 HashMap 대시 ConcurrentHashMap 을 사용할 경우 ConcurrentHashMap 은 스레드 안정성을 확보하기 때문에 별다른 동기화 방법을 사용하지 않아도 되며 병렬 프로그래밍에 대한 성능또한 개선된다.

 

public class Memoizer2<A, V> implements Computable<A, V>{

           private final Map<A, V> cache = new ConcurrentHashMap<A, V>();

           private final Computable<A, V> c;

          

           public Memoizer2(Computable<A, V> c){

                     this.c = c;

           }

          

           public V compute(A arg) throws InterruptedException{

                     V result = cache.get(arg);

                     if( result == null){

                                result = c.compute(arg);

                                cache.put(arg, result);

                     }

                    

                     return result;

           }

}

Memoizer2  compute를 수행줄일 때 동일한 값에 대한 요청이 들어오면 또 compute 를 수행한다. 아래 Memoizer3  Future 를 사용해 동일한 계산이 수행될 가능성을 대폭적으로 줄였다.

 

public class Memoizer3<A, V> implements Computable<A, V>{

           private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

           private final Computable<A, V> c;

          

           public Memoizer3(Computable<A, V> c){

                     this.c = c;

           }

          

           public V compute(A arg) throws InterruptedException{

                     Future<V> f = cache.get(arg);

                     if( f == null){

                                Callable<V> eval = new Callable<V>(){

                                          public V call() throws InterruptedException{

                                                     return c.compute(arg);

                                          }

                                };

                                FutureTask<V> ft = new FutureTask<V>(eval);

                                f = ft;

                                cache.put(arg,ft);

                                ft.run();

                     }

                     

                     try{

                                return f.get();

                     }catch(ExecutionException e){

                                throw launderThrowable(e.getCause());

                     }

           }

}

 

최종 버전 cache.put 대신 cache.putIfAbsent 메소드 사용

public class Memoizer<A, V> implements Computable<A, V>{

           private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

           private final Computable<A, V> c;

          

           public Memoizer(Computable<A, V> c){

                     this.c = c;

           }

          

           public V compute(A arg) throws InterruptedException{

                     while(true){

                                Future<V> f = cache.get(arg);

                                if( f == null){

                                          Callable<V> eval = new Callable<V>(){

                                                     public V call() throws InterruptedException{

                                                                return c.compute(arg);

                                                     }

                                          };

                                          FutureTask<V> ft = new FutureTask<V>(eval);

                                          f = cache.putIfAbsent(arg,ft);

                                          if( f == null){f = ft; ft.run(); }

                                }

                               

                                try{

                                          return f.get();

                                }catch(CancellationException e){

                                          cache.remove(arg, f);

                                }

                                }catch(ExecutionException e){

                                          throw launderThrowable(e.getCause());

                                }

                     }

           }

          

 

 

}


 

2. 병렬프로그램 구조 잡기

6. 작업 실행

 

6.1 스레드에서 작업 실행

스레드는 특정 잡업을 비동기적으로 동작시킬 수 있는 방법을 제공한다.

작업의 병렬성을 보장받기 위해서는 해당 작업이 독립된 작업이어야 한다.

일반적으로 모든 서버는 클라이언트의 개별 요청 단위를 작업의 범위로 지정한다.

그럼으로써 작업의 독립성을 보장받으면서 작업의 크기를 적절하게 설정할 수 있다.

 

6.1.1 작업을 순차적으로 실행

하나의 스레드가 모든 요청을 순차적으로 처리하는 경우

class SingleThreadWebServer{

           public static void main(String[] args) throws IOException {

                     ServerSocket socket = new ServerSocket(80);

                     while(true){

                                Socket connection = socket.accept();

                                handleRequest(connection);

                     }

           }

}

 

단점 : 단일 스레드 프로그램에서는 CPU를 사용하지 않는 작업, 예를 들어 I/O, 데이터베이스 작업시에도 다른 요청들의 처리에

CPU 자원을 활용할 수 없다.

 

6.1.2 작업마다 스레드를 직접 생성

작업을 처리하는 스레드를 요청마다 하나씩 생성하게 되는 경우

class ThreadPerTaskWebServer{

           public static void main(String[] args) throws IOException{

                     ServerSocket socket = new ServerSocket(80);

                     while(true){

                                final Socket connection = socket.accept();

                                Runnable task = new Runnable(){

                                          public void run(){

                                                     handleRequest(connection);

                                          }

                                }

                               

                                new Thread(task).start();

                     }

           }

}

- 작업을 처리하는 기능이 메인 스레드에서 떨어져 나온다.

- 동시에 여러 작업을 병렬로 처리할 수 있기 때문에 두개 이상의 요청을 받아 동시에 처리할 수 있다.

- 작업에 대한 스레드 안전성을 확보해야 한다.

 

 

6.1.3 스레드를 많이 생성할 때의 문제점

- 스레드 라이프 사이클 문제

           스레드 생성, 제거작업에 자원이 소모된다.

           클라이언트의 요청이 간단하면서 자주 발생하는 유형일때, 새로운 스레드를 생성하는 일이 상대적으로 전체 작업에서 많은 부분을 차지할 수 있다.

- 자원 낭비

           스레드는 시스템의 자원, 특히 메모리를 소모한다. 프로세서보다 많은 수의 스레드가 만들어져 동작 중이라면 실제로는 대부분의 스레드가 대기 상태에 머무른다.

           JVM 가비지 콜렉터에 가해지는 부하가 늘어난다. CPU 사용을 위해 여러 스레드가 경쟁하게 된다.

- 안정성 문제

           스레드는 자바코드와 네이티브 코드를 실행할 수 있도록 두개의 스택을 갖는다. 일번적인 JVM 의 경우 두개의 스택을 더한 값이 0.5MB 정도 된다. 따라서 무분별하게 스레드를 생성할 경우 OutOfMemoryError 가 발생할 수 있다.

 

6.2 Executor 프레임웤

 

[Executor 인터페이스]

public interface Executor {

void execute( Runnable command);

}

 

- Executor 는 비동기적 작업 실행 프레임웍의 근간을 이루는 인터페이스이다.

- Executor 는 작업 등록 task submission 과 작업 실행 task execution 을 분리한다.

- 각 작업의 라이프 사이클을 관리하는 기능을 제공한다.

- 통계 값을 뽑아내거나 애플리케이션에서 작업 실행 과정을 관리하고 모니터링하기 위한 기능도 제공한다.

- 각 작업은 Runnable 형태로 정의한다.

- Executor의 구조는 프로듀서 - 컨슈머 패턴에 기반하고 있다.

 

6.2.1 예제 : Executor 를 사용한 웹서버

class TaskExecutionWebServer{

           private static final int NTHREADS = 100;

           private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

          

           public static void main(String[] args) throws IOException{

                     ServerSocket socket = new ServerSocket(80);

                     while(true){

                                final Socket connection = socket.accept();

                                Runnable task = new Runnable(){

                                          public void run(){

                                                     handleRequest(connection);

                                          }

                                }

                               

                                exec.execute(task);

                     }

           }

}

 

6.2.2 실행 정책
작업을 등록하는 부분과 실행하는 부분을 분리시키면 실행 정책 execution policy를 쉽게 변경할 수 있다.

- 작업을 어느 스레드에서 실행할 것인가?

- 작업을 어떤 순서로 실행할 것인가?(FIFO, LIFO)

- 동시에 몇 개의 작업을 병렬로 실행할 것인가?

- 최대 몇 개까지의 작업이 큐에서 실행을 대기할 수 있게 할 것인가?

- 시스템에 부하가 많이 걸려서 작업을 거절해야 하는 경우, 어떤 작업을 희생양으로 삼아야 할 것이며, 작업을 요청한 프로그램에 어떻게 알려야 할 것인가?

- 작업을 실행하기 직전이나 실행한 직후에 어떤 동작이 있어야 하는가?

 

6.3.3 스레드 풀

자바 클래스 라이브러리에서 미리 정의되어 잇는 스레드 풀

- newFixedThreadPool

           제한된 갯수까지 스레드를 생성한다.

- newCachedThreadPool

           필요한 만큼 스레드를 새로 생성한다. 사용하지 않는 스레드를 제거한다.

- newSingleThreadExecutor

           작업중에 Exception 이 발생해 비정상적으로 종료되면 새로운 스레드를 하나 생성해 나머지 작업을 실행한다.

           특정작업이 진행되는 동안 메모리에 남겨진 기록을 다음에 실행되는 작업에서 가져다 사용할 수 있다.

- newScheduledThreadPool

           일정시간 이후에 실행하거나 주기적으로 실행할 작업을 처리할 수 있다.

 

6.2.4 Executor 동작 주기

Executor 의 동작 주기를 관리하기 위해 ExecutorService 에는 여러가지 메소드가 추가되어 있다.

public interface ExecutorService extends Executor {

           void shutdown();

           List<Runnable> shutdownNow();

           boolean isShutdown();

           boolean isTerminated();

           boolean awaitTermination( long timeout, TimeUnit unit) throws InterruptedException;

           ...

}

 

Executor Service 가 갖고 있는 동작 주기 : 실행중(running), 종료중(shutting down), 종료(terminated)

 

6.2.5 지연 작업, 주기적 작업

Timer , ScheduledThreadPoolExecutor

Timer 클래스는 작업을 실행하는데 하나의 스레드만을 사용한다.

 

6.3 병렬로 처리할 만한 작업

 

6.3.1 예제 : 순차적 페이지 렌더링

 

6.3.2 결과가 나올때까지 대기 : Callable Future

Runnable 인터페이스의 한계

- 실행이 끝난 다음 결과 값을 리턴해 줄 수가 없다.

- 발생된 예외를 처리할 수가 없다.

Callable 인터페이스

- 실행후 결과값을 리턴 받을 수 있다.

- Exception 도 발생시킬 수 있다.

Future

- 작업이 정상적으로 완료됐는지, 아니면 취소됐는지 등에 대한 정보를 확인할 수 있도록 만들어진 클래스이다.

- ExecutorService 를 통해 Runnable, Callable  Future 로 변환할 수 있다.

 

public interface Callable<V>{

           V call() throws Exception;

}

 

public interface Future<V> {

           boolean cancel(boolean myInterrupIfRunning);

           boolean isCancelled();

           boolean isDone();

           V get() throws InterruptedException, ExecutionException, CancellationException;

           V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException;

}

 

Executor 에서는 Runnable, Callable, java.security.PriviledgedAction 등 여러가지 유형의 작업을 실행할 수 있다.

Executor 에서 처리하는 작업은 생성 created, 등록 submitted, 실행 started, 종료 completed 의 네가지 상태를 통과한다.

 

get 메소드는 작업이 진행되는 상태에 따라 다른 유형으로 동작한다.

작업 완료시 - 즉시 결과 값을 리턴하거나 Exception 을 발생(Exception  ExecutionException 클래스에 담아서 던진다.)

시작전, 진행중 - 작업이 완료될때까지 대기

 

Future 에서 리턴되는 값을 사용하기까지 순차적으로 작업을 처리하는 시간동안 Future 의 작업은 병렬로 처리된다.

순차작업이 빨리 끝나는 경우에는 Future 의 작업이 끝나기를 기다리게 되며 순차작업이 오래 걸리는 경우에는 Future 에서 처리된

결과를 바로 가져다 사용할 수 있다.

 

6.3.5 CompletionService : Executor  BlockingQueue 의 연합

CompletionService   Executor 의 기능과 BlockingQueue 의 기능을 하나로 모은 인터페이스이다.

 

private class QueueingFuture<V> extends FutureTask<V> {

           QueueingFuture(Callable<V> c){ super(c); }

           QueueingFuture(Runnable t, V r){ super(t, r); }

          

           protected void done(){

                     completionQueue.add(this);

           }

}

 

6.3.6 예제 : CompletionService 를 활용한 페이지 렌더링

 

6.3.7 작업 실행 시간 제한

작업 싫행 시간 제한을 두기 위해 고려해야 할 부분

- 제한된 작업시간이 지날 경우 알리는 방법

- 제한된 작업시간 경과시 해당 작업을 중단

 

Future 사용해서 구현

try{

           f.get( timeLeft, NANOSECONDS); //Futrue task 에 제한된 시간을 설정한다.

}

catch( TimeoutException e){

           f.cancel(true); // TimeoutException 이 발생했을 때, 즉 제한된 시간을 경과했을 때 작업을 중단시킨다.

}

 

 

7. 중단 및 종료

작업이나 스레드를 안전하고 빠르고 안정적으로 멈추게 하는 것은 어려운 일이다.

자바에는 스레드가 작업을 실행하고 있을 때 강제로 멈추도록 하는 방법이 없다.

interrupt 를 사용하여 특정 스레드에게 작업을 멈춰 달라고 요청할 수 있다.

 

7.1 작업 중단

- 사용자가 취소하기를 요청한 경우

- 시간이 제한된 작업

- 애플리케이션 이벤트

- 오류

- 종료

 

특정 작업을 임의로 종료시킬 수 있는 방법은 없으며 작업 취소 스레드를 사용하여 작업을 멈추는 협력적인 방법을 사용해야 한다.

@ThreadSafe

public class PrimeGenerator implemetns Runnable {

           @GuardedBy("this")

           private final List<BigInteger> primes = new ArrayList<BigInteger>();

           private volitile boolean cancelled;

          

           public void run(){

                     BigInteger p = BigInteger.ONE;

                     while(!cancelled){

                                p = p.nextProbablePrime();

                                synchronized(this){

                                          primes.add(p);

                                }

                     }

           }

          

           public void cancel() { cancelled = true; }

          

           public synchronized List<BigInteger> get(){

                     return new ArrayList<BigInteger>(primes);

           }

}

 

List<BigInteger> aSecondOfPrime() throws InterruptedException {

           PrimeGenerator generator = new PrimeGenerator();

           new Thread(generator).start();

           try{

                     SECONDS.sleep(1);

           }finally{

                     generator.cancel();

           }

           return generator.get();

}

 

 

 


7.1.1 
인터럽트

- stop, cancel 플래그를 사용하여 해당 스레드의 중단 여부를 판별하는 경우에는 작업중단 요청시 작업에 걸리는 시간만큼의 대기시간이 필요할 수도 있다.

- 모든 스레드는 불린 값으로 인터럽트 상태를 갖고 있다.

- 위 플래그를 사용하기 보다는 Thread.currentThread.isInterrupted() 메소드를 통해 현재 상태를 확인하는 것이 좋다.

- Thread.sleep 이나 Object.wait 로 대기시 interrupt가 걸리면 InterruptedException 을 발생한다.

 

interrupt : 해당 스레드에 interrupt 를 건다.

isInterrupted : 해당 스레드가 interrupt 상태인지를 확인한다.

Thread.interrupted : 현재 스레드의 인터럽트 상태를 해제하고, 해제하기 이전의 값이 무엇이었는지를 알려준다.

 

class PrimeProducer extends Thread{

           priate final BlockingQueue<BigInteger> queue;

          

           PrimeProducer(BlockingQueue<BigInteger> queue){

                     this.queue = queue;

           }

          

           public void run(){

                     try{

                                BigInteger p = BigInteger.ONE;

                                while(Thread.currentThread.isInterrupted())

                                          queue.put(p = p.nextProbablePrime());

                     }catch(InterruptedException consumed){

                               

                     }

           }

          

           public void cancel(){ interrupt(); }

}

 

 

 



7.1.2 
인터럽트 정책

- 인터럽트 처리 정책은 인터럽트 요청이 들어 왔을 때, 해당 스레드가 인터럽트를 어떻게 처리해야 하는지에 대한 지침이다.

- 가장 범용적인 인터럽트 정책은 스레드 수준이나 서비스 수준에서 작업 중단 기능을 제공하는 것이다.
- 작업 task 과 스레드 thread 가 인터럽트 상황에서 서로 어떻게 동작해야 하는지를 명확히 구분할 필요가 있다.
 

7.1.3 인터럽트에 대한 대응

- 발생한 예외를 호출 스택의 상위 메소드로 전달한다.
- 호출 스택의 상단에 위치한 메소드가 직접 처리할 수 있도록 인터럽트 상태를 유지한다. 

7.1.4 예제 : 시간 지정 실행

private static final ScheduledExecutorService cancelExec = ...;

 

public static void timedRun(Runnable r, long timeout, TimeUnit unit){

           final Thread taskThread = Thread.currentThread();

           cancelExec.schedule( new Runnable(){

                     public void run(){ taskThread.interrupt(); }

           }, timeout, unit);

          

           r.run();

}

 

위 코드는 내부에서 scheduledThreadPool 을 사용해서 interrupt 를 거는 스레드를 별도로 띄운다.

지정된 시간이 경과하게 되면 interrupt 를 걸도록 설계되었다.

하지만 지정된 시간 이내에 작업이 끝나고 다른 작업을 실행중일때는 커다란 오류를 범하게 된다.

 

public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException {

           class RethrowableTask implements Runnable {

                     private volatile Throwable t;

                     public void run(){

                                try { r.run(); }

                                catch( Throwable t){ this.t = t; }

                     }

                     void rethrow(){

                                if( t!= null)

                                          throw launderThrowable(t);

                     }

           }

          

           RethrowableTask task = new RethrowableTask();

           final Thread taskThread = new Thread(task);

           taskThread.start();

           cancelExec.schedule( new Runnable(){

                     public void run(){ taskThread.interrupted(); }

           }, timeout, unit);

           taskThread.join(unit.toMillies(timeout));

           task.rethrow();

}

 

첫번째 예제와는 다르게 timedRun 을 호출하는 Thread  interrupt 를 거는 방식이 아닌 메소드 내부에서 Thread 를 생성하여 그 생성된 Thread 

interrupt를 거는 방식

 

7.1.5 Future 를 사용해 작업 중단

Executor 에서는 기본적으로 작업을 실행하기 위해 생성하는 스레드는 인터럽트가 걸렸을 때 작업을 중단할 수 있도록 하는 인터럽트 정책을 사용한다.

 

public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {

           Future<?> task = taskExec.submit(r);

           try{

                     task.get(timeout, unit);

           }catch( TimeoutException e){

          

           }catch( ExecutionException e){

          

           }finally{

                     task.cancel(true);

           }

          

}

 

7.1.6 인터럽트에 응답하지 않는 블로킹 작업 다루기

- java.io 패키지의 동기적 소켓 I/O

           Socket 에서 가져온 InputStream  read, OutputStream  write  interrupt 에 반응하지 않으며 socket  close 하게되면 read, write 메소드가 중단되면서 SocketException 이 발생한다.

- java.nio 패키지의 동기적 I/O

- Selector 를 사용한 비동기적 I/O

- 락 확보

 

public class RenderThread extends Thread{

           ...

           public void interrupt(){

                     try{

                                socket.close();

                     }catch(IOException e){}

                     finally{

                                super.interrupt();

                     }

           }

           ...

}


7.1.7 newTaskFor 메소드로 비표준적인 중단 방법 처리

ThreadPoolExecutor 에는 newTaskFor 라는 메소드가 추가됬다.

newTaskFor 로 해당 작업을 나타내는 RunnableFuture 객체를 리턴한다.

 

7.2 스레드 기반 서비스 중단

스레드에 대한 소유권은 스레드풀(스레드 기반 서비스)이 가지고 있다. 따라서 개개의 스레드에 대한 인터럽트 요청은 스레드 풀에서 처리되어야 한다.

 

7.2.1 예제 : 로그 서비스

PrintWriter 와 같은 스트림 기반 클래스는 스레드에 안전하기 때문에 println으로 필요한 내용을 출력하는 기능을 사용할 때 별다른 동기화 기법이 필요하지 않다.

 

7.2.2 ExecutorService 종료

종료하는 두개의 메소드

shutdown : 안전하게 종료하는 방법

shutdownNow : 강제로 종료하는 방법

 

7.2.3 독약

프로듀서-컨슈머 패턴으로 구성된 서비스를 종료시키는 또 다른 방법으로 독약이라고 불리는 방법이 있다.

컨슈머가 독약 객체를 가져갔을때 컨슈머 스레드를 종료하는 방법이다.

이 방법은 컨슈머의 갯수를 정확히 알고 있을 때에만 사용할 수 있다.

 

public class IndexingService {

           private static final File POISON = new File("");

           private final IndexerThread consumer = new IndexerThread();

           private final CrawlerThread producer = new CrawlerThread();

           private final BlockingQueue<File> queue;

           private final FileFilter fileFilter;

           private final File root;

          

           class CrawlerThread extends Thread{

                     public void run(){

                                try{

                                          crawl(root);

                                }

                                catch(InterruptedException e){}

                                finally{

                                          while(true){

                                                     try{

                                                                queue.put(POISON);

                                                                break;

                                                     }

                                                     catch(InterruptedException el){ /* 재시도 */}

                                          }

                                }

                     }

                    

                     private void crawl(File root) throws InterruptedException{}

           }

          

           public class IndexerThread extends Thread{

                     public void run(){

                                try{

                                          while(true){

                                                     File file = queue.take();

                                                     if(file == POISON)

                                                                break;

                                                     else

                                                                indexFile(file);

                                          }

                                }

                                catch( InterruptedException consumed){}

                     }

           }

          

           public void start(){

                     producer.start();

                     consumer.start();

           }

          

           public void stop(){

                     producer.interrupt();

           }

          

           public void awaitTermination() throws InterruptedException{

                     consumer.join();

           }

          

          

}

 

7.2.4 예제 : 단번에 실행하는 서비스

 

boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException{

           ExecutorService exec = Executors.newCachedThreadPool();

           final AtomicBoolean hasNewMail = new AtomicBoolean(false);

           try{

                     for( final String host : hosts){

                                exec.execute( new Runnable(){

                                          public void run(){

                                                     if(checkMail(host))

                                                                hasNewMail.set(true);

                                          }

                                });

                     }

           }finally{

                     exec.shutdown();

                     exec.awaitTermination(timeout, unit);

           }

          

           return hasNewMail.get();

}

위의 예제를 보면 finally 구문에서 exec.shutdown 을 호출하여 스레드들의 작업이 끝나기를 기다리고 있다.

 

7.2.5 shutdownNow 메소드의 약점

shutdownNow 메소드 실행시 현재 실행 중인 모든 스레드의 작업을 중단시키도록 하고 등록됐지만 실행은 되지 않았던 모든 작업의 목록을 리턴해준다.

 

private final Set<URL> urlsToCrawl = new HashSet<URL>();

 

public synchronized void stop() throws InterruptedException {

           try{

                     saveUncrawled(exec.shutdownNow());

                     if( exec.awaitTermination(TIMEOUT, UNIT))

                                saveUncrawled(exec.getCancelledTasks());

           }finally{

                     exec = null;

           }

}

 

private void saveUncrawled(List<Runnable> uncrawled){

           for( Runnable task : uncrawled)

                     urlsToCrawl.add(((CrawlTask)task).getPage());

}

 

7.3 비정상적인 스레드 종료 상황 처리

스레드를 예상치 못하게 종료시키는 가장 큰 원인은 바로 RuntimeException이다.

따라서 RuntimeException으로 스레드가 종료되는 경우에는 외부에 종료된다는 사실을 알려 대응이 가능하도록 하여야 한다.

 

public void run(){

           Throwable thrown = null;

           try{

                     while(!isInterrupted())

                                runTask(getTaskFromWorkQueue());

           }catch(Throwable e){

                     thrown = e;

           }finally{

                     threadExited(this, thrown);

           }

}

 

7.3.1 정의되지 않은 예외 처리

스레드 API UncaughtExceptionHandler 라는 기능을 사용하면 처리하지 못한 예외 상황으로 인해 특정 스레드가 종료되는 시점을 정확히 알 수 있다.

 

public interface UncaughtExceptionHandler{

           void uncaughtException(Thread t, Throwable e);

}

 

public class UEHLogger implements Thread.UncaughtExceptionHandler{

           public void uncaughtException(Thread t, Throwable e){

                     Logger logger = Logger.getAnonymousLogger();

                     logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);

           }

}

 

UncaughtExceptionHandler  execute 로 실행시에 적용된다. submit 를 통해 실행하면 리턴되는 Future 객체의 get 메소드 호출시 예외가 발생했을 때 해당 예외가

ExecutionException 에 감싸진 상태로 넘어온다.

 

7.4 JVM 종료

JVM 이 종료되는 두가지 경우

- 예정된 절차대로 종료되는 경우

           .일반 스레드가 모두 종료되는 시점

           .System.exit 메소드를 호출하거나 ctrl+c 키를 입력한 경우

- 예기치 못하게 임의로 종료되는 경우

           .Runtime.halt 메소드 호출하는 경우

           .운영체제 수준에서 JVM 프로세스를 강제로 종료하는 경우

          

7.4.1 종료 훅

예정된 절차대로 종료되는 경우에 JVM  Runtime.addShutdownHook 메소드를 사용해 등록된 종료 훅을 실행한다.

종료 훅이 모두 실행되고 나면 runFinalizersOnExit 값을 확인해 true 일 경우 모든 클래스의 finalize 메소드를 호출하고 종료한다.

종료 훅은 어떤 서비스나 애플리케이션 자체의 여러 부분을 정리하는 목적으로 사용하기 좋다.

예를 들어 임시로 만들어 사용했던 파일을 삭제하거나, 운영체제에서 알아서 정리해주지 않은 모든 자원을 종료훅에서 정리하는게 좋다.

종료 훅의 실행순서는 보장되지 않는다. 따라서 종속 관계를 가질 경우에는 하나의 종료 훅에서 순차적으로 처리해 주어야 한다.

 

public void start(){

           Runtime.getRuntime().addShutdownHook(new Thread(){

                     public void run(){

                                try{

                                          LogService.this.stop();

                                }catch(InterruptedException ignored){}

                     }

           });

}

 

7.4.2 데몬 스레드

스레드는 두 가지 종류로 나뉜다. 일반스레드, 데몬스레드이다.

데몬스레드는 JVM 내부적으로 사용하기 위해 실행하는 스레드이다.(가비지 컬렉터나 기타 여러가지 스레드)

main 스레드에서 생성한 모든 스레드는 일반 스레드이다.

JVM 은 남아 있는 모든 일반 스레드가 종료되고 난 후 JVM 종료 절차를 진행한다. 모든 데몬스레드는 버려지는 셈이다.

데몬 스레드는 예고 없이 종료될 수 있기 때문에 애플리케이션 내부에서 시작시키고 종료시키며 사용하기에는 그다지 좋은 방법이 아니다.

 

7.4.3 finalize 메소드

finalize 메소드는 사용하지 않는게 좋다.

 

 

 

8. 스레드 풀 활용

 

8.1 작업과 실행 정책 간의 보이지 않는 연결 관계

일정한 조건을 갖춘 실행 정책이 필요한 작업에는 다음과 같은 것들이 있다.

- 의존성이 있는 작업

- 스레드 한정 기법을 사용하는 작업

- 응답 시간이 민감한 작업

- ThreadLocal 을 사용하는 작업

 

8.1.1 스레드 부족 데드락

 

8.1.2 오래 실행되는 작업

오래 실행되는 스레드의 갯수가 많을 수록 스레드풀의 응답속도는 느려지게 된다.

따라서 계속해서 대기하는 기능 대신 일정시간 동안만 대기하는 메소드를 사용할 수 있다면 응답 속도를 향상시킬 수 있다.

 

8.2 스레드 풀 크기 조절

 

스레드 풀의 크기는 설정 파일이나 Runtime.availableProcessors() 값에 따라 동적으로 지정되도록 해야 한다.

8.3 ThreadPoolExecutor 
설정

 

8.3.1 스레드 생성과 제거

public ThreadPoolExecutor(int corePoolSize,

int maxizmumPoolSize,

           long keepAliveTime,

           TimeUnit unit,

           BlockingQueue<Runnable> workQueue,

           ThreadFactory threadFactory,

           RejectedExecutionHandler handler){ ... }

corePoolSize 는 스레드 풀을 사용할때 원하는 스레드의 갯수라고 볼 수 있다.

queue 가 가득차지 않는 이상 corePoolSize 를 초과하지 않는다.

최초에 prestartAllCoreThreads 메소드를 실행하면 코어 크기만큼이 스레드가 미리 만들어진다.

 

newFixedThreadPool 의 경우 생성시 지정한 크기로 corePoolSize, maximumPoolSize 가 설정된다. 시간 제한은 무제한이다.

newCachedThreadPool 의 경우 corePoolSize = 0, maximumPoolSize  Integer.MAX_VALUE 로 설정된다. 시간 제한은 1분이다.

 

8.3.2 큐에 쌓인 작업 관리

newFixedThreadPool  newSingleThreadExecutor 는 기본설정으로 크기가 제한되지 않은 LinkedBlockingQueue를 사용한다.

크기가 제한된 큐를 사용하면 자원 사용량을 한정시킬 수 있지만 큐가 가득 찼을 때 새로운 작업을 등록하려는 상황을 처리해야 한다.

큐의 크기를 제한한 상태에서는 큐의 크기와 스레드의 갯수를 동시에 튜닝해야 한다.

 

newCachedThreadPool  SynchronousQueue 를 사용한다. SynchronousQueue 는 내부적으로 스레드 간에 작업을 넘겨주는 기능을 담당한다.

큐의 크기가 0 이므로 스레드의 크기가 최대값에 다다른 상태라면 작업을 거부하게 된다.

 

PriorityBlockingQueue 를 사용하면 우선 순위에 따라 작업이 실행되도록 할 수 있다.

 

(자바 6에서는 LinkedBlockingQueue 대신 SynchronousQueue 를 사용할때 자바 5 버전에 비해 세배 이상의 성능을 발휘한다.)

 

8.3.3 집중 대응 정책

크기가 제한된 큐에 작업이 가득 차면 집중 대응 정책 saturation policy 가 동작한다.

집중 대응 정책은 ThreadPoolExecutor  setRejectedExecutionHandler 메소드를 사용해 설정할 수 있다.

 

- 중단 정책(abort)

           기본적으로 사용하는 집중 대응 정책이며 execute 메소드에서 RejectedExecutionException을 던진다.

- 제거 정책(discard)

           큐에 추가하려던 작업을 제거해 버린다.

- 호출자 실행 정책(caller runs)

           작업을 프로듀서에게 거꾸로 넘겨 작업 추가 속도를 늦출 수 있도록 일종의 속도 조절 방법으로 사용한다.

           execute 메소드 내부에서 main 스레드가 해당 작업을 직접 처리함으로써 또 다른 새로운 작업이 추가되지 못하도록 한다.

           따라서 그 시간동안 스레드는 큐의 작업을 처리할 시간을 가지게 된다.

 

ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY));

executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());

 

8.3.4 스레드 팩토리

스레드풀에서 새로운 스레드를 생성할 때는 스레드 팩토리를 사용해 새로운 스레드를 생성한다.

 

public interface ThreadFactory{

           Thread newThread(Runnable r);

}

 

스레드 팩토리에서 MyAppThread 를 생성하여 스레드의 이름, 현재 실행중인 스레드의 갯수 등을 확인할 수 있다.

 

 

public class MyAppThread extends Thread{

           public static final String DEFAULT_NAME = "MyAppThread";

           private static volatile boolean debugLifecycle = false;

           private static final AtomicInteger created = new AtomicInteger();

           private static final AtomicInteger alive = new AtomicInteger();

           private static final Logger log = Logger.getAnonymousLogger();

          

           public MyAppThread(Runnable r){ this(r, DEFAULT_NAME);}

          

           public MyAppThread(Runnable runnable, String name){

                     super(runnable, name + "=" + created.incrementAndGet());

                     setUncaughtExceptionHandler(

                                new Thread.UncaughtExceptionHandler(){

                                          public void uncaughtException(Thread t, Throwable e){

                                                     log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);

                                          }

                                }

                     );

           }

          

           public void run(){

                     boolean debug = debugLifecycle;

                     if( debug) log.log(Level.FINE, "Created " + getName());

                     try{

                                alive.incrementAndGet();

                                super.run();

                     }finally{

                                alive.decrementAndGet();

                                if(debug) log.log(Level.FINE, "Exiting " + getName());

                     }

           }

          

           public static int getThreadsCreated(){ return created.get();}

           public static int getThreadsAlive(){ return alive.get();}

           public static boolean getDebug(){ return debugLifecycle;}

           public static void setDebug(boolean b){debugLifecycle = b;}

}

 

8.3.5 ThreadPoolExecutor 생성 이후 설정 변경

Executors 로 생성한 ThreadPool 은 관련된 set 메소드를 사용해서 내부의 설정된 값

(코어 스레드 갯수, 최대 스레드 갯수, 스레드 유지 시간, 스레드 팩토리, 작업 거부 처리 정책)을 변경할 수 있다.

 

Executors 에는 unconfigurableExecutorSevice 를 사용해서 설정을 변경할 수 없는 ThreadPool을 생성할 수도 있다.

 

8.4 ThreadPoolExecutor 상속

ThreadPoolExecutor  beforeExecute, afterExecute, terminated 와 같은 훅 메소드를 제공한다.

만약 beforeExecute 메소드에서 RuntimeException 이 발생하면 해당 작업뿐 아니라 afterExecute 메소드도 실행되지 않는다.

 

처리 순서

beforeExecute

작업

afterExecute

terminatd

 

8.4.1 예제 : 스레드 풀에 통계 확인 기능 추가

public class TimingThreadPool extends ThreadPoolExecute{

           private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();

           private final Logger log = Logger.getLogger("TimingThreadPool");

           private final AtomicLong numTasks = new AtomicLong();

           private final AtomicLong totalTime = new AtomicLong();

          

           protected void beforeExecute(Thread t, Runnable r){

                     super.beforeExecute(t, r);

                     log.fine(String.format("Thread %s: start %s", t, r);

                     startTime.set(System.nanoTime());

           }

          

           protected void afterExecute(Runnable r, Throwable t){

                     try{

                                long endTime = System.nanoTime();

                                long taskTime = endTime - startTime.get();

                                numTask.incrementAndGet();

                                totalTime.addAndGet(taskTime);

                                log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));

                     }finally{

                                super.afterExecute(r, t);

                     }

           }

          

           protected void terminated(){

                     try{

                                log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));

                     }finally{

                                super.terminated();

                     }

           }

}

 

 

8.5 재귀 함수 병렬화

순차적 프로그램을 병렬 프로그램으로 변경

void processSequentially(List<Element> elements){

           for( Element e : elements)

                     process(e);

}

 

void processInParallel(Executor exec, List<Element> elements){

           for( final Element e : elements)

                     exec.execute(new Runnable(){

                                public void run(){ process(e);}

                     });

}

 

ExecutorService.invokeAll 메소드를 사용하면 한 묶음의 작업을 한꺼번에 등록하고 그 작업들이 모두 종료될 때까지 대기하도록 할 수 있다.

 

public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){

           for(Node<T> n : nodes){

                     results.add(n.compute());

                     sequentialRecursive( n.getChildren(), results);

           }

}

 

public<T> void paralleRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){

           for(final Node<T> n : nodes){

                     exec.execute(new Runnable(){

                                public void run(){

                                          results.add(n.compute());

                                }

                     });

                     parallelRecursive(exec, n.getChildren(), results);

           }

}

 

public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException{

           ExecutorService exec = Executors.newCachedThreadPool();

           Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();

           parallelRecursive(exec, nodes, resultQueue);

           exec.shutdown();

           exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

           return resultQueue;

}

 

8.5.1 예제 : 퍼즐 프레임웍

 

 

10. 활동성을 최대로 높이기

 

10.1 데드락

자바 프로그램에서 데드락이 발생하면 프로그램을 강제로 종료하기 전에는 영원히 멈춘 상태로 유지된다.

 

10.1.1 락 순서에 의한 데드락

예제 코드는 데드락이 발생할 여지가 있다.

public class LeftRightDeadlock{

           private final Object left = new Object();

           private final Object right = new Object();

          

           public void leftRight(){

                     synchronized(left){

                                synchronized(right){

                                          doSomething();

                                }

                     }

           }

          

           public void rightLeft(){

                     synchronized(right){

                                synchronized(left){

                                          doSomething();

                                }

                     }

           }

}

 

프로그램 내부의 모든 스레드에서 필요한 락을 모두 같은 순서로만 사용한다면 락 순서에 의한 데드락은 발생하지 않는다.

 

10.1.2 동적인 락 순서에 의한 데드락

public void transferMoney(Account fromAccount, Account toAccount, DollarAmount amount)

           throws InsufficientFundsException{

           synchronized(fromAccount){

                     synchronized(toAccount){

                                if(fromAccount.getBalance().compareTo(amount) < 0)

                                          throw new InsufficientFundsException();

                                else{

                                          fromAccount.debit(amount);

                                          toAccount.credit(amount);

                                }

                     }

           }                   

}

transfreMoney(myAccount, yourAccount, 10);

transferMoney(yourAccount, myAccount, 20);

위 처럼 호출시 데드락에 빠질 여지가 있다.

 

락의 순서를 프로그램으로 제어할 수 있다면 데드락을 방지할 수 있다.

System.identityHashCode 메소드를 사용하여 객체의 순서를 정의해 락을 걸어주면 데드락을 방지할 수 있다. 만일에라도 객체의 순서가 같을 경우를 대비해 tieLock 을 사용한다.

 

private static final Object tieLock = new Object();

 

public void transferMoney(final Account fromAcct, final Account toAcct, final DollarAmount amount)

           throws InsufficientFundsException {

          

           class Helper {

                     public void transfer() throws InsufficientFundsException {

                                if(fromAcct.getBalance().compareTo(amount) < 0)

                                          throw new InsufficientFundsException();

                                else {

                                          fromAcct.debit(amount);

                                          toAcct.credit(amount);

                                }

                     }         

           }

          

           int fromHash = System.identityHashCode(fromAcct);

           int toHash = System.identityHashCode(toAcct);

          

           if(fromHash < toHash){

                     synchronized(fromHash){

                                synchronized(toAcct){

                                          new Helper().transfer();

                                }

                     }

           }else if(fromHash > toHash){

                     synchronized(toAcct){

                                synchronized(fromAcct){

                                          new Helper().transfer();

                                }

                     }

           }else{

                     synchronized(tieLock){

                                synchronized(fromAcct){

                                          synchronized(toAcct){

                                                     new Helper.transfer();

                                          }

                                }

                     }

           }

}

 

10.1.3 객체 간의 데드락

Taxi  setLocation 메소드에서는 Taxi, Dispatcher 의 순서로 락을 획득하며

Dispatcher  getImage 메소드에서는 Dispatcher, Taxi 의 순서로 락을 획득한다.

따라서 데드락에 빠질 위험이 있다.

class Taxi{

           @GuardedBy("this") private Point location, destination;

           private final Dispatcher dispatcher;

          

           public Taxi(Dispatcher dispatcher){

                     this.dispatcher = dispatcher;

           }

          

           public synchronized Point getLocation(){

                     return location;

           }

          

           public synchronized void setLocation(Point location){

                     this.location = location;

                     if(location.equals(destination))

                                dipatcher.notifyAvailable(this);

           }

}

 

class Dispatcher{

           @GuardedBy("this") private final Set<Taxi> taxis;

           @GuardedBy("this") private final Set<Taxi> availableTaxis;

          

           public Dispatcher(){

                     taxis = new HashSet<Taxi>();

                     availableTaxis = new HashSet<Taxi>();

           }

          

           public synchronized void notifyAvailable(Taxi taxi){

                     availableTaxis.add(taxi);

           }

          

           public synchronized Image getImage(){

                     Image image = new Image();

                     for(Taxi t : taxis)

                                image.drawMarker(t.getLocation());

                     return image;

           }

}

 

10.1.4 오픈 호출 open call

오픈 호출은 락을 전혀 확보하지 않은 상태에서 메소드를 호출하는 것을 말한다.

public void setLocation(Point location){

           boolean reachedDestination;

           sychronized(this){

                     this.location = location;

                     reachedDestination = location.equals(destination);

           }

          

           if( reachedDestination)

                     dipatcher.notifyAvailable(this);

}

 

public Image getImage(){

           Set<Taxi> copy;

           sychronized(this){

                     copy = new HashSet<Taxi>(taxis);

           }

           Image image = new Image();

           for(Taxi t : copy)

                     image.drawMarker(t.getLocation());

          

           return image;

}

 

10.1.5 리소스 데드락

두개의 데이터 베이스에 대한 커넥션 풀을 사용할 때 두개의 커넥션을 모두 요청하는 경우

A 스레드는 D1, D2 의 순서로 커넥션을 요청하고 B 스레드는 D2, D1 의 순서로 요청하는 경우

커넥션풀의 크기가 클수록 문제가 발생할 확률은 줄어들지만 발생할 여지는 존재한다.

 

10.2 데드락 방지 및 원인 추적

한번에 하나 이상의 락을 사용하지 않는 프로그램은 락의 순서에 의한 데드락이 발생하지 않는다.

 

10.2.1 락의 시간 제한

Lock 클래스의 메소드 가운데 시간을 제한할 수 있는 tryLock 메소드를 사용하면 지정한 시간 또한 락을 확보하지 못한다면 tryLock 메소드가 오류를 발생시키도록 할 수 있다.

 

10.2.2 스레드 덤프를 활용한 데드락 분석

스레드 덤프에는 실행 중인 모든 스레드의 스택 트레이스 stack trace 가 담겨 있다. 락과 관련된 정보 또한 담겨 있다.

JVM이 스레드 덤프를 생성하도록 하려면 Unix 플랫폼에서는 JVM 프로세스에 SIGQUIT 시그널(kill -3)을 전송하거나 Ctrl-\ 키를 누르면 되고, 윈도우 환경에서는 Ctrl-Break 키를 누르면 된다.

 

10.3 그 밖의 활동성 문제

활동성을 떨어뜨리는 주된 원인은 데드락이지만, 소모 starvation, 놓친 신호, 라이브락 livelock 같은 다양한 원인이 존재한다.

 

10.3.1 소모

- 소모 starvation 상태는 스레드가 작업을 진행하는데 꼭 필요한 자원을 영영 할당받지 못하는 경우에 발생한다.

- 소모 상황이 발생하는 원인은 대부분 스레드의 우선 순위 priority 를 적절치 못하게 올리거나 내리는 부분에 있다. 또한 락을 확보한 채로 종료되지 않는 코드.

- 스레드 우선 순위를 변경하고 나면 플랫폼에 종속적인 부분이 많아지며, 따라서 활동성 문제를 일으키기 쉽다. 대부분의 병렬 애플리케이션은 모든 스레드의 우선 순위에 기본값을 사용하고 있다.

 

10.3.2 형편 없는 응답성

애플리케이션의 응답성이 떨어진다면 락을 제대로 관리하지 못하는 것이 원인일 수 있다.

특정 스레드가 대량의 데이터를 넘겨 받아 처리하느라 필요 이상으로 긴 시간 동안 락을 확보하고 있다면 넘겨준 대량의 데이터를 사용해야 하는 다른 스레드는 데이터를 받아올 때까지 상당히 긴 시간동안 대기해야 한다.

 

10.3.3 라이브락

라이브락은 에러를 너무 완벽하게 처리하고자 회복 불가능한 오류를 회복 가능하다고 판단해 계속해서 재시도하는 과정에 나타난다.


 

 

11. 성능, 확장성

스레드를 사용하는 가장 큰 목적은 성능을 향상시키기 위한 것이다.

 

11.1 성능에 대해

성능을 높인다는 것은 더 적은 자원을 사용하면서 더 많은 일을 하도록 만드는 것이다.

여러 개의 스레드를 사용하려 한다면 항상 단일 스레드를 사용할 때보다 성능상의 비용을 지불해야만 한다.

- 스레드간의 작업 내용 조율(, 신호 보내기, 메모리 동기화)

- 컨텍스트 스위칭

- 스레드를 생성하거나 제거하는일

- 스레드의 효율적인 스케줄링

 

11.1.1 성능 대 확장성

애플리케이션 성능을 측정하는 데이터

- 서비스 시간, 대기 시간, 처리량, 효율성, 확장성, 용량

단일 티어 애플리케이션이 다중 티어 애플리케이션 보다 성능이 나을 가능성이 많다. 하지만 단일 티어 애플리케이션이 처리할 수 있는 부하를 넘어서는 작업량이 주어지면 문제가 심각해진다.

따라서 서버 애플리케이션을 만들 떄는 얼마나 빠르게 보다는 얼마나 많이 라는 측면을 훨씬 중요하게 생각하는 경우가 많다.

 

11.1.2 성능 트레이드 오프 측정

최적화 기법을 너무 이른 시점에 적용하지 말아야 한다. 일단 제대로 동작하게 만들고 난 다음에 빠르게 동작하도록 최적화해야 하며, 예상한 것보다 심각하게 성능이 떨어지는 경우에만 최적화 기법을 적용하는 것으로도 충분하다.

 

11.2 암달의 법칙

암달의 법칙 Amdahl’s law 을 사용하면 병렬 작업과 순차 작업의 비율에 따라 하드웨어 자원을 추가로 투입했을 때 이론적으로 속도가 얼마나 빨라질지에 대한 예측 값을 얻을 수 있다.

속도 증가량 <= 1 / ( F + (1-F) / N )

F : 순차적으로 실행돼야 하는 작업의 비율

N : 하드웨어에 꽂혀 있는 프로세서의 개수

 

하드웨어에 CPU 10개 꽂혀 있을 때, 10% 의 순차 작업을 갖고 있는 프로그램은 최고 5.3배 만큼의 속도가 증가 할 수 있다.

1 / ( 0.1 + ( 1 ? 0.1) / 10) = 5.26….

 

암달의 법칙을 적용해보려면 애플리케이션 내부에서 순차적으로 처리해야 하는 적업이 얼마나 되는지 확인해야 한다.

 

11.2.1 예제 : 프레임웍 내부에 감춰져 있는 순차적 실행 구조

LinkedList 보다는 LinkedBlockingQueue 를 사용하는 것이 성능 향상에 좋다.

 

11.2.2 정성적인 암달의 법칙 적용 방법

 

11.3 스레드와 비용

 

11.3.1 컨텍스트 스위칭

컨텍스트 스위칭이란 스레드 스케줄링에 의해 다음 스레드가 실행되야 할 때 현재 스레드의 실행 상태를 보관해 두고, 다음 스레드의 실행 상태를 읽어들이는 것을 말한다.

 

유닉스 시스템의 vmstat 명령이나 윈도우 시스템의 perfmon 유틸리티를 사용하면 컨텍스트 스위칭이 일어난 횟수를 확인할 수 있드며 커널 수준에서 얼마만큼의 시간을 소모했는지 확인할 수 있다. 커널 활용도가10%가 넘는 높은 값ㅇ르 갖고 있다면 스케줄링에 부하가 걸린다는 의미이며, 애플리케이션 내부에서 I/O 작업이나 락 관련 동기화 부분에서 대기 상태에 들어가는 부분이 원인일 가능성이 높다.

 

11.3.2 메모리 동기화

락 생략(lock elision)

public String getStoogeNames(){

             List<String> stooges = new Vector<String>();

             stooges.add("Moe");

             stooges.add("Larry");

             stooges.add("Curly");

             return stooges.toString();

       }

유출 분석(escape analysis)을 통해 4번이나 락을 잡았다 놓는것 대신 한번에 빠르게 실행된다.

IBM JVM, 자바 7 핫스팟 VM 에서 적용

잘 만들어진 컴파일러라면 getStoogeNames 메소드가 항상 같은 값을 리턴하는 것을 파악한 후 매 실행시마다 처음 실행결과를 리턴하도록 재 컴파일하기도 한다.

 

락 확장(lock coarsening)

연달아 붙어 잇는 여러 개의 synchronized 블록을 하나의 락으로 묶는 방법

 

11.3.3 블로킹

경쟁 조건이 발생하는 동기화 작업에는 운영체제가 관여하는데 해당 부분은 일정량의 자원을 소모한다. 락을 확보하지 못한 스레드는 대기상태에 들어가도록 하는데 JVM 은 두가지 방법을 사용한다.

- 스핀 대기(spin waiting) : 락을 확보할 때까지 계속해서 재시도하는 방법

- 운영체제가 제공하는 기능을 사용해 스레드를 실제 대기 상태로 두는 방법

대기 시간이 짧을 때는 스핀 대기가 효과적이고, 길 때는 운영체제의 기능을 호출하는게 효율적이다.

 

11.4 락 경쟁 줄이기

락으로 사용 제한이 걸려 있는 독점적인 자원을 사용하려는 모든 스레드는 해당 자원을 한 번에 하나의 스레드만 사용할 수 있기 때문에 순차적으로 처리될 수 밖에 없다.

(병렬 애플리케이션에서 확장성에 가장 큰 위협이 되는 존재는 바로 특정 자원을 독점적으로 사용하도록 제한하는 락이다.)

 

락 경쟁 조건을 줄일 수 있는 방법

- 락을 확보한 채로 유지되는 시간을 최대한 줄인다.

- 락을 확보하고자 요청하는 횟수를 최대한 줄인다.

- 독점적인 락 대신 병렬성을 크게 높여주는 여러 가지 조율 방법을 사용한다.

 

11.4.1 락 구역 좁히기

 

11.4.2 락 정밀도 높이기

다수의 락을 사용해 각 객체별로 필요한 만큼만 락을 확보하도록 하면 스레드 간의 경쟁을 크게 줄일 수 있다. 따라서 경쟁이 줄어들면 락을 확보하고자 대기하는 경우 역시 줄어들기 때문에 애플리케이션의 확장성이 늘어날 것이라고 예상할 수 있다. 하지만 그만큼 데드락에 빠질 위험도 높아진다.

 

11.4.3 락 스트라이핑 lock striping

락 스트라이핑 은 독립적인 객체를 여러 가지 크기의 단위로 묶어내고, 묶인 블록 단위로 락을 나누는 방법을 말한다. 예를 들어 ConcurrentHashMap 클래스는 16개의 락을 배열로 담은후 16개의 락 각자가 전체 해시 범위의 1/16에 대한 락을 담당한다. 따라서 락 경쟁이 발생할 확률을 1/16로 낮춰준다.

 

11.4.4 핫 필드 최소화

자주 계산하고 사용하는 값을 캐시에 저장해 두도록 최적화한다면 확장성을 떨어뜨릴 수 밖에 없는 핫 필드(hot fields) 가 나타난다.

HashMap 에서 락 스트라이핑을 사용해 락을 분할하여 확장성을 향상시킬 수 있지만 데이터를넣고 뺄 때마다 size 메소드에서 사용하는 값을 매번 변경해야 한다면 해당 값을 변경하느라 경쟁이 발생하게 된다. 이와 같은 size 변수를 핫 필드라고 한다.

ConcurrentHashMap 은 락으로 분할된 부분마다 카운트 변수를 두어 size 메소드 호출시에는 모든 카운트 변수를 더해 돌려준다.

 

11.4.5 독점적인 락을 최소화하는 다른 방법

 

11.4.6 CPU 활용도 모니터링

애플리케이션의 확장성을 테스트할 때 그 목적은 대부분 CPU를 최대한 활용하는 데 있다.

 

11.4.7 객체 풀링은 하지 말자

최근의 자바 프로그램은 메모리를 항당하는 작업이 C 언어의 malloc 함수보다 빨라졌다.

병렬 애플리캐이션에서 객체 풀링을 사용했을 경우에는 해당 풀에서 객체를 얻기 위한 경쟁이 발생하게 된다. 또 스레드가 대기상태에 들어가는 비용보다는 새로운 객체를 생성하는게 성능상 좋다.

 

11.5 예제 : Map 객체의 성능 분석

 

11.6 컨텍스트 스위치 부하 줄이기

 

 

 

 

ReentrantLock 은 자바5.0 에서 추가됬으며 암묵적인 락으로 할 수 없는 고급 기능을 가지고 있다.

 

13.1 Lock  ReentrantLock

- Lock 인터페이스는 조건없는 락, 폴링 락, 타임아웃이 있는 락, 락 확보 대기상태에 인터럽트를 걸 수 있는 기능을 가진다.

- 모든 작업이 명시적이다.

public interface Lock {

       void lock();

       void lockInterruptibly() throws InterruptedException();

       boolean tryLock();

       boolean tryLock( long timeout, TimeUnit unit() throws InterruptedException();

       void unlock();

       Condition newCondition();

}

 

ReentrantLock  Lock 인터페이스를 구현한다. synchronized 구문 과 동일한 메모리 가시성과 상호 배제 기능을 제공한다.

Lock lock = new ReentrantLock();

       lock.lock();

       try{

            

       }finally{

             lock.unlock();

}

ReentrantLock  finally 구문에서 반드시 락을 해제해 주어야 한다.

 

 

13.1.1 폴링과 시간 제한이 있는 락 확보 방법

락을 확보할 때 시간제한을 두거나 폴링 방법(trylock)을 사용하면 락을 확보하지 못하는 상황에도 통제권을 다시 얻을 수 있다.

public boolean trySendOnSharedLine(String message, long timeout, TimeUnit unit) throws InterruptedException{

             long nanoToLock = unit.toNanos(timeout) - estimateNanosToSend(message);

             if(!lock.tryLock(nanosToLock, NANOSECONDS))

                    return false;

             try{

                    return sendOnSharedLine(message);

             }finally{

                    lock.unlock();

             }

       }

 

13.1.2 인터럽트 걸 수 있는 락 확보 방법

public boolean sendOnSharedLine(String message) throws InteruptedException{

             lock.lockInterruptibly();

             try{

                    return cancellableSendOnSharedLine(message);

             }finally{

                    lock.unlock();

             }

       }

 

13.1.3 블록을 벗어나는 구조의 락

 

13.2 성능에 대한 고려

자바 5.0과 함께 ReentrantLock이 처음 소개됐을 때 암묵적인 락에 비해 훨씬 나은 경쟁 성능을 보여줬다. 하지만 자바 6.0에서는 암묵적인 락과 ReentrantLock 의 성능이 별반 차이가 없다.

 

13.3 공정성

ReentrantLock 은 두 종류의 락 공정성 설정을 지원한다. 불공정 unfair 방법 과 공정 fair 방법이다.

공정한 방법은 락을 확보하고자 하는 스레드는 항상 큐의 맨끝 대기열에 들어간다.

불공정 방법은 락을 확보하려는 시점에 락이 사용중이라면 대기열에 들어가게되고 확보하려는 찰나에 락이 해제되었다면 대기열에 대기중인 스레드를 뛰어넘어 락을 확보하게 된다.

 

공정하게만 처리하다 보면 스레드를 반드시 멈추고 다시 실행시키는 동안에 성능에 큰 지장을 줄 수 있기 때문이다.

 

13.4 synchronized 또는 ReentrantLock 선택

ReentrantLock 은 암묵적인 락만으로는 해결할 수 없는 복잡한 상황에서 사용하기 위한 고급 동기화 기능이다. 다음과 같은 고급 동기화 기법을 사용해야 하는 경우에만 ReentrantLock을 사용하도록 하자

- 락을 확보할 때 타임아웃을 지정해야 하는 경우

- 폴링의 형태로 락을 확복하고자 하는 경우

- 락을 확보하느라 대기 상태에 들어가 있을 때 인터럽트를 걸 수 있어야 하는 경우

- 대기 상태 큐 처리 방법을 공정하게 해야 하는 경우

- 코드가 단일 블록의 형태를 넘어서는 경우

그 외의 경우에는 synchronized 블록을 사용하도록 하자

 

자바 5.0에서는 synchronized 블록을 사용했을 때 스레드 덤프를 통해 어느 메소드에서 어느 락을 확보하고 있고, 데드락에 걸린 스레드가 있는지, 어디에서 데드락이 걸렸는지 확인할 수 있는 반면 ReentrantLock 을 알 수 없다.

자바 6.0에서는 ReentrantLock 에 모니터링 인터페이스를 추가하여 해결하였다.

 

13.5 읽기-쓰기 락

읽기 작업은 여러 개를 한꺼번에 처리할 수 있지만 쓰기 작업은 혼자만 동작할 수 있는 구조의 동기화를 처리해 주는 락이 읽기-쓰기 락 read-write lock 이다.

대다수의 작업은 데이터 변경이 아닌 읽기 작업이다. 이런 상황에서는 락의 조건을 풀어 읽기 연산은 여러 스레드에서 동시에 실행할 수 있도록 해주면 성능을 크게 향상시킬 수 있다.

 

public interface ReadWriteLock{

       Lock readLock();

       Lock writeLock();

}

 

 

자바 5.0에서 읽기 락은 읽기 작업을 하고 잇는 스레드의 개수만 세고 실제로 어느 스레드가 읽고 있는지는 상관없다. 반면 자바 6.0에서는 어느 스레드가 읽기 락을 확보했는지 추적하도록 되어 있다.

 

 

 

14. 동기화 클래스 구현

암묵적인 조건 큐 condition queue, 명시적인 Condition 객체, AbstractQueuedSynchronizer 프레임 웍 등 저수준 기능을 사용하여 원하는 동기화 클래스를 구현할 수 있다.

 

 

14.1 상태 종속성 관리

병렬 프로그램에서 상태 기반의 조건은 다른 스레드를 통해서 언제든지 마음대로 변경될 수 있다.

 

[상태 종속적인 작업의 동기화 구조]

void blockingAction() throws InterruptedException{

             상태 변수에 대한  확보

             while(선행조건이 만족하지 않음){

                    확보했던 락을 풀어줌

                    선행 조건이 맍고할만한 시간만큼 대기

                    인터럽트에 걸리거나 타임아웃이 걸리면 멈춤

                    락을 다시 확보

             }

             작업 실행

              해제

       }

 

위의 예는 락을 활용하는 형태가 그다지 일반적인지 않다. 이를테면 작업하고자 확보했던 락을 그 내부에서 다시 풀어주고 또 다시 확보하는 우스꽝스런 모습이다.

 

 

14.1.1 예제 : 선행 조건 오류를 호출자에게 그대로 전달

 

[선행 조건이 맞지 않으면 그냥 멈춰버리는 버퍼 클래스]

public synchronized void put(V v) throws BufferFullExceptipon{

             if(isFull())

                    throw new BufferFullException();

             doPut(v);

       }

      

       public synchronized V take() throws BufferEmptyException{

             if(isEmpty())

                    throw new BufferEmptyException();

             return doTake();

       }

위의 메소드를 사용하는 클래스는 put, take 메소드를 호출할 때마다 발생할 가능성이 있는 예외 상황을 매번 처리해줘야 한다. 이처럼 상태 종속성을 호출자에게 넘기는 방법을 쓰면 FIFO 큐에서 값의 순서를 정확하게 유지하는 일이 불가능하다.

 

 

14.1.2 예제 : 폴링과 대기를 반복하는 세련되지 못한 대기 상태

public void put(V v) throws InterruptedException{

             while(true){

                    synchronized(this){

                           if(!isFull()){

                                 doPut(v);

                                 return;

                           }

                    }

                    Thread.sleep(SLEEP_GRANULARITY);

             }

       }

 

public V take() throws InterruptedException{

             while(true){

                    synchronized(this){

                           if(!isEmpty()){

                                 return doTake();

                           }

                    }

                    Thread.sleep(SLEEP_GRANULARITY);

             }

       }

 

sleep 시간에 따라 응답속도와 CPU 사용량 간의 트레이드 오프 trade off 가 발생한다.

 

 

14.1.3 조건 큐 ? 문제 해결사

 

[조건 큐를 사용해 구현한 BoundedBuffer]

public synchronized void put(V v) throws BufferFullExceptipon{

             while(isFull())

                    wait();

             doPut(v);

             notifyAll();

       }

      

       public synchronized V take() throws BufferEmptyException{

             while(isEmpty())

                    wait();

             V v = doTake();

             notifyAll();

             return v;

       }

 

자바 언어에서는 모든 객체를 락으로 활용할 수 있는 것처럼 모든 객체는 스스로를 조건 큐로 사용할 수 있으며, 모든 객체가 갖고 있는 wait, notity, notifyAll 메소드는 조건 큐의 암묵적인 API라고 봐도 좋다.

Object.wait 메소드는 현재 확보하고 있는 락을 자동으로 해제하면서 운영체제에게 현재 스레드를 멈춰달라고 요청하고, 따라서 다른 스레드가 락을 확보해 객체 내부의 상태를 변경할 수 있도록 해준다.

조건 큐를 사용했을 때 이전 버전보다 CPU 사용의 효율성, 컨텍스트 스위치 관련 부화, 응답 속도 측면에서 최적화 되어 있다.

 

 

14.2 조건 큐 활용

조건 큐를 활용하려면 꼭 지켜야만 하는 몇 가지 규칙이 있다.

 

 

14.2.1 조건 서술어

조건 서술어는 클래스 내부의 상태 변수에서 유추할 수 있는 표현식이다.

take 메소드의 입장에서는 작업을 진행하기 전에 확인해야만 하는 버퍼에 값이 있어야 한다 는 것이 조건 서술어이다. put 메소드의 입장에서는 버퍼에 빈 공간이 있다.” 는 것이 조건 서술어이다.

 

조건 큐와 연결된 조건 서술어를 항상 문서로 남겨야 하며, 그 조건 서술어에 영향을 받는 메소드가 어느 것인지도 명시해야 한다.

 

조건부 대기와 관련된 락과 wait 메소드와 조건 서술어는 중요한 삼각 관계르 유지하고 있다. 조건 서술어는 상태 변수를 기반으로 하고 있고, 상태 변수는 락으로 동기화되어 있어 조건 서술어를 만족하는지 확인하려면 반드시 락을 확보해야만 한다. 또 락 객체와 조건 큐 객체는 반드시 동일한 객체여야만 한다.

 

 

14.2.2 너무 일찍 깨어나기

하나의 암묵적인 조건 큐를 두 개 이상의 조건 서술어를 대상으로 사용할 수도 있다. 따라서 어디에선가 notifyAll 이 호출돼서 스레드가 깨어났다하더라도 wait 하기 직전에 확인했던 조건 서술어를 만족하게 된다는 것은 아니다. notifyAll 을 호출하는 시점에는 조건 서술어를 만족하는 상태였다고 해도 락을 확보하고 나서는 만족하지 않는 상태가 됐을 가능성도 있다. 다른 스레드가 미리 락을 확보하고 조건 서술어와 관련된 상태 변수의 값을 변경시킬 가능성도 있다.

 

[상태 종속적인 메소드의 표준적인 형태]

void stateDependentMethod() throws InterruptedException{

             // 조건 서술어는 반드시 락으로 동기화된 이후에 확인해야 한다.

             synchronized(lock){

                    while(!conditionPredicate())

                           lock.wait();

                    // 객체가 원하는 상태에 맞쳐졌다.

             }

       }

 

조건부 wait 메소드(Object.wait 또는 Condition.wait) 사용할  유의점

- 항상 조건 서술어(작업을 계속 진행하기 전에 반드시 확인해야 하는 확인 절차) 명시해야 한다.

- wait 메소드를 호출하기 전에 조건 서술어를 확인하고, wait 에서 리턴된 이후에도 조건 서술어를 확인해야 한다.

- 조건 서술어를 확인하는  관련된 모든 상태 변수는 해당 조건 큐의 락에 의해 동기화  있어야 한다.

- wait, notify, notifyAll 메소드를 호출할 때는 조건 큐에 해당하는 락을 확보하고 있어야 한다.

- 조건 서술어를 확인한 이후 실제로 작업을 실행해 작업이 끝날 때까지 락을 해제해서는  된다.

 

 

14.2.3 놓친 신호

wait 메소드를 호출하기 전에 조건 서술어를 확인하지 못하는 경우가 생길  있다면 놓친 신호 문제가 발생할 가능성도 있다. wait 메소드 작성시 위의 조건과 같은 형태로 작성되었다면 놓친 신호문제에 대해 걱정하지 않아도 된다.

 

 

14.2.4 알림

notify 메소드를 호출하면 JVM  해당하는 조건 큐에서 대기 상태에 들어가 있는 다른 스레드 하나를 골라 대기 상태를 풀어준다. notifyAll 호출하면 해당하는 조건 큐에서 대기 상태에 들어가있는 모든 스레드를 풀어준다. notify, notifyAll  호출한 이후에는 최대한 빨리 락을 풀어줘야 대기 상태에서 깨어난 스레드가 빠르게 동작을 취할  있다.

 

여러 개의 스레드가 하나의 조건 큐를 놓고 대기 상태에 들어갈  있는데, 대기 상태에 들어간 조건이 서로 다를  있기 때문에 notifyAll 대신 notify 메소드를 사용해 대기 상태를 풀어주는 방법은 위험성이 높다.

 

notifyAll 대신 notify 메소드를 사용하려면 다음과 같은 조건에 해당하는 경우에만 사용하는 것이 좋다.

- 단일 조건에 따른 대기 상태에서 깨우는 경우 : 해당하는 조건 큐에  하나의 조건만 사용하고 있는 경우이고, 따라서  스레드는 wait 메소드에서 리턴될  동일한 방법으로 실행된다.

-  번에 하나씩 처리하는 경우 : 조건 변수에 대한 알림 메소드를 호출하면 하나의 스레드만 실행시킬  있는 경우

 

notifyAll  호출하게 되면 많은 스레드가 모두 깨어나서 다시 락을 잡으려고 하기 때문에 컨텍스트 스위칭이 빈번하게 일어나고, 상당량의  확보 경쟁이 벌어진다.

 

조건부 알림 conditional notification

take  put 메소드가 대기 상태에서 빠져나올  있는 상태를 만들어주는 경우에만 알림 메소드를 호출하도록 하면 이런 보수적인 측면을 최적화할  있다.

 

 

[조건부 알림을 적용한 경우]

public synchronized void put(V v) throws InterruptedException{

             while(isFull())

                    wait();

             boolean wasEmpty = isEmpty();

             doPut(v);

             if(wasEmpty)

                    notifyAll();

       }

 

wasEmpty 변수 값을 통해서 큐가 비워져 있었는지를 판단한다. 비워져 있었을 경우에는 take 메소드에서 대기중인 스레드가 있을 가능성이 있다고 판단하여 notifyAll  호출해 준다.

 

 

14.2.5 예제 : 게이트 클래스

게이트 클래스는 문이 열릴떄까지 모든 스레드를 대기하도록 하고 특정 조건이 맞을  모든 스레드를 통과하게 한다.

 

@ThreadSafe

public class ThreadGate{

       // 조건 서술어 : opened-since(n) (isOpen || generation)

       @GuardedBy("this") private boolean isOpen;

       @GuardedBy("this") private int generation;

      

       public synchronized void close(){

             isOpen = false;

       }

      

       public synchronized void open(){

             ++generation;

             isOpen = true;

             notifyAll();

       }

      

       // 만족할 때까지 대기 : opened-since (generation on entry)

       public synchronized void await() throws InterruptedException {

             int arrivalGeneration = generation;

             while (!isOopen && arrivalGeneration == generation)

                    wait();

       }

}

 

ThreadGate 클래스는 여러  열고 닫는 것을 가능하게 설계되었다.

스레드는 await 메소드를 호출하여 대기하게 되고 어디선가 open, close  연속으로 호출하게 되면 게이트를 열고 닫는다.

open 메소드 호출시 게이트가 열리게 되며 대기중이던 모든 스레드가 해당 게이트를 통과하게 된다. open, close 메소드는 찰나에 실행되므로  사이에 모든 스레드가 락을 획득하여 통과하지 못할수도 있다. 따라서 ThreadGate 클래스에서는 arrivalGeneration 변수를 두어 generation 변수와 비교하고 있다. open 메소드 호출시 generation 값을 증가하여 대기하던 스레드의arrivalGeneration 값과 비교하며  변수의 값이 다르다면 통과하도록 되어 있다.

 

 

14.2.6 하위 클래스 안전성 문제

 

 

14.2.7 조건 큐 캡슐화

일반적으로 조건 큐를 클래스 내부에 캡슐화해서 클래스 상속 구조의 외부에서는 해당 조건 큐를 사용할 수 없도록 막는게 좋다.

 

 

14.2.8 진입 규칙과 완료 규칙

wait  notify를 적용하는 규칙을 진입규칙과 완료규칙으로 표현한다. 즉 상태를 기반으로 하는 모든 연산과 상태에 의존성을 갖고 있는 또 다른 상태를 변경하는 연산을 수행하는 경우에는 항상 진입규칙과 완료규칙을 정의하고 문서화해야 한다.

 

 

14.3 명시적인 조건 객체

암묵적인 락을 일반화한 형태가 Lock 클래스인 것처럼 암묵적인 조건 큐를 일반화한 형태는 바로 Condition 객체이다.

 

public interface Condition{

       void await() throws InterruptedException;

       boolean await(long time, TimeUnit unit) throws InterruptedException;

       long awaitNanos(long nanosTimeout) throws InterruptedException;

       void awaitUninterrupterbly();

       boolean awaitUntil(Date deadline) throws InterruptedException;

       void signal();

       void signalAll();

}

 

Condition 클래스는 내부적으로 하나의 Lock 클래스를 사용해 동기화를 맞춘다. Condition 인스턴스를 생성하려면 Lock.newCondition 메소드를 호출한다.

 

위험성 경고 : Condition 객체 사용시 await, signal, signalAll 을 사용해야 하며 Object 에서 상속받는 wait, notify, notifyAll 사용시 동기화 기능에 큰 문제가 생길 수 있다.

 

protected final Lock lock = new ReentrantLock();

// 조건 서술어 : notFull (count < items.length)

private final Condition notFull = lock.newCondition();

// 조건 서술어 : notEmpty (count > 0)

private final Condition notEmpty = lock.newCondition();

 

@GuardedBy("lock") private final T[] items = (T[]) new Object(BUFFER_SIZE);

@GuardedBy("lock") private int tail, head, count;

 

// 만족할 때까지 대기 : notFull

public void put(T x) throws InterruptedException{

       lock.lock();

       try{

             while(count==items.length)

                    notFull.await();

             items[tail] = x;

             if(++tail == items.length)

                    tail = 0;

             ++count

             notEmpty.signal();

       }finally{

             lock.unlock();

       }

}

 

// 만족할 때까지 대기 : notEmpty

public T take() throws InterruptedException{

       lock.lock();

       try{

             while(count == 0)

                    notEmpty.await();

             T x = items[head];

             items[head] = null;

             if(++head==items.length)

                    head=0;

             --count;

             notFull.signal();

             return x;

       }finally{

             lock.unlock();

       }

}

 

위의 예는 조건별로 Condition 객체를 생성하여 처리한 코드이다.

 

조건별로 각각의 Conditino 객체를 생성해 사용하면 클래스 구조를 분석하기가 쉽다.  notifyAll 과 같은 signalAll 대신 그보다 더 효율적인 signal 메소드를 사용해 동일한 기능을 처리할 수 있으므로, 컨텍스트 스위치 횟수도 줄일 수 있고 버퍼의 기능이 동작하는 동안 각 스레드가 락을 확보하는 횟수 역시 줄일 수 있다.

 

조건에 관련되 모든 변수는 Lock의 보호 아래 동기화돼 있어야 하고, 조건을 확인하거나 await 또는 signal 메소드를 호출하는 시점에는 반드시 Lock 을 확보한 상태여야 한다.

 

 

14.4 동기화 클래스의 내부 구조

ReentrantLock, Semaphore, CountDownLatch, ReentrantReadLock, SynchronousQueue, FutureTask 는 모두 AbstractQueuedSynchronizer(AQS) 를 상속받아 구현돼 있다.

 

 

14.5 AbstractQueuedSynchronizer

AQS 기반의 동기화 클래스가 담당하는 작업 가운데 가장 기본이 되는 연산은 확보 acquire 와 해제 release 이다. 확보 연산은 상태기반으로 동작하여 항상 대기상태에 들어갈 가능성이 있다.

AQS  getState, setState, compareAndSetState 메소드를 사용해 동기화 클래스의 상태 변수를 관리할 수 있다.

배타적인 확보기능을 제공하는 동기화 클래스는 tryAcquire, tryReleas, isHeldExclusively 등의 메소드를 구현해야 하며, 배타적이지 않은 확보 기능을 지원하는 동기화 클래스는 tryAcquireShared, tryReleaseShared 메소드를 제공해야 한다.

위의 메소드는 acquire, acquireShared, release, releaseShared 메소드 내부에서 호출된다.

 

 

14.5.1 간단한 래치

 

@ThreadSafe

public class OneShotLatch {

       private final Sync sync = new Sync();

      

       public void signal() { sync.releaseShared(0);}

      

       public void await() throws InterruptedException {

              sync.acquireSharedInterruptibly(0);

       }

      

       private class Sync extends AbstractQueuedSynchronizer {

             protected int tryAcquireShared(int ignored){

                    // 래치가 열려 있는 상태(state==1)라면 성공, 아니면 실패

                    return (getState()==1) ? 1 : -1;

             }

            

             protected boolean tryReleaseShared(int ignored){

                    setState(1); // 래치가 열렸다.

                    return true; // 다른 스레드에서 확보 연산에 성공할 가능성이 있다.

             }

       }

}

 

OneShotLatch  await 메소드로 스레드를 대기하도록 한다음 signal 메소드로 대기하던 모든 스레드를 통과시키는 클래스이다.

AQS  releaseShared(int) 메소드는 내부적으로 tryAcquiredShared(int) 메소드를 호출하게 된다.  acquireSharedInterruptibly(int) 메소드는 내부적으로 tryAcquireShared(int)메소드를 호출한다. 따라서 AQS  기본으로 제공해주는 메소드를 사용하면 쉽게 동기화 클래스를 구현할  있다.

 

OneShotLatch  AQS 의 핵심 기능을 위임 delegate 하는 형식으로 구현되었다. AQS 를 상속하는 방법은 권장하지 않는다.

 

 

14.6 java.util.concurent 패키지의 동기화 클래스에서 AQS 활용 모습

java.util.concurrent 패키지에 있는 ReentrantLock, Semaphore, ReentrantReadWriteLock, CountDownLatch, SynchronousQueue, FutureTask 의 클래스는 AQS를 기반으로 구현돼어 있다.

 

 

14.6.1 ReentrantLock

 

protected boolean tryAcquire(int ignored){

       final Thread current = Thread.currentThread();

       int c = getState();

       if(c == 0){

             if(compareAndSetState(0,1)){

                    owner = current;

                    return ture;

             }

       }else if(current == owner){

             setState(c+1);

             return true;

       }

       return false;

}

 

ReentrantLock 은 배타적인 확보 연산만 제공하기 때문에 tryAcquire, tryRelease, isHeldExclusively 와 같은 메소드만 구현하고 있다. 또 동기화 상태 값을 확보된 락의 개수를 확인하는 데 사용한다.(재진입에 따른 락의 개수) owner 변수를 통해 락을 확보한 스레드를 관리한다. 락의 재진입 여부도 owner 변수를 통해 확인한다.

compareAndState(int, int) 기본의 설정 값과 비교, 상태 값 설정의 2개의 연산을 단일 연산으로 처리한다.

 

 

14.6.2 Semaphore CountDownLatch

 

protected int tryAcquireShared(int acquires){

       while(true){

             int available = getState();

             int remaining = available - acquire;

             if(remaining < 0

                           || compareAndSetState( available, remaining))

             return remaining;

       }

}

 

protected boolean tryReleaseShared(int release){

       while(true){

             int p = getState();

             if(compareAndSetState(p, p + release))

                    return true;

       }

}

Semaphore  AQS의 동기화 상태를 사용해 현재 남아 있는 퍼밋의 개수를 관리한다.

메소드 내부의 while 반복문은 충분한 개수의 퍼밋이 없거나 tryAcquireShared 메소드가 확보 연산의 결과로 퍼밋 개수를 단일 연산으로 변경할 수 있을 때까지 반복한다.

 

CoundDownLatch 클래스도 동기화 상태 값을 현재 개수로 사용하는, Semaphore와 비슷한 형태로 AQS를 활용한다.

 

 

14.6.3 FutureTask

FutureTask 는 내부의 작업이 완료되거나 취소되는 이벤트가 발생하면 해당 스레드가 계속 진행할 수 있고, 아니면 원하는 이벤트가 발생할 때까지 스레드가 대기 상태에 들어간다.

 

 

14.6.4 ReentrantReadWriteLock

ReentrantReadWriteLock 클래스는 AQS 클래스 하나로 읽기 작업과 쓰기 작업을 모두 담당한다.

상태변수의 32개 비트 가운데 16비트로는 쓰기 락에 대한 개수를 관리하고, 나머지 16비트로는 읽기 락의 개수를 관리한다. 읽기 락에 대한 기능은 독점적이지 않은 확보와 해제 연산으로 구현돼 있고, 쓰기 락에 대한 기능은 독점적인 확보와 해제 연산을 사용한다.