ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 안드로이드 비동기 처리 2-2 (RxJava 버전 및 5가지 Base classes)
    안드로이드 학습/Android 기술면접 대비 2024. 8. 29. 17:23

    현재 RxJava는 Version 3.x 까지 나왔다. 아래의 설명은 3.x 버전 기준이다.. 

     

    RxJava 2.x, 1.x에 대한 내용은 RxJava의 github 저장소에 자세히 나와있으므로 참고

    Base classes

    RxJava는 5가지 Base classes 가 있다. 

    Observable, Single, Maybe, Flowable, Completable

    Observer

    1. Observable

    • 설명 : Observable은 데이터의 흐름에 맞게 알림을 보내줘 구독한 Observer이 데이터를 사용할 수 있도록 해준다. 소량의 데이터 스트림을 다룰 때 사용된다. BackPressure는 지원하지 않는다.
    • 특징 :
      • 소량의 데이터 스트림에 적합.
      • BackPressure가 없음.
      • 단순한 데이터 스트림 처리에 사용됨.
      • 안드로이드에서는 EditText에서 text 입력을 실시간으로 감지해서 결과를 보여줄때 사용할 수 있다. 

    코드 예제 :

    더보기

    1부터 5까지 Count하는 것을 보여주는 예제이다. 

     

    1) Observable<Int>

    fun createObservable(): Observable<Int> {
        return Observable
            .create { emitter ->
                try {
                    for (message in 1..5) {
                        if (!emitter.isDisposed) {
                            Thread.sleep(1000) // 1초 지연
                            emitter.onNext(message)
                        }
                    }
                    emitter.onComplete()
    
                } catch (e: Exception) {
                    emitter.onError(e)
                }
            }
    }

     

    2) Observer

    fun createObserver(
        rxStatus1: MutableLiveData<String>,
        rxTimer1: MutableLiveData<String>
    ): Observer<Int> {
        return object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                Log.d("Observer", "onSubscribe")
                observableDisposable = d
            }
    
            override fun onNext(t: Int) {
                Log.d("Observer", "onNext : $t")
                rxStatus1.postValue("onNext 진행중입니다.")
                rxTimer1.postValue("Timer : $t")
            }
    
            override fun onError(e: Throwable) {
                Log.d("Observer", "onError")
                rxStatus1.postValue("onError 입니다.")
            }
    
            override fun onComplete() {
                Log.d("Observer", "onComplete")
                rxStatus1.postValue("onComplete에 도달하였습니다..")
            }
        }
    }
    

     

    3) Schedulers 및 구독

    observable
        .subscribeOn(Schedulers.io()) // 백그라운드 작업은 IO 스레드에서
        .observeOn(AndroidSchedulers.mainThread()) // 결과는 메인 스레드에서 관찰
        .subscribe(observer)

    2. Single

    • 설명 : 한개의 데이터 또는 에러를 발행한다. 그래서 네트워크 통신등에 자주 활용된다. 안드로이드에서는 Retrofit 등과 함께 쓰일 수 있다. 
    • 특징 :
      • 오직 한 개의 데이터 또는 에러만 방출.
      • 주로 네트워크 요청이나 데이터베이스 쿼리 결과 등에 사용됨.
      • onSuccess(T item) 또는 onError(Throwable error) 이벤트를 가짐.
      • 사용자가 로그인 할 때 서버로부터 사용자 정보를 가져오는 경우 사용 가능하다. 

    코드 예제 :

    더보기

    1) Single<Int>

    fun createSingle(): Single<Int> {
        return Single
            .create { emitter ->
                try {
                    if (!emitter.isDisposed) {
                        emitter.onSuccess(computation())
                    }
                } catch (e: Exception) {
                    emitter.onError(e)
                }
            }
    }
    private fun computation() : Int{
        var num = 2
        for(a in 0..20){
            num *= 2
        }
        return num
    }

     

    Single 같은 경우 네트워크 통신을 한번만 할때 자주 사용했던 것을 기억한다. 그래서 예제 같은 경우 수학적 계산을 한번 하고 데이터를 방출한다는 개념으로 접근해서 추가 했다. 

     

    2) SingleObserver

    fun createSingleObserver(
        rxStatus2: MutableLiveData<String>
    ): SingleObserver<Int> {
        return object : SingleObserver<Int> {
            override fun onSubscribe(d: Disposable) {
                singleDisposable = d
            }
    
            override fun onError(e: Throwable) {
                rxStatus2.postValue("오류입니다. 이유는 ${e.message}")
            }
    
            override fun onSuccess(t: Int) {
                rxStatus2.postValue("onSuccess 호출되었습니다. 전달받은 데이터는 $t 입니다.")
            }
        }
    }

     

    Observer 와는 다르게 onSuccess()와 onError()만 있어서 한번의 데이터만 방출하거나 에러만 방출

     

    3) Schedulers 및 구독

    single.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(singleObserver)

    3. Maybe

    • 설명 : Maybe는 최대로 발행할 수 있는 데이터가 하나다. 그리고 데이터가 없이 완료될 수도 있습니다
      Single과 다른 점은 onComplete(완료) 이벤트가 추가된다.
    • 특징:
      • 하나의 데이터, 에러, 또는 아무것도 방출하지 않을 수 있음.
      • onSuccess(T item), onComplete(), 또는 onError(Throwable error) 이벤트를 가짐.
      • 로컬 데이터베이스에서 특정 데이터를 조회할 때, 데이터가 없을 수도 있는 경우 사용 가능하다. 

    코드 예제 :

    더보기

    1) Maybe<Int>

    fun createMaybe(): Maybe<Int> {
        return Maybe
            .create { emitter ->
                try {
                    emitter.onSuccess(computation())
                    emitter.onComplete() // Maybe class에서는 onSuccess 호출로 데이터를 방출하면 onComplete는 호출되지 않는다.
                } catch (e: Exception) {
                    emitter.onError(e)
                }
            }
    }
    private fun computation() : Int{
        var num = 2
        for(a in 0..20){
            num *= 2
        }
        return num
    }

     

     2) MaybeObserver

    fun createMaybeObserver(
        rxStatus3: MutableLiveData<String>
    ): MaybeObserver<Int> {
        return object : MaybeObserver<Int> {
            override fun onSubscribe(d: Disposable) {
                maybeDisposable = d
            }
    
            override fun onError(e: Throwable) {}
    
            override fun onSuccess(t: Int) {
                rxStatus3.postValue("onSuccess 호출되었습니다. 전달 받은 데이터는 $t 입니다.")
            }
    
            override fun onComplete() {
                rxStatus3.postValue("onComplete에 도달하였습니다.")
                Log.d("Maybe", "onComplete")
            }
        }
    }

     

    Single과는 다르게 onComplete가 있어서 데이터가 있을 수도 있고 없을 수도 있는 상황에서 데이터가 없다면 onComplete를 활용해서 최정 결과를 나타낼 수 있다.

    onSuccess가 데이터를 방출한다면 onComplete는 방출되지 않는다. 

     

    3) Schedulers 및 구독

    maybe.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(maybeObserver)

    4. Completable

    • 설명 : Completable은 데이터를 방출하지 않고, 완료 또는 에러 신호만 방출합니다. 주로 작업의 완료 여부만 필요한 경우에 사용됩니다.
    • 특징:
      • 데이터 방출 없음.
      • 작업의 성공 또는 실패만 알림.
      • onComplete() 또는 onError(Throwable error) 이벤트를 가짐.
      • 사용자의 프로필 정보를 데이터베이스에 저장하는 경우

    코드 예제 :

    더보기

    1) Completable

    fun createCompletable(): Completable {
        return Completable.create { emitter ->
            try {
                if (!emitter.isDisposed) {
                    if(saveDataInLocal("Toms")){
                        emitter.onComplete() // 1번만 호출되고 더이상 호출되지 않는다.
                    }
                }
            } catch (e: Exception) {
                emitter.onError(e)
            }
        }
    }
    private fun saveDataInLocal(username: String) : Boolean{
        val nameList = mutableListOf("Tom", "John")
        if(nameList.contains(username)){
            throw SQLiteConstraintException("Username '$username' already exists in the database.")
        }
        nameList.add(username)
        return true
    }

     

    예제에서는 실제 LocalDB를 사용하지는 않았지만 비슷한 느낌을 주려고 했다. 

    List 안에 들어있는 Tom이나 John이 방출되면 Error로 가고 

    다른 이름이 들어가면 onComplete가 호출된다.

     

    2) CompletableObserver

    fun createCompletableObserver(
        rxStatus4: MutableLiveData<String>
    ): CompletableObserver {
        return object : CompletableObserver {
            override fun onSubscribe(d: Disposable) {
                completableDisposable = d
            }
    
            override fun onError(e: Throwable) {
                rxStatus4.postValue("onError 호출 되었습니다.")
            }
    
            override fun onComplete() {
                rxStatus4.postValue("onComplete 호출 되었습니다. 데이터가 정상적으로 처리되었습니다.")
                Log.d("Completable", "onComplete")
            }
        }
    }

     

    3) Schedulers 및 구독

    completable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(completableObserver)

    5. Flowable

    • 설명 : Flowable은 많은 양의 데이터 스트림을 다룰 때 사용되며, BackPressure 전략이 제공된다. 소비자가 생성된 데이터를 충분히 빠르게 처리하지 못할 때 발생하는 문제를 관리하는 메커니즘이다.
    • 특징
      • 대량의 데이터를 다룰 수 있다.
      • BackPressure를 처리 할 수 있다. 
      • 데이터 흐름을 조절할 수 있다. 

    5-1) BackPressure(배압) 이란?

    BackPressure는 데이터의 발행과 소비의 균형이 어긋날 때 발생한다. 

     

    예를 들어 Observable 데이터 스트림이 10만개의 데이터를 0.1초마다 계속 발행하고 Observer가 10초에 한 번씩만 소비한다면, 구독자가 소비를 다 못하는 상황에서 데이터 스트림에 계속해서 쌓이게 된다.이런식으로 데이터를 쌓는다면 결국 메모리가 OverFlow되서 OutOfMemoryException에러가 발생할 수 있다.  이것을 BackPressure라고 하는 것이다. 이때 Obserable 대신 Flowable을 사용해야한다.

    Flowable.create({ emitter ->
    	... 생략 ...
    }, BackpressureStrategy.BUFFER)

     

    5-2) 5가지 BackpressureStrategy

     

    위에 처럼 Flowable을 만들면서 Backpressure를 어떤 것을 사용할지 정해야한다. Backpressure 전략은 5가지가 있다. 

     

    • BUFFER
      • 모든 데이터를 버퍼에 저장하여, 소비자가 데이터를 처리할 준비가 될 때까지 기다린다. 버퍼의 크기는 기본적으로 제한이 없어 메모리 부족 위험이 있을 수 있으므로 주의가 필요하다.
      • 버퍼가 너무 커질 경우 메모리 문제를 야기할 수 있으므로, 메모리 사용을 신중히 모니터링해야 합니다.
    • DROP
      • 소비자가 처리할 수 없는 데이터는 모두 삭제(drop)됩니다.
      • 최신 데이터를 유지하면서 오래된 데이터를 무시해야 할 때 유용합니다.
    • LATEST
      • 최신 데이터만 유지하고 이전 데이터를 덮어씁니다.
      • 소비자가 최신 상태만 필요로 할 때 유용합니다. 예를 들어, 사용자 위치 업데이트와 같은 상황에서 사용됩니다.
    • MISSING
      • 특별한 전략을 적용하지 않고 Flowable 사용자가 직접 백프레셔 처리를 해야 합니다.
      • 사용자에게 맞춤형 백프레셔 처리를 맡기므로, 백프레셔 관련 예외가 발생할 수 있습니다.
    • ERROR
      • 소비자 처리 속도가 따라가지 못할 경우 MissingBackpressureException을 발생시킵니다.
      • 모든 데이터를 처리할 수 있어야 하는 상황에서 사용하며, 백프레셔 문제 발생 시 오류로 처리합니다.

     

    5-3) Observable vs Flowable

     

    Observable 사용 권장 상황

    • 1,000개 미만의 데이터가 발행되는 경우
    • GUI 프로그래밍 시 (마우스 이벤트, 터치 이벤트 등 → debounce() 등으로 제어 가능)
    • Java Stream API 를 사용하지 않는 경우

    Flowable 사용 권장 상황

    • 데이터가 10,000개 이상 처리되는 경우
    • 디스크에서 파일을 읽어들이는 경우
    • JDBC 를 통해 DB 쿼리 결과를 가져오는 경우
    • 네트워크 I/O 작업을 하는 경우
    • 다수의, 혹은 Non-Blocking 방식의 API 를 요청하는 경우

    코드 예제 :

    더보기

    1) Flowable

    fun createFlowable(): Flowable<Int> {
        return Flowable.create({ emitter ->
            try {
                for (message in 1..5) {
                    if (!emitter.isCancelled) {
                        Thread.sleep(1000) // 1초 지연
                        emitter.onNext(message)
                    }
                }
                emitter.onComplete()
            } catch (e: Exception) {
                emitter.onError(e)
            }
        }, BackpressureStrategy.BUFFER)
    }
    

     

    onNext를 사용해서 데이터를 5번 방출 후에 onComplete 수행. 

     

    2) DisposableSubscriber

    fun createDisposableSubscriber(
        rxStatus1: MutableLiveData<String>,
        rxTimer1: MutableLiveData<String>
    ): DisposableSubscriber<Int> {
        return object : DisposableSubscriber<Int>() {
            override fun onNext(t: Int) {
                rxStatus1.postValue("onNext 진행중입니다.")
                rxTimer1.postValue("Timer : $t")            }
    
            override fun onError(t: Throwable) {
                rxStatus1.postValue("onError 도달하였습니다. error : ${t.message}")
            }
    
            override fun onComplete() {
                rxStatus1.postValue("onComplete에 도달하였습니다.")
            }
    
        }
    }

     

    3) Schedulers 및 구독

    val observer = flowableClass.createDisposableSubscriber(_rxStatus5, _rxTimer5)
    val flowable = flowableClass.createFlowable()
    flowable
        .subscribeOn(Schedulers.io()) // 백그라운드 작업은 IO 스레드에서
        .observeOn(AndroidSchedulers.mainThread()) // 결과는 메인 스레드에서 관찰
        .subscribe(observer)
Designed by Tistory.