IT_Programming/Java

[펌] 진보된 쓰레드 풀링 기법 구현

JJun ™ 2010. 11. 30. 17:13

-------------------------------------------------------------------------------------------------

                                                                      출처: http://javacan.tistory.com/12

-------------------------------------------------------------------------------------------------

 

상황에 따라 쓰레드의 개수를 동적으로 증감시키는 쓰레드 풀링을 구현해본다.

쓰레드 풀링의 필요성

자바의 특징 중의 하나를 꼽으라면 언어 차원에서 제공하는 멀티 쓰레드 기능이다.

쓰레드는 프로세스에 비해 다음과 같은 특징을 갖고 있다.

  • 한 개의 쓰레드를 생성하는 데 드는 비용(시간+자원)은 한 개의 프로세스를 생성하는 데 드는 비용보다 적다.
  • 멀티 프로세스를 스케쥴링하는 것 보다 멀티 쓰레드를 스케쥴링 하는 것이 더 간단하다.
  • 쓰레드는 하나의 프로세스안에서 수행되므로 처음부터 공유 메모리를 갖지만, 멀티 프로세스 사이에 메모리를 공유하기 위해서는 공유 메모리를 별도로 생성해야 한다.

프로세스에 비해 쓰레드가 갖는 이러한 장점들 때문에 쓰레드의 사용은 점차 일반화되어 가고 있으며,

자바에서 쓰레드의 사용은 필수적인 요소가 되어 가고 있다. (실제로 앞으로 나올 아파치 웹서버 2.0은

프로세스 단위가 아닌 쓰레드 단위로 사용자의 요청을 처리하고 있다.)

 

비록 쓰레드가 프로세스에 비해 자원의 사용량이나 성능 면에서 뛰어난 것이 사실이지만, 쓰레드를 사용하는데는 일정량의 시간과 자원이 소비되며 따라서 사용할 수 있는 쓰레드의 개수에는 제한이 있기 마련이다.

또한, 동시에 수행되는 쓰레드의 개수가 많아질수록 쓰레드 스케쥴링하는 데 소비되는 시간은 많이지게 되며, 이는 결국 어플리케이션의 성능을 저하시키는 요인이 되기도 한다.

이러한 문제점을 해결하기 위해 사용되는 것이 바로 쓰레드 풀링이다.

일반적으로 쓰레드 풀링은 다음과 같은 특징을 갖는다.

  • 미리 생성되어 있는 쓰레드를 재사용함으로써 쓰레드를 생성하는 데 소비되는 시간을 줄인다.
  • 동시에 생성될 수 있는 쓰레드의 개수를 제한함으로써 시스템의 자원 소비량을 제한한다.

여기서 두번째 특징은 어플리케이션의 전체 성능을 일정 수준으로 유지하는 데 중요한 역할을 한다.

예를 들어, 웹 서버를 구현한다고 하자. 이 때 클라이언트가 문서를 요청할 때 마다 하나의 쓰레드를

생성하여 그 요청을 처리하도록 구현할 수 있다. 이 경우 동시에 100명이 요청을 하게 되면 100개의

쓰레드를 동시에 생성하게 된다. 만약 한 개의 쓰레드를 생성하는 데 소비되는 메모리가 200K 라면,

한 순간에 20000K (약 20M)의 메모리가 소모되는 것이다. 메모리 뿐만 아니라 100개의 쓰레드를

스케쥴링 하기 위해서 많인 시간을 소비하게 된다. 만약 동시 접속자수가 1000명인 사이트라면

한 순간에 200M의 메모리를 사용하게 된다.

 

문제는 생성된 쓰레드가 클라이언트의 요청을 처리하면 끝나고, 이후에 1000명이 서비스를 요청하면 또 다시 1000개의 쓰레드를 생성한다는 점이다. 즉, 또 다시 200M의 메모리가 필요하게 되는 것이다. 어플리케이션의 전체적인 성능이 저하될 것은 불보듯 뻔하다. 심지어 시스템이 다운되는 경우도 발생한다.

쓰레드 풀링을 사용하게 되면, 이러한 문제가 어느 정도 해결된다. 만약 동시에 수행될 수 있는 쓰레드의 개수를 100개로 제한했다고 해 보자. 이 경우 1000개의 클라이언트가 서비스를 요청한다고 해도 100개의 쓰레드만이 동시에 수행되며, 따라서 메모리 사용량은 20M 정도로 유지된다. 물론, 1000 명의 클라이언트를 동시에

처리하지 않기 때문에 평균적인 응답 시간은 길어지겠지만 어플리케이션의 안정성은 매우 크게 향상된다.

 

또한 동시 사용자 수가 많지 않다면, 이미 생성되어 있는 쓰레드를 사용하기 때문에 평균적인 응답 시간은

빨라지게 된다.



쓰레드 풀의 구현

앞 부분을 통해서 쓰레드 풀링이 왜 필요한지 알게 되었을 것이다. 이제부터는 쓰레드 풀링의 구현에 대해서 알아보자. 일반적으로 책이나 인터넷 상에 공개된 쓰레드 풀링 클래스는 필요에 따라 동시에 수행되는

쓰레드의 개수를 증가시키는 경우는 있어도, 필요한 것에 비해 많은 쓰레드가 수행될 경우 쓰레드의 개수를

감소시키는 경우는 거의 없는 것 같다. 그래서 이 글에서는 간단하게 쓰레드의 개수를 상황에 따라 감소 시키는 쓰레드 풀링을 구현할 것이다. 이 글에서 구현할 쓰레드 풀링은 WorkQueue 클래스, ThreadPool 클래스와 PooledThread 클래스 그리고 AleadyClosedException 클래스로 구성되어 있다. 여기서 PooledThread 클래스는 ThreadPool 클래스의 이너(inner) 클래스로서 실제 수행되는 쓰레드에 해당한다.



WorkQueue 클래스

WorkQueue 클래스는 쓰레드 풀 속에 있는 쓰레드들이 수행할 작업을 저장하고 있는 큐이다.

클래스의 이름에서도 알 수 있듯이 이 클래스는 Queue의 역할을 한다. enqueue() 메소드를 사용하여

수행해야 할 작업을 큐에 저장하며, dequeue() 메소드를 사용하여 수행할 작업을 큐로부터 읽어올 수 있다.

또한, close() 메소드를 호출함으로써 WorkQueue의 사용을 끝내도록 하고 있다.

 

WorkQueue 클래스의 소스 코드는 다음과 같다.

소스 코드는 자체는 매우 간단하므로 특별한 설명은 하지 않겠다.

package javacan.thread.pool;

import java.util.LinkedList;

/**
 * 쓰레드가 수행할 작업을 저장하는 큐.
 *
 * @author 최범균, madvirus@tpage.com
 */
public class WorkQueue {
   
   /**
    * 쓰레드가 수행할 작업을 저장한다.
    */
   private LinkedList workList = new LinkedList();
   
   /**
    * WorkQueue의 사용이 끝났는지의 여부를 나타낸다.
    */
   private boolean closed = false;
   
   /**
    * 큐에 새로운 작업을 삽입한다.
    */
   public synchronized void enqueue(Runnable work) 
          throws AleadyClosedException {
      if (closed) {
         throw new AleadyClosedException();
      }
      workList.addLast(work);
      notify();
   }
   
   /** 
    * 큐에 저장된 작업을 읽어온다.
    */
   public synchronized Runnable dequeue() 
          throws AleadyClosedException, InterruptedException {
      while( workList.size() <= 0 ) {
         wait();
         if ( closed ) {
            throw new AleadyClosedException();
         }
      }
      return (Runnable)workList.removeFirst();
   }
   
   public synchronized int size() {
      return workList.size();
   }
   
   public synchronized boolean isEmpty() {
      return workList.size() == 0;
   }
   
   public synchronized void close() {
      closed = true;
      notifyAll();
   }
}


위 코드에서 큐를 구현하기 위해 java.util.LinkedList 클래스를 사용한 이유는 Vector에서 원소를 삭제할 때 발생하는 성능 문제가 LinkedList 클래스에서는 발생하지 않기 때문이다. Vector의 성능 문제에 대한 문제는 JavaCan의 또 다른 기사인 "자바 어플리케이션 성능 향상"을 참조하기 바란다.

 


ThreadPool 클래스

이제 WorkQueue 클래스를 이용하여 쓰레드 풀링을 구현한 javacan.thread.pool.ThreadPool 클래스에서

대해서 살펴보자. 먼저 ThreadPool 클래스의 소스 코드부터 살펴보자.

package javacan.thread.pool;

/**
 * 실제로 사용되는 쓰레드 풀 클래스.
 * 내부적으로 WorkQueue를 이용하여 쓰레드가 수행해야 할 작업을 저장한다.
 *
 * @author 최범균, era13@hanmail.net
 */
public class ThreadPool extends ThreadGroup {
   public static final int DEAFULT_MAX_THREAD_COUNT = 30;
   public static final int DEAFULT_MIN_THREAD_COUNT = 0;
   public static final int DEFAULT_INITIAL_THREAD_COUNT = 10;
   
   /**
    * 허용되는 idel 쓰레드의 개수
    */
   public static final int DEFAULT_ALLOWED_IDLE_COUNT = 5;
   
   /**
    * 수행할 작업을 저장한다.
    */
   private WorkQueue pool = new WorkQueue();
   
   /**
    * 최소한 생성되어 있어야 할 쓰레드의 개수
    */
   private int minThreadCount;
   
   /**
    * 최대로 생성할 수 있는 쓰레드의 개수
    */
   private int maxThreadCount;
   
   /**
    * 현재 생성되어 있는 쓰레드의 개수
    */
   private int createdThreadCount = 0;
   
   /**
    * 현재 실제 작업을 수행하고 있는 쓰레드의 개수
    */
   private int workThreadCount = 0;
   
   /**
    * 현재 작업을 수행하고 있지 않은 쓰레드의 개수
    * idleThreadCount = createdThreadCount - workThreadCount
    */
   private int idleThreadCount = 0;
   
   /**
    * 풀에서 허용되는 idle 쓰레드의 개수
    */
   private int allowedIdleCount = 0;
   
   /**
    * 쓰레드 풀이 닫혔있는지의 여부
    */
   private boolean closed = false;
   
   private static int groupId = 0;
   private static int threadId = 0;
   
   /**
    * ThreadPool을 생성한다.
    * @param initThreadCount 초기에 생성할 쓰레드 개수
    * @param maxThreadCount 생성할 수 있는 최대 쓰레드 개수
    * @param minThreadCount 최소한 생성되어 있어야 할 쓰레드의 개수
    * @param allowedIdleCount 풀에서 허용되는 Idle 쓰레드의 개수
    */
   public ThreadPool(int initThreadCount, int maxThreadCount,
                     int minThreadCount, int allowedIdleCount) {
      super(ThreadPool.class.getName()+Integer.toString(groupId++) );
      
      if (minThreadCount < 0) minThreadCount = 0; // 최소 쓰레드 개수 검사
      if (initThreadCount < minThreadCount)
         initThreadCount = minThreadCount; // 초기 쓰레드 개수 검사
      if (maxThreadCount < minThreadCount 
         || maxThreadCount < initThreadCount)
            maxThreadCount = Integer.MAX_VALUE; // 최대 쓰레드 개수 검사
      
      if (allowedIdleCount < 0) allowedIdleCount = DEFAULT_ALLOWED_IDLE_COUNT;
      
      this.minThreadCount = minThreadCount;
      this.maxThreadCount = maxThreadCount;
      this.createdThreadCount = initThreadCount;
      this.idleThreadCount = initThreadCount;
      this.allowedIdleCount = allowedIdleCount;
      
      for (int i = 0 ; i < this.createdThreadCount ; i++ ) {
         new PooledThread().start();
      }
   }
   
   public ThreadPool(int initThreadCount, int maxThreadCount, int minThreadCount) {
      this(initThreadCount, maxThreadCount, minThreadCount, DEFAULT_ALLOWED_IDLE_COUNT);
   }
   
   /**
    * 큐에 작업할 객체를 삽입한다.
    *
    * @work 쓰레드가 수행할 작업
    */
   public synchronized void execute(Runnable work) throws AleadyClosedException {
      if (closed) throw new AleadyClosedException();
      
      // 현재 상태 파악 후, 필요하다면 쓰레드 개수를 증가시킨다.
      increasePooledThread();
      pool.enqueue( work );
   }
   
   /**
    * 쓰레드 풀을 종료한다.
    */
   public synchronized void close() throws AleadyClosedException {
      if (closed) throw new AleadyClosedException();
      closed = true;
      pool.close();
   }
   
   /**
    * 필요하다면 PooledThread의 개수를 증가한다.
    */
   private void increasePooledThread() {
      synchronized(pool) {
         // 수행해야 할 작업의 개수가 놀고 있는 쓰레드 개수보다 많다면,
         // 그 차이만큼 쓰레드를 생성한다.
         if (idleThreadCount == 0 && createdThreadCount < maxThreadCount) {
            new PooledThread().start();
            createdThreadCount ++;
            idleThreadCount ++;
         }
      }
   }
   
   private void beginRun() {
      synchronized(pool) {
         workThreadCount ++;
         idleThreadCount --;
      }
   }
   
   /**
    * 쓰레드를 종료할 지의 여부를 나타낸다.
    * @return 쓰레드가 계속 수행해야 하는 경우 false를 리턴,
    *         쓰레드를 종료하고자 할 경우 true를 리턴.
    */
   private boolean terminate() {
      synchronized(pool) {
         workThreadCount --;
         idleThreadCount ++;
         
         if (idleThreadCount > allowedIdleCount && createdThreadCount > minThreadCount) {
            // idle 쓰레드의 개수가 10개를 넘기고, 
            // 현재 생성되어 있는 쓰레드의 개수가 minThreadCount 보다 큰 경우
            createdThreadCount --;
            idleThreadCount --;
            
            return true;
         }
         return false;
      }
   }
   
   /**
    * 큐로부터 작업(Runnable 인스턴스)을 읽어와 run() 메소드를 수행하는 쓰레드
    */
   private class PooledThread extends Thread {
      
      public PooledThread() {
         super(ThreadPool.this, "PooledThread #"+threadId++);
      }
      
      public void run() {
         try {
            while( !closed ) {
               Runnable work = pool.dequeue();
               
               beginRun();
               work.run();
               if (terminate() ) {
                  break; // ← idle 쓰레드의 개수가 많을 경우 쓰레드 종료
               }
            }
         } catch(AleadyClosedException ex) {            
         } catch(InterruptedException ex) {            
         }
      }
   } // end of PooledThread
   
   public void printStatus() {
      synchronized(pool) {
         System.out.println("Total Thread="+createdThreadCount);
         System.out.println("Idle  Thread="+idleThreadCount);
         System.out.println("Work  Thread="+workThreadCount);
      }
   }
}


먼저 WorkQueue 클래스의 인스턴스를 나타내는 pool 필드가 정의된 것을 알 수 있다.

또한, 필드로는 int 타입인 maxThreadCount, minThreadCount, createdThreadCount, workThreadCount, idleThreadCount, allowedIdleCount가 있다. 이 필드들은 각각 차례대로 최대로 생성될 수 있는 쓰레드의

개수, 최소한 생성되어 있어야 할 쓰레드의 개수, 현재 생성되어 있는 쓰레드의 개수, 현재 작업을 수행하고 있는 쓰레드의 개수, 현재 작업을 수행하고 있지 않은 쓰레드의 개수, 허용되는 쉬는 쓰레드의 개수이다.

 

ThreadPool 클래스의 생성자는 이 값들을 초기화한 후, 생성자에서 지정한 개수만큼의 PooledThread

클래스를 생성한 후, PooledThread의 start() 메소드를 호출한다.

PooledThread 클래스는 풀 속에 저장된 쓰레드로서 사용자가 ThreadPool.execute(Runnable) 메소드를

통해서 큐에 저장한 인스턴스를 읽어와 그 인스턴스의 run() 메소드를 호출한다. PooledThread 클래스의

run() 메소드에서 이러한 작업이 이루어진다. run() 메소드를 보면 beginRun()과 terminate() 메소드를

호출하는 것을 알 수 있는 데, 이 두 메소드는 idleThreadCount와 workThreadCount의 값을 알맞게 조절한다. 또한, terminate() 메소드는 쓰레드를 계속해서 수행할지의 여부를 결정한다. 만약 이 메소드가 true를

리턴하게 되면, 해당 쓰레드는 더 이상 작업을 수행하지 않고 종료하게 된다.

ThreadPool 클래스의 execute() 메소드를 살펴보면 pool.enqueue() 메소드를 사용하여

Runnable 인스턴스를 WorkQueue에 삽입하기 전에 increasePooledThread() 메소드를 호출하여,

필요할 경우 또 다른 PooledThread를 생성하여 동시에 수행되는 쓰레드의 개수를 증가시킨다.

 

쓰레드 풀링의 구현이 약간 이해가 안 될지도 모르지만, PooledThread가 쓰레드라는 점과 PooleThread의 개수를 조절하는 것이 ThreadPool 클래스라는 점을 유념하면서 소스 코드를 분석해보면 그리 어렵지 않게

이해할 수 있을 것이다.


실제로 ThreadPool 클래스의 사용은 다음과 같이 매우 간단하다.

ThreadPool pool = new ThreadPool(5, 40, 0, 5);
pool.execute(  ...  );
pool.execute(  ...  );
.....
pool.close();

 

 


AleadyClosedException 클래스

AleadyClosedException 클래스는 WorkQueue 클래스와 ThreadPool 클래스가 닫힌 상태에 있을 때 enqueue()나 dequeue() 또는 execute() 메소드 등을 호출할 때 발생하는 예외 클래스이다.

이 클래스의 소스 코드는 다음과 같다.

package javacan.thread.pool;

/**
 * WorkQueue 클래스의 enqueue(Runnable work) 메소드와 dequeue() 메소드를 호출할 때,
 * 이미 WorkQueue가 닫힌 상태일 경우 발생한다.
 *
 * 또한, ThreadPool 클래스의 
 *
 */
public class AleadyClosedException extends Exception {

   public AleadyClosedException(String msg) {
      super(msg);
   }
   
   public AleadyClosedException() {
      super();
   }
}

 

 


ThreadPool 클래스의 사용

실제로 ThreadPool 클래스를 사용하는 간단한 예제 어플리케이션인 TestPool 클래스를 살펴보자.

TestPool 클래스의 소스 코드는 다음과 같다.

import javacan.thread.pool.*;

public class TestPool {
   static int count = 0;
   static long sleepTime = 0;
   static ThreadPool pool = null;
   
   public static void main(String[] args) {
      pool = new ThreadPool(Integer.parseInt(args[0]),  // 초기 생성
                    Integer.parseInt(args[1]),  // max
                    Integer.parseInt(args[2]),  // min
                    Integer.parseInt(args[3]) ); // 허용되는 idle 개수
      sleepTime = Long.parseLong(args[4]);
      
      try {
         for (int i = 0 ; i < 15 ; i ++ ) {
            pool.execute(new Runnable() {
               public void run() {
                  pool.printStatus();
                  int local = count++;
                  
                  try {
                     Thread.currentThread().sleep( sleepTime );
                     System.out.println("Test "+local);
                  } catch(Exception ex) {
                     ex.printStackTrace();
                  }
               }
            } );
            try {
               Thread.currentThread().sleep(10);
            } catch(Exception ex) {}
         }
         try {
            Thread.currentThread().sleep(10000);
         } catch(Exception ex) {}
         
         pool.close();
      } catch(AleadyClosedException ex) {
         ex.printStackTrace();
      }
   }
}


이 클래스를 수행해보면 ThreadPool이 생성한 전체 쓰레드의 개수, 쉬는 쓰레드의 개수 등이 어떻게

변경되는 지 관찰할 수 있을 것이다.

 


결론

ThreadPool 클래스는 쓰레드 풀링을 제공함으로써 전체적인 어플리케이션의 성능을 향상시켜줄 뿐만 아니라 상황에 따라 알맞게 풀 속에서 수행되는 쓰레드의 개수를 증감시킴으로써 효율적으로 자원을 사용할 수

있도록 해준다. 여러분이 작성할 어플리케이션에서 알맞게 ThreadPool 클래스를 수정한다면 좀 더 좋은 기능을 제공해주는 쓰레드 풀링을 사용할 수 있을 것이다.


관련링크: