본문 바로가기

안드로이드

안드로이드 RxJava #8 RxJava 연산자-(결합)

 

안녕하세요

이번 시간에는 결합하는 연산자에 대해 알아보겠습니다.

 

여러 Observable 들은 결합 하여 하나의 Observable을 생성하는 연산자입니다.

 

 

 

combineLatest

	Observable<Integer> intSource = Observable.create((emitter -> {
            new Thread(() -> {
                for (int i = 1; i <= 5; i++) {
                    emitter.onNext(i);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }));

        Observable<String> strSource = Observable.create(emitter -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                    emitter.onNext("A");
                    Thread.sleep(700);
                    emitter.onNext("B");
                    Thread.sleep(100);
                    emitter.onNext("C");
                    Thread.sleep(700);
                    emitter.onNext("D");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        });
        Observable.combineLatest(intSource, strSource, (num, str) -> num + str)
                .subscribe(System.out::println);

 

두 개의 Observable에서 가장 최근에 발행한 아이템을 취합하여 하나로 발행합니다.

실무에서 많이 사용되는 연산자라고 합니다. 여러 개의 http요청에 의한 응답을 하나로 묶어서 처리할 때 사용된다고 합니다.

 

다음과 같은 결과를 볼 수 있다.

2022-11-10 17:01:14.104 18869-18904/com.psw.rxtest4 I/System.out: 1A
2022-11-10 17:01:14.585 18869-18903/com.psw.rxtest4 I/System.out: 2A
2022-11-10 17:01:14.807 18869-18904/com.psw.rxtest4 I/System.out: 2B
2022-11-10 17:01:14.908 18869-18904/com.psw.rxtest4 I/System.out: 2C
2022-11-10 17:01:15.586 18869-18903/com.psw.rxtest4 I/System.out: 3C
2022-11-10 17:01:15.610 18869-18904/com.psw.rxtest4 I/System.out: 3D
2022-11-10 17:01:16.589 18869-18903/com.psw.rxtest4 I/System.out: 4D
2022-11-10 17:01:17.592 18869-18903/com.psw.rxtest4 I/System.out: 5D

 

 

 

zip

combineLatest 코드 아래 부분에 다음과 같은 내용을 추가해 줍니다. 5초 후에 zip을 이용해 Observable을 결합할 겁니다.

	try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Observable.zip(intSource, strSource, (num, str) -> num + str)
                .subscribe(System.out::println);

결과

2022-11-10 17:17:54.912 19083-19121/com.psw.rxtest4 I/System.out: 1A
2022-11-10 17:17:55.615 19083-19121/com.psw.rxtest4 I/System.out: 2B
2022-11-10 17:17:56.417 19083-19120/com.psw.rxtest4 I/System.out: 3C
2022-11-10 17:17:57.420 19083-19120/com.psw.rxtest4 I/System.out: 4D

같은 Observable인데 zip을 사용하면 다른 결과를 보여줍니다.

combineLatest과 비슷 하지만 zip은 1대1 매칭입니다. 순서를 엄격히 지켜 아이템을 결합하는 것을 확인할 수 있습니다.

 

 

 

merge

merge를 시작하기 전에 intervalRange에 대해 알아보겠습니다

생성하는 연산자에서 배운 interval과 range를 합친 겁니다.

	Observable src1 = Observable.intervalRange(
                1			//시작 값
                ,5			//발행 횟수
                ,3000			//초기 지연시간
                ,1000			//발행 간격
                ,TimeUnit.MILLISECONDS)
                .map(v->v*20);
        src1.subscribe(System.out::println);

1부터 시작하고 5번 발행하며 3초 동안 초기 지연 이후 1초의 간격으로 아이템을 발행합니다.

이후 map을 이용하여 발행된 값에 20을 곱해주었습니다.

 

 

 

	Observable src1 = Observable.intervalRange(
                1,5,0,100,TimeUnit.MILLISECONDS)
                .map(v->v*20);

        Observable src2 = Observable.create(emitter -> {
            new Thread(()->{
                try{
                    Thread.sleep(250);
                    emitter.onNext("추가1");
                    Thread.sleep(200);
                    emitter.onNext("추가2");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        });
        Observable.merge(src1, src2)
                .subscribe(System.out::println);

merge 연산자를 이용하면 여러 Observable을 하나의 Observale처럼 결합하여 사용할 수 있습니다. 아이템이 발행되는 시점을 기준으로 하나의 Observable을 만듭니다.

 

결과

2022-11-10 17:56:18.747 20410-20451/com.psw.rxtest4 I/System.out: 20
2022-11-10 17:56:18.847 20410-20451/com.psw.rxtest4 I/System.out: 40
2022-11-10 17:56:18.947 20410-20451/com.psw.rxtest4 I/System.out: 60
2022-11-10 17:56:19.001 20410-20452/com.psw.rxtest4 I/System.out: 추가1
2022-11-10 17:56:19.047 20410-20451/com.psw.rxtest4 I/System.out: 80
2022-11-10 17:56:19.146 20410-20451/com.psw.rxtest4 I/System.out: 100
2022-11-10 17:56:19.202 20410-20452/com.psw.rxtest4 I/System.out: 추가2

 

 

 

다음 시간에는 스케줄러에 대해 알아보겠습니다.

수고하셨습니다!

반응형