IT_Programming/Dev Libs & Framework

Akka를 이용한 Concurrent 프로그래밍 시작하기

JJun ™ 2011. 4. 3. 13:51

-----------------------------------------------------------------------------------------------

출처: http://javacan.tistory.com/entry/akka-1-start

-----------------------------------------------------------------------------------------------

 

Akka 첫 번째, Akka를 이용한 Concurrent 프로그래밍 시작하기

 

개인적으로 관심을 가지고 지켜보던 Akka 프로젝트가 1.0 버전이 되었다. 평소에 병행 처리와 분산 처리에 관심이 많았는데, Akka는 이를 보다 쉽게 구현할 수 있도록 도와주는 프로젝트이다. 본 글에서는 Akka가 무엇인지 간단하게 설명하고 실제 Akka를 이용해서 액터를 생성하고 실행하는 방법을 살펴볼 것이다.


Akka란?

Akka는 병행(concurrent) 및 분산 처리를 위한 오픈 소스 프로젝트로서 액터(Actor) 모델을 이용하고 있다. 필자가 액터 모델 자체에 대한 이해가 완전하지 않지만, 액터 모델을 간단하게 설명하면 다음의 특징을 갖는다.

  • 액터들은 상태를 공유하지 않는다.
  • 액터들 간의 통신은 메시지 전달을 통해서 이루어진다. (이벤트 기반 모델)
  • 액터간의 통신은 비동기로 이루어진다.
  • 각 액터는 전달받은 메시지를 큐에 보관하며, 메시지를 순차적으로 처리한다.
  • 액터는 일종의 경량 프로세서다.
위와 같은 특징은 병행 처리 코드를 보다 쉽게 구현할 수 있도록 도와준다. 실제로 다중 쓰레드 프로그래밍을 해 본 개발자 중에서는 올바르지 못한 동기화 처리로 쓰레드 블럭킹 되는 등의 문제로 고생한 경험이 한 두 번씩은 존재할 것이다. 액터는 애초에 데이터를 서로 공유하지 않는 것을 원칙으로 하기 때문에, 데드락이나 락에 대한 고민을 줄여주고 병행 처리 그 자체에 집중할 수 있도록 도와준다.

Akka는 액터 모델을 통해서 병행 처리를 쉽게 할 수 있도록 도와줄 뿐만 아니라, 리모트 노드에 존재하는 액터를 마치 로컬에 존재하는 액터처럼 사용할 수 있도록 해 주고 있다. 개발자는 통신 프로토콜에 대해 고민할 필요 없이 리모트 액터를 사용할 수 있기 때문에 분산 처리 코드를 손쉽게 작성할 수 있게 된다.

Akka는 스카라(Scala)와 자바(Java)의 두 언어에 대한 API를 제공하고 있는데, 필자가 스카라 언어 자체에 대해서는 아직 잘 모르고 국내에서도 스카라 언어에 대한 관심이 적은 관계로 자바 API를 기준으로 Akka 사용법을 살펴볼 것이다.

Akka 코어 사용

Akka를 사용하려면 http://akka.io 사이트에서 필요한 파일을 다운로드 받으면 되는데, Maven을 사용하고 있다면 pom.xml 파일에 다음과 같이 리포지토리 설정과 의존 설정을 추가해주면 Akka가 제공하는 액터 기능을 사용할 수 있다.

<repositories>
    <repository>
        <id>Akka</id>
        <name>Akka Maven2 Repository</name>
        <url>http://akka.io/repository/</url>
    </repository>

    <repository>
        <id>Multiverse</id>
        <name>Multiverse Maven2 Repository</name>
        <url>http://multiverse.googlecode.com/svn/maven-repository/releases/</url>
    </repository>

    <repository>
        <id>GuiceyFruit</id>
        <name>GuiceyFruit Maven2 Repository</name>
        <url>http://guiceyfruit.googlecode.com/svn/repo/releases/</url>
    </repository>

    <repository>
        <id>JBoss</id>
        <name>JBoss Maven2 Repository</name>
        <url>http://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>se.scalablesolutions.akka</groupId>
        <artifactId>akka-actor</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>

[주의]
Akka는 Scala로 만들어졌기 때문에, 이클립스에서 Akka 모듈을 사용할 때 코드 어시스트가 제대로 동작하려면 Scala-IDE 등을 이용해서 개발환경을 구축해 주어야 한다. 그렇지 않으면, 코드 어시스트가 안 되서 짜증나는 시간을 보내게 될 것이다.

액터 클래스 및 사용

액터를 생성하고 사용하려면, 먼저 Akka가 제공하는 기반 클래스를 이용해서 액터 역할을 수행할 클래스를 구현해 주어야 한다. Akka는 akka.actor.UntypedActor 클래스를 제공하고 있으며, 이 클래스를 상속받아서 액터를 구현할 수 있다.

import akka.actor.UntypedActor;

public class PrintActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        Thread.sleep(500); // 테스트를 위해 0.5초 sleep
        System.out.println(message);
    }

}

UntypedActor를 상속받은 클래스는 onReceive() 메서드를 구현해 주어야 하는데, onReceive() 메서드는 액터에 전달된 메시지를 처리하게 된다.

액터 클래스를 만들었다면, 액터를 생성한 뒤에 액터에 메시지를 전달할 수 있다. 액터를 생성할 때에는 akka.actor.Actors 클래스의 static 메서드인 actorOf() 메서드를 사용한다. Actors.actorOf() 메서드는 액터를 구현한 클래스의 Class를 전달받으며, 액터를 생성한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
// actor를 이용해서 액터에 메시지를 전달

ActorRef의 start() 메서드는 액터를 시작하며, 액터가 시작된 이후부터 액터에 메시지를 전달할 수 있게 된다.

액터에 메시지 전달하기

Actors.actorOf()를 이용해서 액터를 생성했다면, 이후 ActorRef가 제공하는 메서드를 이용해서 액터에 메시지를 전달할 수 있다. 다음의 세 가지 방법으로 액터에 메시지를 전달할 수 있다.
  • Fire-And-Forget: 메시지를 전달하고 메시지에 대한 응답을 기다리지 않는다. 병행 및 확장에 적합한 메시지 전달 방식이다.
  • Send-And-Receive-Eventually: 메시지를 전달하고 응답을 받는다. 응답을 받을 때 까지 블록킹된다.
  • Send-And-Receive-Future: 메시지를 전달하고 응답을 받기 위한 Future를 리턴한다.
sendOneWay() 메서드를 이용한 Fire-And-Forget 방식 메시지 전달

ActorRef.sendOneWay() 메서드는 메시지를 액터에 전달할 때 사용된다. sendOneWay()라는 이름에서 알 수 있듯이 이 메서드는 액터로부터 어떤 값도 받지 않으며, 액터로부터 응답을 기다리지 않고 곧 바로 리턴한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
actor.sendOneWay("받아라"); // actor에 "받아라" 메시지를 전달하고 바로 리턴.
actor.sendOneWay("받아라2");
actor.sendOneWay("받아라3");
System.out.println("비동기로 실행");

메시지를 받은 액터는 내부적으로 사용하는 큐에 메시지를 보관한 뒤, 차례대로 액터의 onReceive(Object message) 메서드에 메시지를 전달한다. PrintActor의 onReceive() 메서드는 0.5초후에 전달받은 메시지를 출력하고 sendOneWay()메서드는 응답 대기 없이 바로 리턴하므로, 위 코드가 실행되면 콘솔에는 다음과 같은 순서로 문자열이 출력된다.

비동기로 실행
받아라
받아라2
받아라3

sendOneWay() 메서드는 다음의 두 가지를 제공된다.
  • sendOneWay(Object message)
  • sendOneWay(Object message, ActorRef sender) : 메시지를 전송하면서 메시지를 보낸 액터로 sender를 지정한다.

sendRequestReply() 메서드를 이용한 Send-And-Receive-Eventually 방식 메시지 전달

ActorRef.sendRequestReply() 메서드는 액터에 메시지를 전달하고, 그 메시지에 대한 응답이 올 때 까지 대기하고 싶을 때 사용된다. 액터 구현 클래스는 getContext().replyUnsafe() 메서드를 이용해서 메시지에 대해 응답할 수 있는데, ActorRef.sendRequestReply() 메서드는 이 응답을 리턴하게 된다. 예를 들어, 다음과 같이 메시지에 대해 응답하는 액터가 있다고 하자.

public class PingActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        getContext().replyUnsafe("응답: "+ message); // 메시지 sender에 응답
    }

}

이 경우 다음과 같이 sendRequestReply() 메서드를 이용함으로써 액터에 전달한 메시지에 대한 응답이 도착할 때 까지 대기할 수 있다.

ActorRef actor = Actors.actorOf(PingActor.class);
actor.start();
Object res = actor.sendRequestReply("헬로우"); // 액터로부터 응답이 도착할 때 까지 대기

sendRequestReply() 메서드는 일정 시간 동안 액터로부터 응답이 없을 경우 akka.actor.ActorTimeoutException 예외를 발생시킨다. 별도 설정을 하지 않은 경우 기본 타입 시간은 5초이며, sendRequestReply() 메서드를 호출할 때 타임아웃을 지정할 수도 있다.

Object res = actor.sendRequestReply("헬로우", 1000, null); // 1초간 응답 대기

sendRequestReply() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReply(Object message)
  • sendRequestReply(Object message, ActorRef sender): 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReply(Object message, long timeout, ActorRef sender)

sendRequestReplyFuture() 메서드를 이용한 Send-And-Receive-Future 방식 메시지 전달

sendRequestReplyFuture() 메서드는 메시지를 전달한 뒤 응답을 받기 위한 Future를 리턴한다. Future는 자바가 제공하는 Future가 아닌 Akka가 제공하는 akka.dispatch.Future 타입이다. Future는 주로 다음과 같은 형식으로 주로 사용된다.

Future future = actor.sendRequestReplyFuture("하이");
future.await(); // 응답을 대기. 대기 시간을 초과하면 예외 발생
if (future.isCompleted()) { // 완료되었다면
    Option resultOption = future.result(); // 응답 구함
    if (resultOption.isDefined()) { // 응답 데이터가 있다면,
        Object result = resultOption.get(); // 응답 데이터 구함
        System.out.println(result);
    }
}

sendRequestReplyFuture()가 리턴한 Future의 await() 메서드는 시간이 초과될 때 까지 대기한다. 시간이 초과되기 전에 응답이 도착하면 다음으로 넘어가고, 시간이 초과되면 ActorTimeoutException 예외를 발생시킨다.

sendRequestReplyFuture() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReplyFuture(Object message)
  • sendRequestReplyFuture(Object message, ActorRef sender) : 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReplyFuture(Object message, long timeout, ActorRef sender)
두 개 이상의 액터에 메시지를 전달한 후 액터로부터의 응답이 모두 도착할 때 까지 대기해야 한다면, Futures.awaitAll(Future[] futures) 메서드를 사용하면 된다.

// actor들에 메시지 전달(작업 전달)
for (ActorRef actor : actors) {
    futureList.add(actor.sendRequestReplyFuture(someWork);
}
Future[] futures =  futureList.toArray();
Future.awaitAll(futures); // 모든 액터로부터 응답이 (작업 결과가) 올 때 까지 대기.
// futures로부터 응답 구해서 처리


액터에서 메시지 받아 처리하기

ActorRef의 send*() 메서드를 통해서 전달된 메시지는 UntypedActor 클랫를 상속받은 액터 구현 클래스의 onReceive(Object message)에 차례대로 전달된다. (TypedActor를 사용하면 인터페이스를 이용해서 메시지를 전달받을 메서드를 정의할 수도 있는데, 이에 대한 내용은 다음에 살펴볼 것이다.)

onReceive() 메서드는 다음과 같이 메시지의 타입을 확인한 뒤 메시지 타입에 맞는 동작을 수행하도록 구현하는 것이 보통이다.

public class ActorImpl extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            // 메시지 처리
        }
        ...
    }
}


메시지에 응답하기

replyUnsafe()/replySafe()를 이용한 응답

액터 구현 클래스는 getContext()를 이용해서 해당 액터에 대한 ActorRef를 구할 수 있는데, ActorRef의 replyUnsafe() 또는 replySafe() 메서드를 이용해서 메시지에 대한 응답을 전달할 수 있다. replyUnsafe() 메서드는 응답 실패시 예외를 발생시키는 반면에 replySafe() 메서드는 응답에 실패할 경우 false를 리턴한다.

public class PingActor extends UntypedActor {

    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (! getContext.replySafe("pong")) {
                // 실패에 대한 처리
            }
        }
    }
}

액터에 메시지를 전달할 때 sendRequestReply() 메서드나 sendRequestReplyFuture() 메서드를 사용한 경우, replyUnsafe()와 replySafe()를 이용해서 응답한 데이터를 리턴 값으로 받게 된다.
 
메시지를 전달한 액터에 메시지로 응답하기

액터가 다른 액터에게 메시지를 전달하기도 한다. 이때 sendRequestReply*() 메서드에 대한 응답이 아니라 메시지를 전달한 액터에 메시지를 전달하는 방법으로 응답할 수도 있을 것이다. 이렇게 메시지를 전달한 액터에 응답으로 메시지를 전송하고 싶다면, getContext().getSender() 메서드를 이용해서 메시지를 보낸 액터에 대한 ActorRef를 구한 뒤, 그 ActorRef의 send*() 메서드를 이용해서 응답을 전달하면 된다.

public class PongActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (getContext().getSender().isDefined()) {
                ActorRef sender = getContext().getSender().get();
                sender.sendOneWay("pong", getContext());
            } else {
                getContext().replyUnsafe("pong");
            }
        }
    }
}


액터의 라이프사이클

액터의 라이프 사이클은 다음과 같다.
  • NEW: 액터가 만들어졌을 때. 메시지를 수신하지 못한다.
  • STARTED: start()가 호출되었을 때. 메시지를 수신할 수 있다.
  • SHUTDOWN: exit()나 stop()이 호출되었을 때. 어떤 것도 하지 못한다.
ActorRef는 start(), stop() 메서드를 제공하고 있으며, 이들 메서드를 이용해서 액터를 시작하고, 중지할 수 있다. 아래 코드는 전형적인 액터의 사용방법을 보여주고 있다.

actor.start(); // 액터를 시작
// 필요한 만큼 액터에 메시지 전달
actor.sendOneWay(msg);
...
actor.stop(); // 액터 종료

start() 메서드는 액터와 메시지 큐를 시작하고, stop() 메서드는 액터의 디스패처와 메시지 큐를 포함한 액터를 종료시킨다.

모든 액터를 종료시키고 싶다면, 다음과 같은 코드를 사용하면 된다.

Actors.registry().shtudownAll();


UntypedActor 클래스의 라이프 사이클 관련 콜백 메서드

UntypedActor 클래스는 라이프 사이클과 관련해서 시작/중지 이벤트를 처리할 수 있는 콜백 메서드를 제공하고 있다.
  • preStart(): 액터 시작 전에 호출된다.
  • postStop(): 액터 종료 후에 호출된다.
  • preRestart(Throwable reason): 액터 재시작 전에 호출된다. (무정지 액터 기능과 관련됨)
  • postRestart(Throwable reason): 액터 재시작 후에 호출된다. (무정지 액터 기능과 관련됨)

설정 파일 지정 및 기본 값

Akka는 다음의 세 가지 방법 중 한가지를 이용해서 설정 파일을 찾는다.
  • akka.config 시스템 프로퍼티로 지정한 파일 (java -Dakka.config=... )
  • 클래스패스에 위치한 akka.config 파일
  • AKKA_HOME 환경변수 존재 시, '$AKKA_HOME/config 디렉터리의 설정 파일 사용. (또는 akka.home 시스템 프로퍼티를 AKKA_HOME 환경 변수 대신 사용)
각 설정 정보 및 기본 값은 http://doc.akka.io/configuration 참고하기 바란다.

참고자료

 

 


 

 

Akka 두 번째, 리모트 액터 사용하기

Akka가 관심을 끄는 이유는 사실 액터 모델 자체보다는 리모트 노드에 위치한 액터를 마치 로컬에 위치한 액터처럼 사용할 수 있다는 것이었다. Scala 언어가 자체적으로 액터를 제공하고 있지만, Akka의 액터는 이 액터 모델을 리모트까지 확장했기 때문에, Akka를 사용하면 한 노드에서의 병행 처리 뿐만 아니라 다수 노드에서의 병행 처리까지 쉽게 구현할 수 있다.


리모트 액터를 사용하기 위한 과정

리모트 액터를 사용하려면 다음의 과정을 거치면 된다.
  1. 리모트 서버를 만든다. 리모트 서버는 리모트로 제공될 액터를 관리하며, 클라이언트는 리모트 서버에 연결해서 리모트로 제공되는 액터를 사용하게 된다.
  2. 리모트 서버에 액터 등록하기 (클라이언트에서 액터 등록하기, 서버에서 액터 등록하기)

[주의]
클라이언트와 서버는 모두 액터에서 사용되는 클래스를 갖고 있어야 한다. 이후 버전에서는 클라이언트와 서버간의 코드 제공 기능이 포함될 거라고 한다.


단계1, 리모드 액터를 실행할 리모트 서버 만들기

액터를 외부 노드에 제공하고 싶다면, 먼저 클라이언트와의 연결을 처리할 서버를 생성해 주어야 한다. 서버는 다음의 코드를 이용해서 리모트 액터를 실행할 서버를 생성할 수 있다.

Actors.remote().start("0.0.0.0", 2552); // 모든 호스트에 대해 2552 포트로 들어오는 요청 처리


Actors.remote().start("localhost", 2553); // 로컬 호스트의 2553 포트로 들어오는 요청 처리


Actors.remote().start(); // 설정 파일에 있는 기본 값 사용 (설정 파일 없을 시 기본값은 "localhost", 2552)


start() 메서드에서 호스트 값으로 "localhost"를 지정하면, 로컬호스트로 들어오는 요청에 대해서만 처리할 수 있기 때문에, 실 환경에서는 의미가 없다. 실 환경에서는 "0.0.0.0"이나 "192.168.0.1"과 같이 전체 허용 또는 특정 호스트를 지정해 주는 것이 좋다.

단계2, 리모트 서버에 액터 생성하기

다음의 두 가지 방법을 이용해서 리모트 서버에서 액터를 실행할 수 있다.3
  • 클라이언트에서 생성/관리: 리모트 노드에 있는 액터를 클라이언트에서 관리해야 할 때 사용 (액터 모니터링, 액터 수퍼바이징 등)
  • 서버에서 생성/관리: 클라이언트에 액터 서비스만 제공하고 서버에서 액터에 대한 관리를 할 때 주로 사용한다.

클라이언트에서 원격지 서버에 액터 생성하기

다음의 코드를 사용하면 클라이언트에서 리모트 서버에 액터를 생성하고 관리할 수 있다.

ActorRef actor1 = Actors.remote().actorOf(MyActor.class, "172.20.1.11", 2552);
actor1.start();
actor1.sendOneWay("hello");
actor1.stop();

Actors.remote().actorOf() 메서드는 리모트 서버에 MyActor 타입의 액터를 생성한다. 클라이언트에서 액터를 생성한 경우 로컬 액터를 사용하듯이 start() 메서드를 이용해서 액터를 시작하고 stop() 메서드를 이용해서 액터를 종료할 수 있다.

클라이언트에서 리모트 서버에 액터를 생성할 때 주의할 점은 호스트와 포트가 "localhost"와 2552 이면, 리모트 액터가 아닌 로컬 액터로 생성해서 실행된다는 점이다.

서버에서 액터 생성해서 등록하기

서버에서 액터를 생성해서 클라이언트에 제공할 수도 있다. 서버에서 액터를 등록할 때에는 다음과 같이 register() 메서드를 사용하면 된다.

Actors.remote().start("0.0.0.0", 2552);
// MyActor를 리모트 액터로 등록, 식별값은 "hello-service"
Actors.remote().register("hello-service", Actors.actorOf(MyActor.class));

register() 메서드를 사용하면 액터는 자동으로 시작된다.

클라이언트는 액터의 식별값을 이용해서 리모트 액터에 대한 레퍼런스를 구할 수 있으며, 이 레퍼런스를 이용해서 리모트 액터에 메시지를 전달할 수 있다. actorFor() 메서드를 사용하면 리모트 노드에서 생성한 액터에 접근할 수 있다.

// 192.168.1.11:2553 포트로 실행중인 리모트 서버에 등록된 "hello-service" 액터 접근
ActorRef actor = Actors.remote().actorFor("hello-service", "192.168.1.11", 2553);
actor.sendOneWay("테스트!!!"); // 리모트 액터에 메시지 전달


리모트 액터에서 클라이언트에 응답하기

리모트 액터에서 클라이언트에 응답하는 방법은 앞서 'Akka 첫 번째, Akka를 이용한 Concurrent 프로그래밍 시작하기' 에서 살펴봤던 것과 동일하다.

로컬 액터에서 리모트 액터로 메시지를 전송하면, 리모트 액터는 getContext().getSender()를 이용해서 로컬 액터에 메시지를 전달할 수 있다. (즉, 리모트 액터 입장에서는 로컬 액터가 리모트 액터가 되는 것이다.) 리모트 액터에서 로컬 액터에 메시지를 전송할 때에도 결국 네트워크를 통해서 보내기 때문에, 클라이언트도 리모트 서버를 실행해야 리모트 액터에서 로컬 액터에 메시지를 전송할 수 있게 된다.

---- 클라이언트 코드

// 리모트 액터에서 로컬 액터에 메시지 보낼 때 사용할 서버 실행
Actors.remote().start("192.168.4.4", 2552);

ActorRef localActor = Actors.actorOf(LocalActor.class);
localActor.start();

ActorRef actor = Actors.remote().actorFor("hello-service", "192.168.4.3", 2552);
actor.sendOneWay("테스트!!!", localActor); // 로컬 액터를 sender로 지정

---- 리모트 액터 코드
public class MyActor extends UntypedActor {
   
    @Override
    public void onReceive(Object msg) throws Exception {
        if (getContext().getSender().isDefined()) {
            ActorRef sender = getContext().getSender().get(); // 클라이언트의 LocalActor가 sender
            sender.sendOneWay(msg); // 192.168.4.4:2552 로 메시지 전송
        }
    }
}

클라이언트 코드에서 리모트 서버를 실행하지 않으면, 리모트 액터가 로컬 액터에 메시지를 전달할 수 없게 된다. 즉, 서로 다른 노드에 있는 액터들 간에 메시지를 주고 받기 위해서는 각 노드마다 리모트 서버를 실행시켜 주어야 한다.

비신뢰 모드(UntrustedMode)로 리모트 서버 실행하기

리모트 서버를 비신뢰 모드로 실행하게 되면, 클라이언트에서 액터를 생성할 수 없게 된다. 리모트 서버를 비신뢰 모드로 실행하려면 설정 파일에 다음과 같이 untrusted-mode 값을 on으로 설정해 주면 된다.

akka {
    remote {
        server {
            untrusted-mode = on  # 기본 값은 off
        }
    }
}

비신뢰 모드로 실행하면 클라이언트에서 리모트 액터에 대해 다음의 메서드에 대한 호출이 제한된다.
  • start(), stop(), link(), unlink(), spawnLink() 등

리모트 서버와 클라이언트 종료 처리

아래의 클라이언트 코드를 실행하면 JVM이 종료되지 않고 실행된 채로 남아 있는다. 이유는 Akka가 내부적으로 리모트 서버와의 연결 처리를 위해 사용하는 쓰레드가 죽지 않기 때문이다.

public class Client {

    public static void main(String[] args) {
        ActorRef actor = Actors.remote().actorFor("hello-service", "172.20.4.64", 2553);
        actor.sendOneWay("테스트!!!");
        // JVM 종료되지 않음
    }
}

shutdown() 메서드를 이용해서 리모트 서버와의 연결을 종료시키고 관련된 모든 쓰레드를 함께 종료시켜주므로, 클라이언트 코드에서 리모트 액터에 대한 사용이 끝나면 shutdown() 메서드를 호출해서 JVM을 종료처리할 수 있다.

ActorRef actor = Actors.remote().actorFor("hello-service", "172.20.4.64", 2553);
// actor 사용
Actors.remote().shutdown(); // 프로그램 종료시 반드시 실행해 주어야 함

리모트 서버에서도 마찬가지로, 어플리케이션을 종료처리할 때 shutdown() 메서드를 호출해 주어야 관련된 쓰레드가 모두 정리되어 JVM이 종료하게 된다.

Actors.remote().start("0.0.0.0", 2553);
...
Actors.remote().shutdown(); // 프로그램 종료시 반드시 실행해 주어야 함



 

이벤트 처리

클라이언트와 서버는 액터를 이용해서 클라이언트의 연결/해제 등의 이벤트를 수시할 수 있다. 리모트 기능과 관련된 이벤트를 처리하고 싶다면 다음과 같이 이벤트를 수신할 액터를 Actors.remote().addListener() 메서드를 이용해서 이벤트 리스너로 등록해주면 된다.

ActoreRef listener = Actors.actorOf(ListenerActor.class);
listener.start();

Actors.remote().addListener(listener);

리스너로 사용되는 액터에는 리모트 기능과 관련된 이벤트 객체가 메시지로 전달되며, 액터는 이벤트 타입에 따라서 알맞은 작업을 수행하면 된다.

public class ListenerActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RemoteServerStarted) {
            ...
        }
    }

}

주요 이벤트 클래스는 다음과 같다.
  • 서버측 이벤트
    • RemoteServerStarted
    • RemoteServerShutdown
    • RemoteServerClientConnected
    • RemoteServerClientDisconnected
    • RemoteServerClientClosed
    • RemoteServerWriteFailed
  • 클라이언트측 이벤트
    • RemoteClientConnected
    • RemoteClientDisconnected
    • RemoteClientStarted
    • RemoteClientShutdown
    • RemoteClientError
    • RemoteClientWriteFailed

참고자료

 

 


 

 

Akka 세 번째, Supervisor를 이용한 Fault Tolerance

특정 노드에 갑자기 장애가 발생했다. 이럴 때 가장 먼저 개발자들이 선택하는 장애 대처 방법은? 아마도 관련 프로세스를 재시작하는 방법이 가장 많이 사용될 것이다. 웹서버를 재시작하거나 로그 수신기를 재시작하는 등 뭔가 문제가 발생한 영역의 프로세스나 쓰레드를 재시작함으로써 서비스 다운타임을 줄이는 것이 장애 발생 시 첫 번째로 수행하는 작업이다. 일단, 재시작해서 서비스가 살아나도록 만든 뒤, 그 다음에 원인 분석을 하게 된다.

Akka도 이와 비슷한 방법으로 액터의 장애에 대응할 수 있는 방법을 제공하고 있다. 액터는 자신을 관리하는 Supervisor를 가질 수 있으며, Supervisor는 액터가 다운될 경우 재시작함으로써 다운타임을 최소화하도록 해 준다. 이 방식은 얼랭(erlang)으로부터 빌려온 방식으로서 Akka는 Supervisor를 통해 무정지 서비스를 구현할 수 있도록 하고 있다.

액터의 두 가지 라이프 사이클: permanent, temporary

액터는 다음의 두 가지 라이프 사이클을 가진다.
  • permanent: 메시지 처리 과정에서 예외가 발생해도 액터가 살아 있음.
  • temporary: 메시지 처리 과정에서 예외가 발생하면 액터가 죽음.
액터를 permanent로 설정할 지 temporary로 설정할 지의 여부는 다음과 같이 설정할 수 있다.

import akka.config.Supervision;

public class WorkerActor extends UntypedActor {
    public WorkerActor() {
        getContext().setLifeCycle(Supervision.temporary());
        ...
    }


 

Akka의 액터 예외 대응 방식: Let it Crash

다중 쓰레드를 이용해서 병행 처리 코드를 작성할 경우, 병행 처리 코드에서 예외가 발생했을 때 이를 알 수 있는 방법은 예외 추적 메시지를 확인하는 방법 뿐이다. (또는 try-catch로 모든 예외를 잡아서 알림해 주는 기능을 넣는 방법 뿐이다.) 예외가 발생해서 병행 처리 쓰레드가 종료된 경우 이를 복구하는 방법은 재시작해주는 것 외에 특별한 방법이 없다.

Akka는 액터가 더 이상 정상적으로 동작할 수 없는 상태가 되어 메시지 처리 과정 중 예외를 발생시키면, 해당 액터를 재시작하는 방법으로 장애에 대응한다. 복구할 수 없는 예외 상황 발생시 액터가 뭔가 하지 않고 그냥 죽도록 놔두고 안정된 상태로 초기화하고 재시작하기 때문에, 이 방식을 "Let it Crash"라고 부른다.

수버바이저를 이용한 액터 관리

Akka는 수퍼바이저를 이용해서 액터를 관리한다. 수퍼바이저는 다른 액터를 모니터링하는 액터로서, 수퍼바이저 액터에서 다른 액터를 연결(link)함으로써 수퍼바이저가 다른 액터를 관리하게 된다.

수퍼바이저는 연결된 액터가 죽었을 때 다음의 두 가지 방식을 이용해서 연결된 액터를 재시작한다. 참고로 permanent 모드의 액터만 재시작되며, temporary 액터는 재시작되지 않는다.
  • One-For-One
  • All-For-One
One-For-One은 수퍼바이저와 연결된 액터가 죽으면, 죽은 액터만 재시작하고 나머지 연결된 액터는 그대로 유지한다. (아래 그림 참고)

 
[발췌: http://doc.akka.io/fault-tolerance-java]

반면 All-For-One은 수퍼바이저와 연결된 액터 중 하나가 죽으면, 연결된 모든 액터를 재시작한다. (아래 그림 참고) 이는 수퍼바이저에 의해 관리되는 액터 중 하나로도 비정상적으로 동작하면 나머지 액터들도 영향을 받아서 비정상적으로 동작하게 될 때에 사용된다.


[발췌: http://doc.akka.io/fault-tolerance-java]

 


수퍼바이저(Supervisor) 액터 만들기

수퍼바이저 액터는 일반 액터와 동일한 액터로서, 다음의 두 가지 방법을 이용해서 만들 수 있다.

  • link() 메서드를 이용
  • Supervisor 클래스를 이용해서 생성

link()를 이용한 액터 연결 및 관리

액터는 다른 액터를 연결함으로써 수퍼바이저 액터가 될 수 있으며, 연결할 때에는 link() 메서드를 사용한다. link()를 이용해서 액터를 관리할 경우 다음과 같은 방법으로 개발을 진행하면 된다.
  1. 수퍼바이저 액터로 동작할 클래스의 생성자에 FaultHandler를 지정한다. FaultHandler는 관리되는 액터가 죽었을 때, 그 액터만 재시작할 지 아니면 관리되는 모든 액터를 재시작할 지의 여부를 지정한다.
  2. 수퍼바이저 액터를 생성한 뒤, 관리할 액터를 link()로 연결한다.

1번, 수퍼바이저 액터를 직접 구현할 경우 다음과 같이 수퍼바이저 액터 생성자에서 재시작 전략을 지정해 주어야 한다.

import akka.actor.UntypedActor;
import akka.config.Supervision.OneForOneStrategy;

public class MasterActor extends UntypedActor {

    public MasterActor() {
        getContext().setFaultHandler(
                new oneForOneStrategy(
                        new Class[] { RuntimeException.class }, 3, 1000));
    }

    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println("Master가 받은 메시지: " + message);
    }

}

위 코드에서 MasterActor는 관리하는 액터가 죽으면 해당 액터만 재시작하도록 설정하였다. oneForOneStrategy 객체를 생성할 때 첫 번째 파라미터는 액터를 재시작할 예외 타입을 지정한다. 위 코드는 모니터링 대상 액터의 onReceive() 메서드에서 RuntimeException이 발생하면 액터를 재시작한다는 것을 의미한다. 뒤의 두 숫자에 대해서는 뒤에서 다시 설명하겠다.

관리되는 액터가 죽을 때 관리되는 다른 액터들도 함께 재시작하고 싶은 경우에는 AllForOneStrategy 클래스를 사용하면 된다. 생성자에 전달되는 파라미터 목록은 oneForOneStrategy 클래스와 동일하다.

2번, 수퍼바이저 액터를 알맞게 구현했다면 그 다음으로 할 작업은 link() 메서드를 이용해서 수퍼바이저에 관리할 액터를 연결해 주는 것이다. 아래 코드는 예를 보여주고 있다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();

master.link(worker1);

테스트를 위해 WorkerActor가 "die"라는 메시지를 받으면 RuntimeException을 발생시키도록 구현해 보았다.

@SuppressWarnings("unchecked")
public class WorkerActor extends UntypedActor {
    private static int num = 1;
   
    private int id;
    public WorkerActor() {
        id = num++;
        System.out.println("액터 생성됨: " + id);
    }
   
    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("die")) {
            throw new RuntimeException("고의적 DIE");
        }
        System.out.println("Worker " + id + ": " + message);
    }
   
    @Override
    public void preRestart(Throwable cause) {
        System.out.println("Worker " + id + ": 재시작 전처리");
    }
   
    @Override
    public void postRestart(Throwable cause) {
        System.out.println("Worker " + id + ": 재시작 후처리");
    }
   
}

WorkerActor는 preRestart() 메서드와 postRestart() 메서드를 구현하고 있는데, 이 두 메서드는 각각 액터가 재시작하기 전/후에 호출된다. WorkerActor가 생성될 때 마다 1씩 증가된 id 값을 할당하는데 id 값을 새로 부여한 이유는 액터가 재시작할 때 액터 객체를 새로 생성하는 지의 여부를 확인하기 위해서다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();
ActorRef worker2 = Actors.actorOf(WorkerActor.class);
worker2.start();

master.link(worker1); // master에 worker1 액터 연결
master.link(worker2); // master에 worker2 액터 연결

worker1.sendOneWay("메시지1-1");
worker2.sendOneWay("메시지2-1");
worker1.sendOneWay("메시지1-2");
worker2.sendOneWay("메시지2-2");

worker1.sendOneWay("die"); // worker1 액터 죽임!
worker1.sendOneWay("메시지1-3"); // worker1 액터에 메시지 전달
worker2.sendOneWay("메시지2-3");

위 코드는 중간에 worker1에 "die" 메시지를 보냄으로써 worker1을 죽인다. worker1 액터는 "die" 메시지를 받으면 RuntimeException을 발생시키는데, MasterWorker는 RuntimeException이 발생할 경우 해당 액터를 재시작하라고 설정하고 있다. 따라서, worker1 액터는 "die" 메시지를 받는 순간 RuntimeException을 발생시키며 죽지만 곧이어 재시작하게 되고, 따라서 죽은 이후에 받은 "메시지1-3" 메시지를 재시작한 액터가 처리하게 된다.

실제 위 코드의 실행 결과는 다음과 같다. (Akka가 출력하는 로그 메시지 중 중요한 것만 남기고 나머지는 생략하였다.)

액터 생성됨: 1
액터 생성됨: 2
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb1-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
Worker 1: 메시지1-1
Worker 1: 메시지1-2
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Exception when invoking
    actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]]
    with message [die]
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Problem
java.lang.RuntimeException: 고의적 DIE
    at tuto3.WorkerActor.onReceive(WorkerActor.java:18) ~[classes/:na]
    ...
Worker 2: 메시지2-1
Worker 2: 메시지2-2
Worker 2: 메시지2-3
16:43:40.968 [akka:event-driven:dispatcher:global-3] INFO  akka.actor.Actor$ - Restarting actor [tuto3.WorkerActor] configured as PERMANENT.
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'preRestart' for failed actor instance [tuto3.WorkerActor].
Worker 1: 재시작 전처리
액터 생성됨: 3
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'postRestart' for new actor instance [tuto3.WorkerActor].
Worker 3: 재시작 후처리
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Restart: true for [tuto3.WorkerActor].
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG a.d.Dispatchers$globalExecutorBasedEventDrivenDispatcher$ - Resuming 46f67fb0-506a-11e0-a0e5-001d92ad4c1a
16:43:40.984 [akka:event-driven:dispatcher:global-4] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-4
Worker 3: 메시지1-3

위 실행 결과를 보면 다음의 사실을 확인할 수 있다.
  • 재시작 전처리는 1번 Worker가 수행한다.
  • 전처리 후, worker1에 해당하는 새로운 액터 객체를 생성한다. (액터 생성된: 3)
  • 재시작 후처리는 3번 Worker가 수행한다.
  • 이후 worker1은 3번 Worker와 연결되며, "메시지1-3" 메시지는 3번 Worker가 수행하게 된다.
즉, 액터가 죽으면 그 액터 객체를 재사용하는 것이 아니라 새로운 액터 객체를 생성하는 방법으로 재시작하는 것을 알 수 있다.

Supervisor 클래스를 이용한 수퍼바이저 액터 생성

수퍼바이저 액터에서 직접 관리할 액터를 생성하는 경우가 아니면 수퍼바이저 액터를 별도로 구현하기 보다는 Akka가 제공하는 Supervisor 클래스를 이용하는 것이 편리하다.


import akka.actor.Supervisor;
import akka.actor.SupervisorFactory;
import akka.config.Supervision;
import akka.config.Supervision.OneForOneStrategy;
import akka.config.Supervision.Supervise;
import akka.config.Supervision.SupervisorConfig;

...

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
ActorRef worker2 = Actors.actorOf(WorkerActor.class);

Supervise[] supervises = new Supervise[2];
supervises[0] = new Supervise(worker1, Supervision.permanent());
supervises[1] = new Supervise(worker2, Supervision.permanent());

OneForOneStrategy strategy = new oneForOneStrategy(
        new Class[] {RuntimeException.class}, 3, 3000);
SupervisorConfig config = new SupervisorConfig(strategy, supervises);

Supervisor supervisor = new SupervisorFactory(config).newInstance();
// supervisor 생성 시점에서 내부적으로 생성한 SupervisorActor와 worker1가 worker2가 시작됨

worker1.sendOneWay("메시지");

SupervisorFactory를 통해서 Supervisor를 생성하면, 내부적으로 SupervisorActor 타입의 액터를 생성하고, 그 액터에 SupervisorConfig에 지정된 모든 액터를 연결(link)하고, 각 액터를 시작(start) 한다.

내부적으로 생성한 SupervisorActor에 접근하고 싶다면, 다음과 같이 supervisor() 메서드를 사용하면 된다.

Supervisor supervisor = ...;
ActorRef supervisorActor = supervisor.supervisor();


재시작 횟수 제한

OneForOneStrategy나 AllForOneStrategy를 생성할 때 두 번째/세 번째 파라미터는 각각 제한된 시간 내의 최대 재시작 시도 회수와 제한을 시간 의미한다.  예를 들어, 아래 코드는 1초 이내에 최대 3번의 재시작 시도를 시도한다는 것을 의미한다. (1초 안에 재시작을 3번까지 허용한다는 의미가 아니다.)

new oneForOneStrategy(new Class[] { RuntimeException.class }, 3, 1000);

1초 안에 액터 재시작을 3번 실패하면 (예를 들어, postRestart() 메서드에서 런타임 예외가 발생해서 실패), 해당 액터에 대해 재시작 시도를 하지 않으며 더 이상 액터를 사용할 수 없게 된다.

액터 생성/연결 한번에 하기

ActorRef는 액터를 생성하고 관리하기 위한 메서드를 제공하고 있으며, 이들 메서드는 다음과 같다.
  • ActorRef spawn(Class clazz): 액터를 생성하고 시작한다.
  • ActorRef spawnLink(Class clazz): 액터를 생성하고 시작하고, 연결한다.
  • ActorRef spawnRemote(Class clazz, String host, int port, long timeout): 리모트 액터를 생성하고 시작한다.
  • void startLink(ActorRef actor): 액터를 시작하고 연결한다.
위 메서드를 이용하면 다음과 같이 코드를 조금 더 간결하게 작성할 수 있다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();
ActorRef worker1 = master.spawnLink(WorkerActor.class); // worker1 액터 시작/연결 됨

ActorRef worker2 = Actors.actorOf(WorkerActor.class);
master.startLink(worker2); // worker2 시작/연결 됨


참고자료


 


 

 

Akka 네 번째, TypedActor를 이용한 인터페이스 사용

 

Akka는 액터에 메시지를 전달하고 응답을 받을 때 자바 인터페이스를 사용할 수 있는 기능을 제공하고 있다. 즉, ActorRef의 sendOneWay()나 sendRequestReply()와 같은 메서드가 아닌 자바 인터페이스에 정의된 메서드를 이용해서 액터에 메시지를 전달하고 응답을 받을 수 있도록 하고 있다. 본 글에서는 Akka가 제공하는 TypedActor를 이용해서 자바 인터페이스를 액터와의 통신 인터페이스로 사용하는 방법을 설명한다.

TypedActor를 이용한 자바 인터페이스 기반 액터 생성

TypedActor 클래스를 사용하면 인터페이스를 구현한 자바 클래스를 액터로 사용할 수 있다. TypedActor를 사용하려면 다음과 같이 인터페이스와 그 인터페이스를 구현한 클래스를 필요로 한다. 이때 인터페이스를 구현한 클래스는 TypedActor 클래스를 상속 받아야 한다.

public interface DataMigrator {
    public void run();
    public int restCount();
}

public class DataMigratorImpl extends TypedActor implements DataMigrator {

    private int count = 0;

    @Override
    public void run() {
        System.out.println("DataMigratorImpl: 작업 시작");
        // 뭔가 작업을 비동기로 처리
    }

    @Override
    public int restCount() {
        return 100 - count;
    }
}


액터와 통신할 때 사용할 인터페이스를 구현하고 TypedActor를 상속받은 클래스를 구현했다면, 다음의 코드를 이용해서 액터를 생성하고 사용할 수 있다.

DataMigrator migrator =
            TypedActor.newInstance(DataMigrator.class, DataMigratorImpl.class);

// migrator는 액터와 통신을 위한 프록시
migrator.run();
int rest = migrator.restCount();
do {
    Thread.sleep(10);
    rest = migrator.restCount();
} while(rest > 0);

TypedActor.stop(migrator); // TypedActor 종료
// Actors.registry().shutdownAll(); 코드도 TypedActor 종료


TypedActor.newInstance()의 첫 번째 파라미터는 액터와 통신할 때 사용할 인터페이스 타입을 지정하며, 두 번째 파라미터는 실제 TypedActor로 사용될 클래스를 지정한다. TypedActor.newInstance() 메서드가 생성한 객체는 액터와 통신을 수행해주는 프록시 객체가 된다. 위 코드에서는 migrator가 프록시 객체가 되는데, 이 프록시 객체의 메서드를 호출하면, 내부적으로 TypedActor 객체에 메시지를 전송하게 되고 TypedActor 객체는 일치하는 메서드를 호출하게 된다.

[참고]

Akka는 TypedActor에 대한 프록시를 객체를 생성하기 위해 AspectWerkz(http://aspectwerkz.codehaus.org/ 참고)를 사용한다.



Fire-And-Forget

메서드의 리턴 타입이 void 이면, 해당 메서드에 대한 메시지는 sendOneWay()와 동일하게 Fire-And-Forget 방식으로 전송된다. 따라서, 메서드를 호출하면 액터가 메시지를 처리 여부에 상관없이 즉시 리턴한다. DataMigrator 인터페이스의 run() 메서드가 이에 해당한다.

DataMigrator migrator = TypedActor.newInstance(DataMigrator.class, DataMigratorImpl.class);
migrator.run(); // 리턴 타입이 void 이므로 Fire-And-Forget 방식



Send-And-Receive-Eventually

메서드가 리턴 타입을 가지면, sendRequestReply()와 동일하게 Send-And-Receive-Eventually 방식으로 메시지가 전송된다. 따라서, 액터로부터 응답이 도착할 때 까지 블럭킹 된다.

DataMigrator migrator = TypedActor.newInstance(DataMigrator.class, DataMigratorImpl.class);
int rest = migrator.restCount(); // Send-and-receive-Eventually 방식


Send-And-Receive-Future

메서드의 리턴 타입이 akka.dispatch.Future이면, Send-And-Receive-Future 방식으로 메서드를 호출한다.

리모트 TypedActor 생성하기

UntypedActor와 마찬가지로 TypedActor도 간단하게 리모트 액터로 제공할 수 있다.

리모트 서버에서 TypedActor 생성하기

리모트 서버에서, 액터를 리모트 액터로 등록하려면 registerTypedActor()를 사용하면 된다.

Actors.remote().start("0.0.0.0", 2553);
DataMigrator migrator = TypedActor.newInstance(DataMigrator.class, DataMigratorImpl.class);
Actors.remote().registerTypedActor("data-migrator", migrator);


클라이언트 코드에서는 Actors.remote().typedActorFor() 메서드를 이용해서 리모트 액터에 대한 프록시 객체를 구한 뒤 알맞은 메서드를 호출하면 된다.

DataMigrator migrator = Actors.remote()
        .typedActorFor(DataMigrator.class, "data-migrator", "172.20.1.2", 2553);

migrator.run();
int rest = migrator.restCount();
do {
    Thread.sleep(1000);
    rest = migrator.restCount();
} while(rest > 0);

Actors.remote().shutdown();


[주의]

클라이언트에서 리모트 액터를 생성할 수도 있으나, 현재 버전에서는 기능이 예상하는 대로 동작하지 않아 본 글에서는 소개하지 않는다.


참고자료

 

  • http://doc.akka.io/typed-actors-java