IT_Programming/Dev Libs & Framework

Rx (Reactive Extensions)

JJun ™ 2015. 6. 29. 11:58



 출처: http://functionalthinking.appspot.com/sp.search?q=Rx

 참고자료

 1) http://tiii.tistory.com/15

 2) http://tiii.tistory.com/16

 3) http://tiii.tistory.com/17

 4) http://tiii.tistory.com/18

 5) http://tiii.tistory.com/21



Exact match

Pages starting with "Rx"

Pages ending with "Rx"

Pages containing "Rx"




Reactive Extensions. 에릭 마이어(Erik Meijer)가 만든 Reactive Programming 라이브러리.

Rx = Observables + LINQ + Schedulers.

중요한 건, Observable과 Scheduler! LINQ부분은 호스트 언어에 따라 달라진다. 스칼라의 for-yield,하스켈의 List Comprehension이 여기에 해당.

아래 발표 중 Bart De Smet의 것이 소개로 좋은 것 같다. 조금 어수선하기는 하지만 데모 내용도 훌륭하고 요약도 좋다. Observable을 사용할 때와 아닌 경우를 대비하여 설명하는 방식이라 이해하기에 좋고, 장점을 잘 부각시킨 것 같다.

홈페이지 #

발표 #

Incoming Links #

Related Blog Postings #

Related People #

Related Articles #

Suggested Pages #




[Rx]를 안드로이드에서 사용하기.

  • Java 대신(혹은 함께) Kotlin/Scala/Groovy 사용하기
  • Java8 + Retrolambda 사용하기

AsyncTask usage in EventLocationAdapter #

AsyncTask 는 Future 처럼 동작한다. doInBackground()에서 작업한 결과를 onPostExecute()에서 받는 대신 get()으로 가져올 수 있다. (그런데 왜 Future 인터페이스를 구현하지 않았을까?)

http://androidxref.com/5.1.0_r1/xref/packages/apps/Calendar/src/com/android/calendar/event/EventLocationAdapter.java#278

// Start the recent locations query (async).
AsyncTask<Void, Void, List<Result>> locationsQueryTask =
        new AsyncTask<Void, Void, List<Result>>() {
    @Override
    protected List<Result> doInBackground(Void... params) {
        return queryRecentLocations(mResolver, filter);
    }
}.execute(); 
...
try {
    // Wait for the locations query.
    List<Result> recentLocations = locationsQueryTask.get();
    ...
} catch (ExecutionException e) {
    Log.e(TAG, "Failed waiting for locations query results.", e);
} catch (InterruptedException e) {
    Log.e(TAG, "Failed waiting for locations query results.", e);
}

위 예제는 AsyncTask.execute()가 UI thread에서 실행되어야 한다는 원칙("execute(Params...) must be invoked on the UI thread.")을 위배하고 있다. 그런데 문제없이 동작하는 이유는?

Filter클래스는 performFiltering()을 내부적인 HandlerThread에서 실행한다. 따라서 AsyncTask가 실행될 조건(getLooper())을 만족한다.

AsyncTask.SERIAL_EXECUTOR 글로벌 Executor를 이용하여 Backgound task를 실행시키고, Filter의 Request처리용 HandlerThread에서 onPre/PostExecute()를 무임승차한다.

이득은, 이미 잘 정의된 Global executor를 사용하여 쉽게 Background task를 실행할 수 있게 되었다는 점.

ArrayAdapter / ArrayFilter #

ArrayAdapter.getFilter()가 던져주는 ArrayFilter는 lazy로 mOriginalValues를 만들고, 이후 mObjects대신 사용된다. 이후 mObjects는 실제 화면에 보여질 데이터(filter.filter("...")를 호출한 결과)를 담게된다. filter.filter(...)는 Filter에 정의된 메쏘드이며, Filter클래스는 Filter 생성시 filtering request를 처리할 HandlerThread -> RequestHandler와 (UI Thread) ResultHandler를 만들어놓는다. filter는 비동기적으로 performFiltering()에서 실제 filtering을 진행하고, 결과를 UI Thread에서 publishResult()호출로 UI에 반영한다. (notifyDataSetChanged()) ArrayFilter는 startsWith() 및 space 로 나눠서 startsWith 를 호출하는 식으로 filtering한다. filtering된 결과를 publishResult()에서 mObjects에 적용하여 갱신된다. (데이터 셋이 크다면 filtering이 오래 걸리지 않을까?)

EventLocationAdapter #

ArrayAdapter를 상속하고 있지만 대부분의 메쏘드를 Override한다. getFilter()도 Override. EventLocationAdapter는 EditEventView.mLocationAdapter에 만들어지고, EditEventView.mLocationTextView(AutoCompleteTextView)에 연결된다. getFilter()는 AutoCompleteTextView에서 사용한다.

AutoCompleteTextView #

AutoCompleteTextView는 연결되는 Adapter에 대해 generic이 적용되어 있으며, 마치 타입클래스처럼 두가지 Interface를 만족하는 타입에 대해서만 받을 수 있게 제한되어 있다.

http://androidxref.com/5.1.0_r1/xref/frameworks/base/core/java/android/widget/AutoCompleteTextView.java#629

public <T extends ListAdapter & Filterable> void setAdapter(T adapter) {
    if (mObserver == null) {
        mObserver = new PopupDataSetObserver(this);
    } else if (mAdapter != null) {
        mAdapter.unregisterDataSetObserver(mObserver);
    }
    mAdapter = adapter;
    if (mAdapter != null) {
        //noinspection unchecked
        mFilter = ((Filterable) mAdapter).getFilter();
        adapter.registerDataSetObserver(mObserver);
    } else {
        mFilter = null;
    }
    mPopup.setAdapter(mAdapter);
}

mPopup은 ListPopupWindow이며, PopupDataSetObsever를 전달받은 adapter에 observer로 등록한다. 또한 mPopup의 Adapter로 사용한다. (ListAdapter이기 때문에 가능하다) observer는 onChanged에서 show, onInvalidated에서 dismiss하는 역할.

AutoCompleteTextView는 FilterListener를 구현하여 filter(.., this)로 통지받는다.

Filter.filter는 background thread에서 performFiltering을 호출하고 그 결과를 전달하여 publishResult를 호출한다. 마지막으로 UI Thread에서 FilterListener.onFilterComplete를 호출한다.

ArrayAdapter.ArrayFilter는 publishResult에서 notifyDataSetChanged를 호출한다. EventLocationAdapter.LocationFilter의 publishResult도 마찬가지로 notifyDataSet...를 호출한다.

PopupDataSetObserver는 DataSetChanged/Invalidated 통지를 받아 show/dismiss를 실행한다.

AutoCompleteTextView의 onFilterComplete에서는 show/dismiss를 다시 결정한다.

EventLocationAdpater.LocationFilter #

performFiltering에서 하는일. 두가지 Query 결과를 합쳐서 반환한다. performFiltering자체는 동기적으로 결과를 반환해야 한다.

  • queryRecentLocation (async) - like "filter%" 검색. Events.ID desc 순으로 검색하여(가장 최근에 추가된 장소 우선) post processing하면서 중복을 제거하고 최대갯수만큼만 선택하여 알파벳 순으로 반환. (List<Result>)
  • queryContacts (sync) - formatted address, display name에 대해 각각 like "filter%" or like "%filter%" 를 적용하여 contact에서 검색.. display name asc 순으로 검색.
    • 일단 이름이 있는 주소는 모두 따로 모은다. (out parameter HashSet<Addr> - 중복 자동 제거)
    • 이름 하나에 여러 address가 있을 수 있고, 이들을 이름 기준으로 grouping. 첫 주소에 대해서만 icon, name, address, photoUri를 저장하고 나머지는 주소만 담아둔다.
    • grouping된 주소들의 Result(name, icon, address, uri)들을 모두 하나로 flatten시켜 반환. (List<Result>, 여기서는 중복 제거 안됨)
  • 두 결과를 합친다.
    • recent 검색 결과(List<Result>) 중, contact 주소(HashSet.contains)에 없는 것들 먼저 최종 결과에 추가
    • contact 검색 결과(List<Result>) 를 최종 결과에 추가.
  • 소요시간 출력.

코드 참고.

@Override
protected FilterResults performFiltering(CharSequence constraint) {
    long startTime = System.currentTimeMillis();
    final String filter = constraint == null ? "" : constraint.toString();
    if (filter.isEmpty()) {
        return null;
    }
    // Start the recent locations query (async).
    AsyncTask<Void, Void, List<Result>> locationsQueryTask =
            new AsyncTask<Void, Void, List<Result>>() {
        @Override
        protected List<Result> doInBackground(Void... params) {
            return queryRecentLocations(mResolver, filter);
        }
    }.execute();
    // Perform the contacts query (sync).
    HashSet<String> contactsAddresses = new HashSet<String>();
    List<Result> contacts = queryContacts(mResolver, filter, contactsAddresses);
    ArrayList<Result> resultList = new ArrayList<Result>();
    try {
        // Wait for the locations query.
        List<Result> recentLocations = locationsQueryTask.get();
        // Add the matched recent locations to returned results.  If a match exists in
        // both the recent locations query and the contacts addresses, only display it
        // as a contacts match.
        for (Result recentLocation : recentLocations) {
            if (recentLocation.mAddress != null &&
                    !contactsAddresses.contains(recentLocation.mAddress)) {
                resultList.add(recentLocation);
            }
        }
    } catch (ExecutionException e) {
        Log.e(TAG, "Failed waiting for locations query results.", e);
    } catch (InterruptedException e) {
        Log.e(TAG, "Failed waiting for locations query results.", e);
    }
    // Add all the contacts matches to returned results.
    if (contacts != null) {
        resultList.addAll(contacts);
    }
    // Log the processing duration.
    if (Log.isLoggable(TAG, Log.DEBUG)) {
        long duration = System.currentTimeMillis() - startTime;
        StringBuilder msg = new StringBuilder();
        msg.append("Autocomplete of ").append(constraint);
        msg.append(": location query match took ").append(duration).append("ms ");
        msg.append("(").append(resultList.size()).append(" results)");
        Log.d(TAG, msg.toString());
    }
    final FilterResults filterResults = new FilterResults();
    filterResults.values = resultList;
    filterResults.count = resultList.size();
    return filterResults;
}



Rx를 소개할 때 괜찮은 예제들. 아래의 예제 1과 예제 2는 각각 Rx의 "Composability + Concurrency" 강점을 잘 보여준다.

예제 1 - 파일 네 개 합치기 #

간단한 유틸리티 만드는 중에 네 개로 나뉘어 저장된 파일을 하나로 합칠 일이 생겼다. 파일 데이터A.txt에는 다음의 내용이 들어있다.

 id1 데이터A-1
 id2 데이터A-2
 id3 데이터A-3
 ...

데이터B.txt데이터C.txt데이터D.txt는 각각 같은 형식이지만 데이터 내용만 다르다. 네 개의 파일을 하나로 합쳐서 다음처럼 출력하고 싶다.

id1 데이터A-1 데이터B-1 데이터C-1 데이터D-1
id2 데이터A-2 데이터B-2 데이터C-2 데이터D-2
id3 데이터A-3 데이터B-3 데이터C-3 데이터D-3
...

전혀 어려운 문제는 아니지만 여러모로 귀찮은 일이다. 아래 코드는 RxJava의 스칼라 래퍼를 이용하여 작성한 것이다.

def fromFile(f: File): rx.lang.scala.Observable[String] =
  toScalaObservable(rx.Observable.from(asJavaIterable(Source.fromFile(f).getLines.toIterable)))
val a = fromFile("dataA.txt")
val b = fromFile("dataB.txt").map(line => line.split("\t")(1))
val c = fromFile("dataC.txt").map(line => line.split("\t")(1))
val d = fromFile("dataD.txt").map(line => line.split("\t")(1))
val p = new PrintWriter(new File("output.txt"))
Observable.zip(a, b, c, d).subscribe(
  tuple => p.println(tuple.productIterator.mkString("\t")
  error => println("error: " + error),
  () => { p.close })

map과 zip은 대표적인 Observable 오퍼레이터이다. 특히 zip은 n개의 Observable을 순서대로동기화하여 묶어서 처리 함수에 전달해준다.




참고 #

구글링해보니 이와 유사한 문제에 대한 질답이 있다.

파일 두 개를 어떻게 줄 단위로 합칠 수 있을지에 대한 질답: BufferedReader.getLine 을 이용하여 읽어들이고 출력한다. 별로 훌륭하지 않은 답변들이지만, 자바가 처한 상황을 보여주는 것이라 생각된다.

여러 개의 반복자(iterator)를 함께 순회하는 방법에 대한 질답: zip이나 Pair등의 아이디어들이 나오기도 하지만 while이 가장 간단하게 보인다.

텍스트 파일을 줄 단위로 읽어들이는 방법에 대한 질답: 아파치IO의 FileUtils로 반복자를 얻어오는 방법을 알려준다. 하나만 읽어들인다면 BufferedReader.getLine도 나쁘지는 않다.

예제 2 - Retrofit (REST client) #

Retrofit은 REST API를 사용할 때 편리한 라이브러리다. URL패턴을 지정해주면, HTTP 요청을 보내고 이에 대한 응답을 받아서 사용자 데이터로 변환하여 넘겨주기까지의 과정을 자동으로 처리해준다. Retrofit에서는 URL패턴을 지정할 때 서비스 API를 다음 중 하나로 지정할 수 있다.

  • 동기식 API
  • 비동기식 API - with Callback
  • 비동기식 API - with Observable

하나씩 살펴보자. 다음은 가장 간단한 '동기식 API'로 URL패턴을 지정한 경우다.

// 옵션 1 - 동기식 API
interface UserService {
  @GET("/user/{id}/photo")
  Photo getUserPhoto(@Path("id") int id);
}

위처럼 UserService 인터페이스를 정의하면 구현부를 Retrofit라이브러리가 만들어준다. 그래서 다음처럼 인스턴스를 만들어서 REST API를 호출하는데 사용할 수 있다. 다음은 1번 사용자의 사진을 얻어오는 예제 코드.

RestAdapter restAdapter = new RestAdapter.Builder()
  .setServer("https://api.user.com")
  .build();
UserService service = restAdapter.create(UserService.class);
Photo photo = service.getUserPhoto(1); // blocking, may throw Exception

Photo가 무엇이고 어떻게 생겼는지는 모르겠지만 getUserPhoto(id)를 호출하면 된다는 것은 알 수 있다. 이 API는 간단하고 사용하기 편리하기는 하지만 치명적인 단점이 있다. Blocking call이라는 점이다. 이 때문에 Retrofit은 다음과 같은 형식도 지원한다.

// 옵션 2 - 비동기식 API with Callback
interface UserService {
  @GET("/user/{id}/photo")
  void getUserPhoto(@Path("id") int id, Callback<Photo> cb);
}

Retrofit은 위와 같이 비동기식 API로 지정된 경우 요청/응답 처리 결과값을 Callback으로 전달해준다. 이제 getUserPhoto(id)는 블록되지 않는다. 그러나 새로운 문제가 생겼다. 이런 류의 Async API가 더 많아지면 프로그램의 흐름이 복잡하게 꼬일 수 있다는 점이다. 예를 들어, Photo는 실제 이미지의 URL만을 담고 있어서 결과에 대해 또다시 HTTP요청을 보내서 Bitmap을 가져와야 하며, 이 결과를 Thumbnail이미지로 만드는 것도 Async API로 제공된다면? 혹은 사용자와 관련된 다른 서너 가지 정보를 다른 경로의 비동기 호출로 가져와서 사용자 데이터가 모두 준비되었을 때 최종 처리를 해야한다든지 하는 경우에는? 이런 경우 콜백 헬 Callback Hell에 빠지게 된다.

Retrofit이 제공하는 세번째 옵션을 취하면 Blocking call/Callback hell 문제로부터 어느정도 해방될 수 있다. Retrofit은 RxJava의 Observable 반환타입을 지원한다. 서비스 인터페이스를 다음과 같이 정의하면 된다.

// 옵션 3 - 비동기식 API with Observable
interface UserService {
  @GET("/user/{id}/photo")
  Observable<Photo> getUserPhoto(@Path("id") int id);
}

이제 getUserPhoto(id) 호출은 비동기로 동작할 뿐만아니라 다른 Observable들과의 합성도 가능해졌다. (예제 1과 같은 map/zip 등의 오퍼레이터를 사용할 수 있다는 얘기.)

Observable<Thumbnail> user = service.getUserPhoto(id)
  .map(new Func1<Photo, Bitmap>() {
    @Override Bitmap call(Photo photo) {
      return getBitmapFromUrl(photo.url); }})
  .map(new Func1<Bitmap, Thumbnail>() {
    @Override Thumbnail call(Bitmap bitmap) {
      return makeThumbnail(bitmap); }});

참고 - Callback Hell과 Promise #

"콜백 헬" 문제는 자바스크립트에서 이미 많이 나온 문제다. 1) 브라우저의 멀티쓰레딩 지원이 약해서 2) 함수 리터럴로 콜백을 만들기 쉬워서 자바스크립트에서는 콜백을 많이 사용했고, 이때문에 콜백 헬은 이미 많이 알려진 문제다. (이를 피하는 몇가지 기본 팁) 이를 해결하기 위한 추상화 객체가 Promise다. Future라고도 부른다. (위키피디어: Futures and Promises) Promise의 중요한 속성을 'Composition'이라고 지적한 글(링크)도 있다. Rx의 Observable은 Future나 Promise의 '여러 개' 버전이므로 함께 읽어두면 좋을 것 같다.

예제 3 - 레거시 코드에 Observable 접목하기 #

예제 2는 하나의 문제에 대해 세가지 방식의 API를 비교해 볼 수 있는 좋은 사례이다. 또한 레거시 코드에 어떻게 Rx를 적용할 수 있을지 힌트를 주기도 한다. 만약 우리가 다뤄야 하는 라이브러리나 코드가 동기식 API나 Callback 스타일의 비동기식만 지원한다면 어떻게 해야 할까? Rx는 간단한 브릿지 코드를 통해 쉽게 Observable을 만들 수 있게 한다.

동기식 API가 있으면 아주 쉽게 변환할 수 있다. Observable.create를 이용하면 된다. 특히 스칼라 버전에는 observable이라는 유틸리티도 제공된다.

val o: Observable[Photo] = observable { getUserPhoto(id) }

만약 기존 라이브러리가 콜백 형식의 API만 지원한다면 어떨까? 역시 크게(?) 어렵지 않다. 래퍼 함수를 만들어서 콜백을 등록하고 성공했을 때 onNext만 호출해주면 된다.

def getUserPhoto(id:Int): Observable[Photo] = Observable.create(observer => {
  getUserPhoto(id, new Callback[Photo]() {
    override def success(photo: Photo, res: Response) {
      observer.onNext(photo)
      observer.onCompleted()
    }
    override def failure(error: RetrofitError) {
      observer.onError(error)
    }
    Subscriptions.empty
  }
}

요약 #

Rx의 Observable은 함수형 오퍼레이터로 다양하게 합성할 수 있는 장점(예제 1)과, 이 과정에서 동시성 문제를 우아하게 처리할 수 있다는 장점(예제 2)이 있다. 또한, 레거시 코드가 이를 고려하여 만들어지지 않았더라도 비교적 쉽게 기존의 이벤트 스트림을 Observable 시퀀스로 변환할 수 있어(예제 3) 그 이득을 취할 수 있다.

Suggested Pages #




RxJava

A library for composing asynchronous and event-based programs using observable sequences for the Java VM

마이크로소프트의 Reactive Extensions(Rx)을 Java로 포팅한 것. 넷플릭스의 Ben Christensen이라는 친구가 주도적인 역할을 하였다. 지금은 GitHub에 공개되어 있다.

소개 동영상 #

간단 정리 #

인상적인 점 #

이직이 꽃피운 RxJava #

Ben Christensen은 발표에서 본인(을 포함한 팀 동료들)이 Imperative programmer였지만 Discovery API를 개선하려는 노력 중에 마이크로소프트에서 온 친구(Jafar Husain을 언급한 모양이다)가 Rx를 소개해줬고 그 덕분에 RxJava로 개선 방향을 잡았다고 전한다. 자바 프로그래머들이 정말 잘 안쳐다볼만한 곳이 마이크로소프트쪽인 것을 감안하면 흥미롭다.

Java Concurrency in Practice같은 좋은 책이 있지만 이 책을 모두 읽는 것은 아니고 이 책의 교훈을 실천하기도 어렵기 때문에 API 수준에서 동시성 문제를 추상화하기로 한 결정도 훌륭했다고 본다.

Owner of API should retain control of concurrency behavior - Ben Christensen

사용자를 먼저 고려 #

넷플릭스 Discovery API의 트래픽을 살펴보니 99% 정도는 내부적으로 사용되고 있음을 확인하고 과감하게 클라이언트 영역의 로직을 서버에서 한꺼번에 처리하도록 개선하여 네트웍 지연을 김소시켰다고 한다. 예를 들어 특정 검색어 영화목록을 보여주기 위해 검색(1)한 결과를 토대로 목록의 영화들에 대해 상세 정보를 조회(2)하고 개인의 북마크 상황(3)을 가져와서... 이런식으로 클라이언트가 여러번에 걸쳐 서버와 통신하던 것을 서버에서 필요한 유스케이스를 처리하여 결과를 모아서 전달하는 식이다. 잘 정의된 기본 API들을 조합하여 클라이언트에서 시나리오를 구현하는 방법이 더 매력적으로 보일 수 있지만 결과적으로 지연 시간을 줄여 사용자가 더 쾌적하게 사용할 수 있는 방법을 우선시 했다는 점이 인상적이다.

25% 지연 감소 ...

See Also #

Reactive REST

Incoming Links #

Related Articles #

Suggested Pages #