
[Repost][RxJava 2.x] The Difference Between Flowable and Observable

JJun ™ 2017. 3. 30. 16:22



Source: https://01010011.blog/2017/03/29/rxjava-flowable-과-observable-의-차이/





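The major version bump of RxJava (1 -> 2) brought several changes.

For details on the changes, refer to the link below.

A new base reactive class called Flowable was added.
It differs from Observable in whether backpressure support is built in: Flowable has it by default, Observable does not.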

backpressure?

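Rendered literally, it means roughly ‘the pressure of being pushed from behind’.

Imagine this situation. A concert hall is packed with people. More people keep pouring in like a swarm of Zerglings, but hardly anyone leaves. What if no staff are controlling the entrance? Either the hall bursts or the people inside get crushed; one way or another, a serious accident follows.

The same tragic scenario can play out in the publish / subscribe model.
If the producer churns out elements at a furious pace and the consumer cannot keep up, the result is either

  1. busy waiting, or
  2. an out-of-memory error.

The backpressure buffer is the buffer that provides flow control against this ‘pressure of being pushed from behind’.
When the buffer is full, the consumer has no capacity to process more elements anyway, so the producer stops publishing.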
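The rule described above (stop publishing while the buffer is full) can be sketched in plain Java. This is only an illustration of the idea, not how RxJava implements it; the class `BoundedBufferSketch` and its methods `tryPublish`, `consume`, and `size` are hypothetical names introduced here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration: a fixed-capacity buffer between producer and consumer.
public class BoundedBufferSketch {

    private final Queue<Integer> buffer = new ArrayDeque<>();
    private final int capacity;

    BoundedBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Producer side: refuses the element once the buffer is full. */
    boolean tryPublish(int element) {
        if (buffer.size() >= capacity) {
            return false; // the consumer has no room, so hold the element back
        }
        buffer.add(element);
        return true;
    }

    /** Consumer side: takes the oldest element, or null if the buffer is empty. */
    Integer consume() {
        return buffer.poll();
    }

    int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BoundedBufferSketch channel = new BoundedBufferSketch(3);
        int published = 0;
        // The fast producer tries to push 10 elements in a row.
        for (int i = 0; i < 10; i++) {
            if (channel.tryPublish(i)) {
                published++;
            }
        }
        System.out.println("published before the consumer ran: " + published);
        // The slow consumer frees one slot, so exactly one more element fits.
        System.out.println("consumed: " + channel.consume());
        System.out.println("publish after drain: " + channel.tryPublish(10));
        System.out.println("publish when full again: " + channel.tryPublish(11));
    }
}
```

With a capacity of 3, only the first three of the ten publish attempts succeed; the next one succeeds only after the consumer has taken an element out.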

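This is not a brand-new concept. In RxJava 1.x, backpressure was already available on an Observable if you explicitly added a backpressure buffer yourself. However, the RxJava developers added Flowable on the grounds that unexpected behavior could otherwise occur in places beginners would not notice.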

Consider the following example code. It is a scenario in which the consumer cannot keep up with the producer.
With Flowable, flow control kicks in once the backpressure buffer holds the default buffer size (128) of elements.

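import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.stream.Collectors;

public class example01 {

    public static void main(String... args) throws InterruptedException {

        // A 10-million-character payload, so every buffered element is expensive to hold.
        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x -> "*").collect(Collectors.joining());
        Flowable<String> foo = Flowable.range(0, 1_000_000_000)
                .map(x -> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x + tmpStr;
                });

        // observeOn hands elements to a computation thread; the consumer needs 1 second per element.
        foo.observeOn(Schedulers.computation()).subscribe(x -> {
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });

        // Keep the main thread alive while the consumer keeps working.
        while (true) {
            Thread.sleep(1000);
        }
    }
}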
Output:
[very fast sender] i'm fast. very fast.
sending id: main 0**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 1**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 2**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 3**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 4**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 5**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 6**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 7**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 8**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 9**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 10**************************************************
 
... (omitted) ...
 
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 0*************************************************
receiving id: RxComputationThreadPool-1 1*************************************************
receiving id: RxComputationThreadPool-1 2*************************************************
[very fast sender] i'm fast. very fast.
sending id: main 117**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 118**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 119**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 120**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 121**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 122**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 123**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 124**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 125**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 126**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 127**************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 3*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 4*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 5*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 6*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 7*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 8*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 9*************************************************
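
In the output above, the sender emits elements 0 through 127 and then waits for the receiver. Flowable follows the Reactive Streams model, in which the producer emits only as many elements as the consumer has requested. Below is a minimal sketch of that request-driven idea using the JDK's `java.util.concurrent.Flow` interfaces (Java 9+). It is not RxJava's actual implementation; `RangeSubscription` and `CollectingSubscriber` are hypothetical names, and the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestDrivenSketch {

    /** Hypothetical source: emits 0..end-1, but never more than was requested. */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int end;
        private int next;
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int end) {
            this.downstream = downstream;
            this.end = end;
        }

        @Override
        public void request(long n) {
            // Single-threaded sketch: no reentrancy or concurrency handling,
            // and non-positive requests are simply ignored.
            for (long i = 0; i < n && next < end && !done; i++) {
                downstream.onNext(next++);
            }
            if (next == end && !done) {
                done = true;
                downstream.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Hypothetical consumer: stores what it receives and requests nothing on its own. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        subscriber.onSubscribe(new RangeSubscription(subscriber, 10));
        System.out.println(subscriber.received); // nothing has been requested yet
        subscriber.subscription.request(3);      // the consumer has room for 3 elements
        System.out.println(subscriber.received);
        subscriber.subscription.request(4);      // room for 4 more
        System.out.println(subscriber.received);
    }
}
```

The source stays idle until `request` is called, so however fast it could produce, the consumer decides how many elements are in flight.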


In contrast, running the same scenario with an Observable and no backpressure buffer
ends in an OutOfMemoryError.

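import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.stream.Collectors;

public class example02 {

    public static void main(String... args) throws InterruptedException {

        // A 10-million-character payload, so every buffered element is expensive to hold.
        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x -> "*").collect(Collectors.joining());
        Observable<String> foo = Observable.range(0, 1_000_000_000)
                .map(x -> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x + tmpStr;
                });

        // Same slow consumer as before, but Observable applies no backpressure to the sender.
        foo.observeOn(Schedulers.computation()).subscribe(x -> {
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });

        // Keep the main thread alive while the consumer keeps working.
        while (true) {
            Thread.sleep(1000);
        }
    }
}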
Output:
[very fast sender] i'm fast. very fast.
sending id: main 0**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 1**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 2**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 3**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 4**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 5**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 6**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 7**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 8**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 9**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 10**************************************************
[very fast sender] i'm fast. very fast.
 
... (omitted) ...
 
sending id: main 198**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 199**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 200**************************************************
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at example02.lambda$main$1(example02.java:24)
    at example02$$Lambda$6/123961122.apply(Unknown Source)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
    at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
    at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.Observable.subscribe(Observable.java:10686)
    at io.reactivex.Observable.subscribe(Observable.java:10589)
    at example02.main(example02.java:27)
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 5*************************************************


For reference, a Flowable built with Flowable.create is backed by the FlowableCreate class,
and unless configured otherwise the buffer size is at least 16 and defaults to 128.

//FlowableCreate.java line:44
 
    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
 
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
 
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
//Flowable.java line:61
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
    }
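
To see how that expression behaves, here is a small JDK-only sketch that evaluates it for a few values of the `rx2.buffer-size` system property. The class `BufferSizeSketch` and the method `resolveBufferSize` are hypothetical names for illustration. In the excerpt above the value is computed in a static initializer, so it is read only once; the property has to be set before the class is initialized, for example with `-Drx2.buffer-size=256` on the command line.

```java
public class BufferSizeSketch {

    /** Same expression as in the Flowable.java excerpt: floor of 16, default of 128. */
    static int resolveBufferSize() {
        return Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static void main(String[] args) {
        System.clearProperty("rx2.buffer-size");
        System.out.println(resolveBufferSize()); // property unset: default of 128

        System.setProperty("rx2.buffer-size", "256");
        System.out.println(resolveBufferSize()); // a larger value is used as-is: 256

        System.setProperty("rx2.buffer-size", "4");
        System.out.println(resolveBufferSize()); // values below 16 are raised to 16

        System.clearProperty("rx2.buffer-size");
    }
}
```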