-----------------------------------------------------------------------------------------------
출처: http://javacan.tistory.com/entry/akka-1-start
-----------------------------------------------------------------------------------------------
Akka 첫 번째, Akka를 이용한 Concurrent 프로그래밍 시작하기
개인적으로 관심을 가지고 지켜보던 Akka 프로젝트가 1.0 버전이 되었다. 평소에 병행 처리와 분산 처리에 관심이 많았는데, Akka는 이를 보다 쉽게 구현할 수 있도록 도와주는 프로젝트이다. 본 글에서는 Akka가 무엇인지 간단하게 설명하고 실제 Akka를 이용해서 액터를 생성하고 실행하는 방법을 살펴볼 것이다.
- 액터들은 상태를 공유하지 않는다.
- 액터들 간의 통신은 메시지 전달을 통해서 이루어진다. (이벤트 기반 모델)
- 액터간의 통신은 비동기로 이루어진다.
- 각 액터는 전달받은 메시지를 큐에 보관하며, 메시지를 순차적으로 처리한다.
- 액터는 일종의 경량 프로세서다.
- Fire-And-Forget: 메시지를 전달하고 메시지에 대한 응답을 기다리지 않는다. 병행 및 확장에 적합한 메시지 전달 방식이다.
- Send-And-Receive-Eventually: 메시지를 전달하고 응답을 받는다. 응답을 받을 때 까지 블록킹된다.
- Send-And-Receive-Future: 메시지를 전달하고 응답을 받기 위한 Future를 리턴한다.
- sendOneWay(Object message)
- sendOneWay(Object message, ActorRef sender) : 메시지를 전송하면서 메시지를 보낸 액터로 sender를 지정한다.
- sendRequestReply(Object message)
- sendRequestReply(Object message, ActorRef sender): 메시지를 보낸 액터로 sender를 지정한다.
- sendRequestReply(Object message, long timeout, ActorRef sender)
- sendRequestReplyFuture(Object message)
- sendRequestReplyFuture(Object message, ActorRef sender) : 메시지를 보낸 액터로 sender를 지정한다.
- sendRequestReplyFuture(Object message, long timeout, ActorRef sender)
- NEW: 액터가 만들어졌을 때. 메시지를 수신하지 못한다.
- STARTED: start()가 호출되었을 때. 메시지를 수신할 수 있다.
- SHUTDOWN: exit()나 stop()이 호출되었을 때. 어떤 것도 하지 못한다.
- preStart(): 액터 시작 전에 호출된다.
- postStop(): 액터 종료 후에 호출된다.
- preRestart(Throwable reason): 액터 재시작 전에 호출된다. (무정지 액터 기능과 관련됨)
- postRestart(Throwable reason): 액터 재시작 후에 호출된다. (무정지 액터 기능과 관련됨)
- akka.config 시스템 프로퍼티로 지정한 파일 (java -Dakka.config=... )
- 클래스패스에 위치한 akka.config 파일
- AKKA_HOME 환경변수 존재 시, '$AKKA_HOME/config 디렉터리의 설정 파일 사용. (또는 akka.home 시스템 프로퍼티를 AKKA_HOME 환경 변수 대신 사용)
- Akka 홈페이지: http://akka.io/
- Akka 문서: http://doc.akka.io/
- 액터 모델: http://en.wikipedia.org/wiki/Actor_model
Akka 두 번째, 리모트 액터 사용하기
Akka가 관심을 끄는 이유는 사실 액터 모델 자체보다는 리모트 노드에 위치한 액터를 마치 로컬에 위치한 액터처럼 사용할 수 있다는 것이었다. Scala 언어가 자체적으로 액터를 제공하고 있지만, Akka의 액터는 이 액터 모델을 리모트까지 확장했기 때문에, Akka를 사용하면 한 노드에서의 병행 처리 뿐만 아니라 다수 노드에서의 병행 처리까지 쉽게 구현할 수 있다.
- 리모트 서버를 만든다. 리모트 서버는 리모트로 제공될 액터를 관리하며, 클라이언트는 리모트 서버에 연결해서 리모트로 제공되는 액터를 사용하게 된다.
- 리모트 서버에 액터 등록하기 (클라이언트에서 액터 등록하기, 서버에서 액터 등록하기)
Actors.remote().start("0.0.0.0", 2552); // 모든 호스트에 대해 2552 포트로 들어오는 요청 처리
Actors.remote().start("localhost", 2553); // 로컬 호스트의 2553 포트로 들어오는 요청 처리
Actors.remote().start(); // 설정 파일에 있는 기본 값 사용 (설정 파일 없을 시 기본값은 "localhost", 2552)
start() 메서드에서 호스트 값으로 "localhost"를 지정하면, 로컬호스트로 들어오는 요청에 대해서만 처리할 수 있기 때문에, 실 환경에서는 의미가 없다. 실 환경에서는 "0.0.0.0"이나 "192.168.0.1"과 같이 전체 허용 또는 특정 호스트를 지정해 주는 것이 좋다.
다음의 두 가지 방법을 이용해서 리모트 서버에서 액터를 실행할 수 있다.3
- 클라이언트에서 생성/관리: 리모트 노드에 있는 액터를 클라이언트에서 관리해야 할 때 사용 (액터 모니터링, 액터 수퍼바이징 등)
- 서버에서 생성/관리: 클라이언트에 액터 서비스만 제공하고 서버에서 액터에 대한 관리를 할 때 주로 사용한다.
클라이언트에서 원격지 서버에 액터 생성하기
다음의 코드를 사용하면 클라이언트에서 리모트 서버에 액터를 생성하고 관리할 수 있다.
actor1.start();
actor1.sendOneWay("hello");
actor1.stop();
Actors.remote().actorOf() 메서드는 리모트 서버에 MyActor 타입의 액터를 생성한다. 클라이언트에서 액터를 생성한 경우 로컬 액터를 사용하듯이 start() 메서드를 이용해서 액터를 시작하고 stop() 메서드를 이용해서 액터를 종료할 수 있다.
클라이언트에서 리모트 서버에 액터를 생성할 때 주의할 점은 호스트와 포트가 "localhost"와 2552 이면, 리모트 액터가 아닌 로컬 액터로 생성해서 실행된다는 점이다.
서버에서 액터를 생성해서 클라이언트에 제공할 수도 있다. 서버에서 액터를 등록할 때에는 다음과 같이 register() 메서드를 사용하면 된다.
// MyActor를 리모트 액터로 등록, 식별값은 "hello-service"
Actors.remote().register("hello-service", Actors.actorOf(MyActor.class));
register() 메서드를 사용하면 액터는 자동으로 시작된다.
클라이언트는 액터의 식별값을 이용해서 리모트 액터에 대한 레퍼런스를 구할 수 있으며, 이 레퍼런스를 이용해서 리모트 액터에 메시지를 전달할 수 있다. actorFor() 메서드를 사용하면 리모트 노드에서 생성한 액터에 접근할 수 있다.
ActorRef actor = Actors.remote().actorFor("hello-service", "192.168.1.11", 2553);
actor.sendOneWay("테스트!!!"); // 리모트 액터에 메시지 전달
리모트 액터에서 클라이언트에 응답하기
리모트 액터에서 클라이언트에 응답하는 방법은 앞서 'Akka 첫 번째, Akka를 이용한 Concurrent 프로그래밍 시작하기' 에서 살펴봤던 것과 동일하다.
로컬 액터에서 리모트 액터로 메시지를 전송하면, 리모트 액터는 getContext().getSender()를 이용해서 로컬 액터에 메시지를 전달할 수 있다. (즉, 리모트 액터 입장에서는 로컬 액터가 리모트 액터가 되는 것이다.) 리모트 액터에서 로컬 액터에 메시지를 전송할 때에도 결국 네트워크를 통해서 보내기 때문에, 클라이언트도 리모트 서버를 실행해야 리모트 액터에서 로컬 액터에 메시지를 전송할 수 있게 된다.
// 리모트 액터에서 로컬 액터에 메시지 보낼 때 사용할 서버 실행
Actors.remote().start("192.168.4.4", 2552);
ActorRef localActor = Actors.actorOf(LocalActor.class);
localActor.start();
ActorRef actor = Actors.remote().actorFor("hello-service", "192.168.4.3", 2552);
actor.sendOneWay("테스트!!!", localActor); // 로컬 액터를 sender로 지정
---- 리모트 액터 코드
public class MyActor extends UntypedActor {
@Override
public void onReceive(Object msg) throws Exception {
if (getContext().getSender().isDefined()) {
ActorRef sender = getContext().getSender().get(); // 클라이언트의 LocalActor가 sender
sender.sendOneWay(msg); // 192.168.4.4:2552 로 메시지 전송
}
}
}
클라이언트 코드에서 리모트 서버를 실행하지 않으면, 리모트 액터가 로컬 액터에 메시지를 전달할 수 없게 된다. 즉, 서로 다른 노드에 있는 액터들 간에 메시지를 주고 받기 위해서는 각 노드마다 리모트 서버를 실행시켜 주어야 한다.
비신뢰 모드(UntrustedMode)로 리모트 서버 실행하기
리모트 서버를 비신뢰 모드로 실행하게 되면, 클라이언트에서 액터를 생성할 수 없게 된다. 리모트 서버를 비신뢰 모드로 실행하려면 설정 파일에 다음과 같이 untrusted-mode 값을 on으로 설정해 주면 된다.
remote {
server {
untrusted-mode = on # 기본 값은 off
}
}
}
비신뢰 모드로 실행하면 클라이언트에서 리모트 액터에 대해 다음의 메서드에 대한 호출이 제한된다.
- start(), stop(), link(), unlink(), spawnLink() 등
리모트 서버와 클라이언트 종료 처리
아래의 클라이언트 코드를 실행하면 JVM이 종료되지 않고 실행된 채로 남아 있는다. 이유는 Akka가 내부적으로 리모트 서버와의 연결 처리를 위해 사용하는 쓰레드가 죽지 않기 때문이다.
public static void main(String[] args) {
ActorRef actor = Actors.remote().actorFor("hello-service", "172.20.4.64", 2553);
actor.sendOneWay("테스트!!!");
// JVM 종료되지 않음
}
}
shutdown() 메서드를 이용해서 리모트 서버와의 연결을 종료시키고 관련된 모든 쓰레드를 함께 종료시켜주므로, 클라이언트 코드에서 리모트 액터에 대한 사용이 끝나면 shutdown() 메서드를 호출해서 JVM을 종료처리할 수 있다.
// actor 사용
Actors.remote().shutdown(); // 프로그램 종료시 반드시 실행해 주어야 함
리모트 서버에서도 마찬가지로, 어플리케이션을 종료처리할 때 shutdown() 메서드를 호출해 주어야 관련된 쓰레드가 모두 정리되어 JVM이 종료하게 된다.
...
Actors.remote().shutdown(); // 프로그램 종료시 반드시 실행해 주어야 함
클라이언트와 서버는 액터를 이용해서 클라이언트의 연결/해제 등의 이벤트를 수시할 수 있다. 리모트 기능과 관련된 이벤트를 처리하고 싶다면 다음과 같이 이벤트를 수신할 액터를 Actors.remote().addListener() 메서드를 이용해서 이벤트 리스너로 등록해주면 된다.
listener.start();
Actors.remote().addListener(listener);
리스너로 사용되는 액터에는 리모트 기능과 관련된 이벤트 객체가 메시지로 전달되며, 액터는 이벤트 타입에 따라서 알맞은 작업을 수행하면 된다.
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof RemoteServerStarted) {
...
}
}
}
주요 이벤트 클래스는 다음과 같다.
- 서버측 이벤트
- RemoteServerStarted
- RemoteServerShutdown
- RemoteServerClientConnected
- RemoteServerClientDisconnected
- RemoteServerClientClosed
- RemoteServerWriteFailed
- 클라이언트측 이벤트
- RemoteClientConnected
- RemoteClientDisconnected
- RemoteClientStarted
- RemoteClientShutdown
- RemoteClientError
- RemoteClientWriteFailed
참고자료
- Akka Remote Actor(Java): http://doc.akka.io/remote-actors-java
Akka 세 번째, Supervisor를 이용한 Fault Tolerance
액터의 두 가지 라이프 사이클: permanent, temporary
액터는 다음의 두 가지 라이프 사이클을 가진다.
- permanent: 메시지 처리 과정에서 예외가 발생해도 액터가 살아 있음.
- temporary: 메시지 처리 과정에서 예외가 발생하면 액터가 죽음.
public class WorkerActor extends UntypedActor {
public WorkerActor() {
getContext().setLifeCycle(Supervision.temporary());
...
}
다중 쓰레드를 이용해서 병행 처리 코드를 작성할 경우, 병행 처리 코드에서 예외가 발생했을 때 이를 알 수 있는 방법은 예외 추적 메시지를 확인하는 방법 뿐이다. (또는 try-catch로 모든 예외를 잡아서 알림해 주는 기능을 넣는 방법 뿐이다.) 예외가 발생해서 병행 처리 쓰레드가 종료된 경우 이를 복구하는 방법은 재시작해주는 것 외에 특별한 방법이 없다.
Akka는 액터가 더 이상 정상적으로 동작할 수 없는 상태가 되어 메시지 처리 과정 중 예외를 발생시키면, 해당 액터를 재시작하는 방법으로 장애에 대응한다. 복구할 수 없는 예외 상황 발생시 액터가 뭔가 하지 않고 그냥 죽도록 놔두고 안정된 상태로 초기화하고 재시작하기 때문에, 이 방식을 "Let it Crash"라고 부른다.
수버바이저를 이용한 액터 관리
Akka는 수퍼바이저를 이용해서 액터를 관리한다. 수퍼바이저는 다른 액터를 모니터링하는 액터로서, 수퍼바이저 액터에서 다른 액터를 연결(link)함으로써 수퍼바이저가 다른 액터를 관리하게 된다.
수퍼바이저는 연결된 액터가 죽었을 때 다음의 두 가지 방식을 이용해서 연결된 액터를 재시작한다. 참고로 permanent 모드의 액터만 재시작되며, temporary 액터는 재시작되지 않는다.
- One-For-One
- All-For-One
[발췌: http://doc.akka.io/fault-tolerance-java]
반면 All-For-One은 수퍼바이저와 연결된 액터 중 하나가 죽으면, 연결된 모든 액터를 재시작한다. (아래 그림 참고) 이는 수퍼바이저에 의해 관리되는 액터 중 하나로도 비정상적으로 동작하면 나머지 액터들도 영향을 받아서 비정상적으로 동작하게 될 때에 사용된다.
[발췌: http://doc.akka.io/fault-tolerance-java]
수퍼바이저(Supervisor) 액터 만들기
수퍼바이저 액터는 일반 액터와 동일한 액터로서, 다음의 두 가지 방법을 이용해서 만들 수 있다.
- link() 메서드를 이용
- Supervisor 클래스를 이용해서 생성
link()를 이용한 액터 연결 및 관리
액터는 다른 액터를 연결함으로써 수퍼바이저 액터가 될 수 있으며, 연결할 때에는 link() 메서드를 사용한다. link()를 이용해서 액터를 관리할 경우 다음과 같은 방법으로 개발을 진행하면 된다.
- 수퍼바이저 액터로 동작할 클래스의 생성자에 FaultHandler를 지정한다. FaultHandler는 관리되는 액터가 죽었을 때, 그 액터만 재시작할 지 아니면 관리되는 모든 액터를 재시작할 지의 여부를 지정한다.
- 수퍼바이저 액터를 생성한 뒤, 관리할 액터를 link()로 연결한다.
1번, 수퍼바이저 액터를 직접 구현할 경우 다음과 같이 수퍼바이저 액터 생성자에서 재시작 전략을 지정해 주어야 한다.
import akka.config.Supervision.OneForOneStrategy;
public class MasterActor extends UntypedActor {
public MasterActor() {
getContext().setFaultHandler(
new oneForOneStrategy(
new Class[] { RuntimeException.class }, 3, 1000));
}
@Override
public void onReceive(Object message) throws Exception {
System.out.println("Master가 받은 메시지: " + message);
}
}
위 코드에서 MasterActor는 관리하는 액터가 죽으면 해당 액터만 재시작하도록 설정하였다. oneForOneStrategy 객체를 생성할 때 첫 번째 파라미터는 액터를 재시작할 예외 타입을 지정한다. 위 코드는 모니터링 대상 액터의 onReceive() 메서드에서 RuntimeException이 발생하면 액터를 재시작한다는 것을 의미한다. 뒤의 두 숫자에 대해서는 뒤에서 다시 설명하겠다.
관리되는 액터가 죽을 때 관리되는 다른 액터들도 함께 재시작하고 싶은 경우에는 AllForOneStrategy 클래스를 사용하면 된다. 생성자에 전달되는 파라미터 목록은 oneForOneStrategy 클래스와 동일하다.
2번, 수퍼바이저 액터를 알맞게 구현했다면 그 다음으로 할 작업은 link() 메서드를 이용해서 수퍼바이저에 관리할 액터를 연결해 주는 것이다. 아래 코드는 예를 보여주고 있다.
master.start();
ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();
master.link(worker1);
테스트를 위해 WorkerActor가 "die"라는 메시지를 받으면 RuntimeException을 발생시키도록 구현해 보았다.
public class WorkerActor extends UntypedActor {
private static int num = 1;
private int id;
public WorkerActor() {
id = num++;
System.out.println("액터 생성됨: " + id);
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("die")) {
throw new RuntimeException("고의적 DIE");
}
System.out.println("Worker " + id + ": " + message);
}
@Override
public void preRestart(Throwable cause) {
System.out.println("Worker " + id + ": 재시작 전처리");
}
@Override
public void postRestart(Throwable cause) {
System.out.println("Worker " + id + ": 재시작 후처리");
}
}
WorkerActor는 preRestart() 메서드와 postRestart() 메서드를 구현하고 있는데, 이 두 메서드는 각각 액터가 재시작하기 전/후에 호출된다. WorkerActor가 생성될 때 마다 1씩 증가된 id 값을 할당하는데 id 값을 새로 부여한 이유는 액터가 재시작할 때 액터 객체를 새로 생성하는 지의 여부를 확인하기 위해서다.
master.start();
ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();
ActorRef worker2 = Actors.actorOf(WorkerActor.class);
worker2.start();
master.link(worker1); // master에 worker1 액터 연결
master.link(worker2); // master에 worker2 액터 연결
worker1.sendOneWay("메시지1-1");
worker2.sendOneWay("메시지2-1");
worker1.sendOneWay("메시지1-2");
worker2.sendOneWay("메시지2-2");
worker1.sendOneWay("die"); // worker1 액터 죽임!
worker1.sendOneWay("메시지1-3"); // worker1 액터에 메시지 전달
worker2.sendOneWay("메시지2-3");
위 코드는 중간에 worker1에 "die" 메시지를 보냄으로써 worker1을 죽인다. worker1 액터는 "die" 메시지를 받으면 RuntimeException을 발생시키는데, MasterWorker는 RuntimeException이 발생할 경우 해당 액터를 재시작하라고 설정하고 있다. 따라서, worker1 액터는 "die" 메시지를 받는 순간 RuntimeException을 발생시키며 죽지만 곧이어 재시작하게 되고, 따라서 죽은 이후에 받은 "메시지1-3" 메시지를 재시작한 액터가 처리하게 된다.
실제 위 코드의 실행 결과는 다음과 같다. (Akka가 출력하는 로그 메시지 중 중요한 것만 남기고 나머지는 생략하였다.)
액터 생성됨: 2
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb1-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
Worker 1: 메시지1-1
Worker 1: 메시지1-2
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Exception when invoking
actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]]
with message [die]
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Problem
java.lang.RuntimeException: 고의적 DIE
at tuto3.WorkerActor.onReceive(WorkerActor.java:18) ~[classes/:na]
...
Worker 2: 메시지2-1
Worker 2: 메시지2-2
Worker 2: 메시지2-3
16:43:40.968 [akka:event-driven:dispatcher:global-3] INFO akka.actor.Actor$ - Restarting actor [tuto3.WorkerActor] configured as PERMANENT.
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'preRestart' for failed actor instance [tuto3.WorkerActor].
Worker 1: 재시작 전처리
액터 생성됨: 3
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'postRestart' for new actor instance [tuto3.WorkerActor].
Worker 3: 재시작 후처리
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Restart: true for [tuto3.WorkerActor].
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG a.d.Dispatchers$globalExecutorBasedEventDrivenDispatcher$ - Resuming 46f67fb0-506a-11e0-a0e5-001d92ad4c1a
16:43:40.984 [akka:event-driven:dispatcher:global-4] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-4
Worker 3: 메시지1-3
위 실행 결과를 보면 다음의 사실을 확인할 수 있다.
- 재시작 전처리는 1번 Worker가 수행한다.
- 전처리 후, worker1에 해당하는 새로운 액터 객체를 생성한다. (액터 생성된: 3)
- 재시작 후처리는 3번 Worker가 수행한다.
- 이후 worker1은 3번 Worker와 연결되며, "메시지1-3" 메시지는 3번 Worker가 수행하게 된다.
Supervisor 클래스를 이용한 수퍼바이저 액터 생성
수퍼바이저 액터에서 직접 관리할 액터를 생성하는 경우가 아니면 수퍼바이저 액터를 별도로 구현하기 보다는 Akka가 제공하는 Supervisor 클래스를 이용하는 것이 편리하다.
import akka.actor.SupervisorFactory;
import akka.config.Supervision;
import akka.config.Supervision.OneForOneStrategy;
import akka.config.Supervision.Supervise;
import akka.config.Supervision.SupervisorConfig;
...
ActorRef worker1 = Actors.actorOf(WorkerActor.class);
ActorRef worker2 = Actors.actorOf(WorkerActor.class);
Supervise[] supervises = new Supervise[2];
supervises[0] = new Supervise(worker1, Supervision.permanent());
supervises[1] = new Supervise(worker2, Supervision.permanent());
OneForOneStrategy strategy = new oneForOneStrategy(
new Class[] {RuntimeException.class}, 3, 3000);
SupervisorConfig config = new SupervisorConfig(strategy, supervises);
Supervisor supervisor = new SupervisorFactory(config).newInstance();
// supervisor 생성 시점에서 내부적으로 생성한 SupervisorActor와 worker1가 worker2가 시작됨
worker1.sendOneWay("메시지");
SupervisorFactory를 통해서 Supervisor를 생성하면, 내부적으로 SupervisorActor 타입의 액터를 생성하고, 그 액터에 SupervisorConfig에 지정된 모든 액터를 연결(link)하고, 각 액터를 시작(start) 한다.
내부적으로 생성한 SupervisorActor에 접근하고 싶다면, 다음과 같이 supervisor() 메서드를 사용하면 된다.
ActorRef supervisorActor = supervisor.supervisor();
재시작 횟수 제한
OneForOneStrategy나 AllForOneStrategy를 생성할 때 두 번째/세 번째 파라미터는 각각 제한된 시간 내의 최대 재시작 시도 회수와 제한을 시간 의미한다. 예를 들어, 아래 코드는 1초 이내에 최대 3번의 재시작 시도를 시도한다는 것을 의미한다. (1초 안에 재시작을 3번까지 허용한다는 의미가 아니다.)
1초 안에 액터 재시작을 3번 실패하면 (예를 들어, postRestart() 메서드에서 런타임 예외가 발생해서 실패), 해당 액터에 대해 재시작 시도를 하지 않으며 더 이상 액터를 사용할 수 없게 된다.
액터 생성/연결 한번에 하기
ActorRef는 액터를 생성하고 관리하기 위한 메서드를 제공하고 있으며, 이들 메서드는 다음과 같다.
- ActorRef spawn(Class clazz): 액터를 생성하고 시작한다.
- ActorRef spawnLink(Class clazz): 액터를 생성하고 시작하고, 연결한다.
- ActorRef spawnRemote(Class clazz, String host, int port, long timeout): 리모트 액터를 생성하고 시작한다.
- void startLink(ActorRef actor): 액터를 시작하고 연결한다.
master.start();
ActorRef worker1 = master.spawnLink(WorkerActor.class); // worker1 액터 시작/연결 됨
ActorRef worker2 = Actors.actorOf(WorkerActor.class);
master.startLink(worker2); // worker2 시작/연결 됨
참고자료
Akka 네 번째, TypedActor를 이용한 인터페이스 사용
Akka는 액터에 메시지를 전달하고 응답을 받을 때 자바 인터페이스를 사용할 수 있는 기능을 제공하고 있다. 즉, ActorRef의 sendOneWay()나 sendRequestReply()와 같은 메서드가 아닌 자바 인터페이스에 정의된 메서드를 이용해서 액터에 메시지를 전달하고 응답을 받을 수 있도록 하고 있다. 본 글에서는 Akka가 제공하는 TypedActor를 이용해서 자바 인터페이스를 액터와의 통신 인터페이스로 사용하는 방법을 설명한다.
TypedActor를 이용한 자바 인터페이스 기반 액터 생성
TypedActor 클래스를 사용하면 인터페이스를 구현한 자바 클래스를 액터로 사용할 수 있다. TypedActor를 사용하려면 다음과 같이 인터페이스와 그 인터페이스를 구현한 클래스를 필요로 한다. 이때 인터페이스를 구현한 클래스는 TypedActor 클래스를 상속 받아야 한다.
public void run();
public int restCount();
}
public class DataMigratorImpl extends TypedActor implements DataMigrator {
private int count = 0;
@Override
public void run() {
System.out.println("DataMigratorImpl: 작업 시작");
// 뭔가 작업을 비동기로 처리
}
@Override
public int restCount() {
return 100 - count;
}
}
액터와 통신할 때 사용할 인터페이스를 구현하고 TypedActor를 상속받은 클래스를 구현했다면, 다음의 코드를 이용해서 액터를 생성하고 사용할 수 있다.
TypedActor.newInstance(DataMigrator.class, DataMigratorImpl.class);
// migrator는 액터와 통신을 위한 프록시
migrator.run();
int rest = migrator.restCount();
do {
Thread.sleep(10);
rest = migrator.restCount();
} while(rest > 0);
TypedActor.stop(migrator); // TypedActor 종료
// Actors.registry().shutdownAll(); 코드도 TypedActor 종료
TypedActor.newInstance()의 첫 번째 파라미터는 액터와 통신할 때 사용할 인터페이스 타입을 지정하며, 두 번째 파라미터는 실제 TypedActor로 사용될 클래스를 지정한다. TypedActor.newInstance() 메서드가 생성한 객체는 액터와 통신을 수행해주는 프록시 객체가 된다. 위 코드에서는 migrator가 프록시 객체가 되는데, 이 프록시 객체의 메서드를 호출하면, 내부적으로 TypedActor 객체에 메시지를 전송하게 되고 TypedActor 객체는 일치하는 메서드를 호출하게 된다.
[참고]
Fire-And-Forget
메서드의 리턴 타입이 void 이면, 해당 메서드에 대한 메시지는 sendOneWay()와 동일하게 Fire-And-Forget 방식으로 전송된다. 따라서, 메서드를 호출하면 액터가 메시지를 처리 여부에 상관없이 즉시 리턴한다. DataMigrator 인터페이스의 run() 메서드가 이에 해당한다.
migrator.run(); // 리턴 타입이 void 이므로 Fire-And-Forget 방식
Send-And-Receive-Eventually
메서드가 리턴 타입을 가지면, sendRequestReply()와 동일하게 Send-And-Receive-Eventually 방식으로 메시지가 전송된다. 따라서, 액터로부터 응답이 도착할 때 까지 블럭킹 된다.
int rest = migrator.restCount(); // Send-and-receive-Eventually 방식
Send-And-Receive-Future
메서드의 리턴 타입이 akka.dispatch.Future이면, Send-And-Receive-Future 방식으로 메서드를 호출한다.
리모트 TypedActor 생성하기
UntypedActor와 마찬가지로 TypedActor도 간단하게 리모트 액터로 제공할 수 있다.
리모트 서버에서 TypedActor 생성하기
리모트 서버에서, 액터를 리모트 액터로 등록하려면 registerTypedActor()를 사용하면 된다.
DataMigrator migrator = TypedActor.newInstance(DataMigrator.class, DataMigratorImpl.class);
Actors.remote().registerTypedActor("data-migrator", migrator);
클라이언트 코드에서는 Actors.remote().typedActorFor() 메서드를 이용해서 리모트 액터에 대한 프록시 객체를 구한 뒤 알맞은 메서드를 호출하면 된다.
.typedActorFor(DataMigrator.class, "data-migrator", "172.20.1.2", 2553);
migrator.run();
int rest = migrator.restCount();
do {
Thread.sleep(1000);
rest = migrator.restCount();
} while(rest > 0);
Actors.remote().shutdown();
[주의]
참고자료
'IT_Programming > Dev Libs & Framework' 카테고리의 다른 글
[POI와 JXL 비교] JAVA에서 엑셀 파일 읽고 쓰기 (0) | 2011.06.20 |
---|---|
[Apache POI using Java] MS word, excel, powerpoint 데이터 핸들링 (0) | 2011.06.20 |
Tlemock을 이용한 Mock 객체 생성 및 사용 (0) | 2011.04.03 |
jMock vs. EasyMock (0) | 2011.04.03 |
NHN에서 공개한 JINDO Parser (0) | 2010.07.25 |