
안녕하세요
이번 시간에는 결합하는 연산자에 대해 알아보겠습니다.
여러 Observable 들은 결합 하여 하나의 Observable을 생성하는 연산자입니다.
combineLatest

Observable<Integer> intSource = Observable.create((emitter -> {
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
emitter.onNext(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}));
Observable<String> strSource = Observable.create(emitter -> {
new Thread(() -> {
try {
Thread.sleep(500);
emitter.onNext("A");
Thread.sleep(700);
emitter.onNext("B");
Thread.sleep(100);
emitter.onNext("C");
Thread.sleep(700);
emitter.onNext("D");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
});
Observable.combineLatest(intSource, strSource, (num, str) -> num + str)
.subscribe(System.out::println);
두 개의 Observable에서 가장 최근에 발행한 아이템을 취합하여 하나로 발행합니다.
실무에서 많이 사용되는 연산자라고 합니다. 여러 개의 http요청에 의한 응답을 하나로 묶어서 처리할 때 사용된다고 합니다.
다음과 같은 결과를 볼 수 있다.
2022-11-10 17:01:14.104 18869-18904/com.psw.rxtest4 I/System.out: 1A
2022-11-10 17:01:14.585 18869-18903/com.psw.rxtest4 I/System.out: 2A
2022-11-10 17:01:14.807 18869-18904/com.psw.rxtest4 I/System.out: 2B
2022-11-10 17:01:14.908 18869-18904/com.psw.rxtest4 I/System.out: 2C
2022-11-10 17:01:15.586 18869-18903/com.psw.rxtest4 I/System.out: 3C
2022-11-10 17:01:15.610 18869-18904/com.psw.rxtest4 I/System.out: 3D
2022-11-10 17:01:16.589 18869-18903/com.psw.rxtest4 I/System.out: 4D
2022-11-10 17:01:17.592 18869-18903/com.psw.rxtest4 I/System.out: 5D
zip

combineLatest 코드 아래 부분에 다음과 같은 내용을 추가해 줍니다. 5초 후에 zip을 이용해 Observable을 결합할 겁니다.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable.zip(intSource, strSource, (num, str) -> num + str)
.subscribe(System.out::println);
결과
2022-11-10 17:17:54.912 19083-19121/com.psw.rxtest4 I/System.out: 1A
2022-11-10 17:17:55.615 19083-19121/com.psw.rxtest4 I/System.out: 2B
2022-11-10 17:17:56.417 19083-19120/com.psw.rxtest4 I/System.out: 3C
2022-11-10 17:17:57.420 19083-19120/com.psw.rxtest4 I/System.out: 4D
같은 Observable인데 zip을 사용하면 다른 결과를 보여줍니다.
combineLatest과 비슷 하지만 zip은 1대1 매칭입니다. 순서를 엄격히 지켜 아이템을 결합하는 것을 확인할 수 있습니다.
merge
merge를 시작하기 전에 intervalRange에 대해 알아보겠습니다
생성하는 연산자에서 배운 interval과 range를 합친 겁니다.
Observable src1 = Observable.intervalRange(
1 //시작 값
,5 //발행 횟수
,3000 //초기 지연시간
,1000 //발행 간격
,TimeUnit.MILLISECONDS)
.map(v->v*20);
src1.subscribe(System.out::println);
1부터 시작하고 5번 발행하며 3초 동안 초기 지연 이후 1초의 간격으로 아이템을 발행합니다.
이후 map을 이용하여 발행된 값에 20을 곱해주었습니다.

Observable src1 = Observable.intervalRange(
1,5,0,100,TimeUnit.MILLISECONDS)
.map(v->v*20);
Observable src2 = Observable.create(emitter -> {
new Thread(()->{
try{
Thread.sleep(250);
emitter.onNext("추가1");
Thread.sleep(200);
emitter.onNext("추가2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
});
Observable.merge(src1, src2)
.subscribe(System.out::println);
merge 연산자를 이용하면 여러 Observable을 하나의 Observale처럼 결합하여 사용할 수 있습니다. 아이템이 발행되는 시점을 기준으로 하나의 Observable을 만듭니다.
결과
2022-11-10 17:56:18.747 20410-20451/com.psw.rxtest4 I/System.out: 20
2022-11-10 17:56:18.847 20410-20451/com.psw.rxtest4 I/System.out: 40
2022-11-10 17:56:18.947 20410-20451/com.psw.rxtest4 I/System.out: 60
2022-11-10 17:56:19.001 20410-20452/com.psw.rxtest4 I/System.out: 추가1
2022-11-10 17:56:19.047 20410-20451/com.psw.rxtest4 I/System.out: 80
2022-11-10 17:56:19.146 20410-20451/com.psw.rxtest4 I/System.out: 100
2022-11-10 17:56:19.202 20410-20452/com.psw.rxtest4 I/System.out: 추가2
다음 시간에는 스케줄러에 대해 알아보겠습니다.
수고하셨습니다!
'안드로이드' 카테고리의 다른 글
안드로이드 Retrofit 통신#2 리사이클러뷰 (0) | 2022.11.20 |
---|---|
안드로이드 Retrofit 통신 사용방법 (0) | 2022.11.19 |
안드로이드 RxJava #7 RxJava 연산자-(필터링) (0) | 2022.11.09 |
안드로이드 RxJava #6 RxJava 연산자-(변형) (0) | 2022.11.08 |
안드로이드 RxJava #5 마블 다이어그램, RxJava 연산자-(생성) (0) | 2022.11.07 |