-
안드로이드 비동기 처리 2-2 (RxJava 버전 및 5가지 Base classes)안드로이드 학습/Android 기술면접 대비 2024. 8. 29. 17:23
현재 RxJava는 Version 3.x 까지 나왔다. 아래의 설명은 3.x 버전 기준이다..
RxJava 2.x, 1.x에 대한 내용은 RxJava의 github 저장소에 자세히 나와있으므로 참고
Base classes
RxJava는 5가지 Base classes 가 있다.
Observable, Single, Maybe, Flowable, Completable
Observer 1. Observable
- 설명 : Observable은 데이터의 흐름에 맞게 알림을 보내줘 구독한 Observer이 데이터를 사용할 수 있도록 해준다. 소량의 데이터 스트림을 다룰 때 사용된다. BackPressure는 지원하지 않는다.
- 특징 :
- 소량의 데이터 스트림에 적합.
- BackPressure가 없음.
- 단순한 데이터 스트림 처리에 사용됨.
- 안드로이드에서는 EditText에서 text 입력을 실시간으로 감지해서 결과를 보여줄때 사용할 수 있다.
코드 예제 :
더보기1부터 5까지 Count하는 것을 보여주는 예제이다.
1) Observable<Int>
fun createObservable(): Observable<Int> { return Observable .create { emitter -> try { for (message in 1..5) { if (!emitter.isDisposed) { Thread.sleep(1000) // 1초 지연 emitter.onNext(message) } } emitter.onComplete() } catch (e: Exception) { emitter.onError(e) } } }
2) Observer
fun createObserver( rxStatus1: MutableLiveData<String>, rxTimer1: MutableLiveData<String> ): Observer<Int> { return object : Observer<Int> { override fun onSubscribe(d: Disposable) { Log.d("Observer", "onSubscribe") observableDisposable = d } override fun onNext(t: Int) { Log.d("Observer", "onNext : $t") rxStatus1.postValue("onNext 진행중입니다.") rxTimer1.postValue("Timer : $t") } override fun onError(e: Throwable) { Log.d("Observer", "onError") rxStatus1.postValue("onError 입니다.") } override fun onComplete() { Log.d("Observer", "onComplete") rxStatus1.postValue("onComplete에 도달하였습니다..") } } }
3) Schedulers 및 구독
observable .subscribeOn(Schedulers.io()) // 백그라운드 작업은 IO 스레드에서 .observeOn(AndroidSchedulers.mainThread()) // 결과는 메인 스레드에서 관찰 .subscribe(observer)
2. Single
- 설명 : 한개의 데이터 또는 에러를 발행한다. 그래서 네트워크 통신등에 자주 활용된다. 안드로이드에서는 Retrofit 등과 함께 쓰일 수 있다.
- 특징 :
- 오직 한 개의 데이터 또는 에러만 방출.
- 주로 네트워크 요청이나 데이터베이스 쿼리 결과 등에 사용됨.
- onSuccess(T item) 또는 onError(Throwable error) 이벤트를 가짐.
- 사용자가 로그인 할 때 서버로부터 사용자 정보를 가져오는 경우 사용 가능하다.
코드 예제 :
더보기1) Single<Int>
fun createSingle(): Single<Int> { return Single .create { emitter -> try { if (!emitter.isDisposed) { emitter.onSuccess(computation()) } } catch (e: Exception) { emitter.onError(e) } } }
private fun computation() : Int{ var num = 2 for(a in 0..20){ num *= 2 } return num }
2) SingleObserver
fun createSingleObserver( rxStatus2: MutableLiveData<String> ): SingleObserver<Int> { return object : SingleObserver<Int> { override fun onSubscribe(d: Disposable) { singleDisposable = d } override fun onError(e: Throwable) { rxStatus2.postValue("오류입니다. 이유는 ${e.message}") } override fun onSuccess(t: Int) { rxStatus2.postValue("onSuccess 호출되었습니다. 전달받은 데이터는 $t 입니다.") } } }
Observer 와는 다르게 onSuccess()와 onError()만 있어서 한번의 데이터만 방출하거나 에러만 방출
3) Schedulers 및 구독
single.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(singleObserver)
3. Maybe
- 설명 : Maybe는 최대로 발행할 수 있는 데이터가 하나다. 그리고 데이터가 없이 완료될 수도 있습니다
Single과 다른 점은 onComplete(완료) 이벤트가 추가된다. - 특징:
- 하나의 데이터, 에러, 또는 아무것도 방출하지 않을 수 있음.
- onSuccess(T item), onComplete(), 또는 onError(Throwable error) 이벤트를 가짐.
- 로컬 데이터베이스에서 특정 데이터를 조회할 때, 데이터가 없을 수도 있는 경우 사용 가능하다.
코드 예제 :
더보기1) Maybe<Int>
fun createMaybe(): Maybe<Int> { return Maybe .create { emitter -> try { emitter.onSuccess(computation()) emitter.onComplete() // Maybe class에서는 onSuccess 호출로 데이터를 방출하면 onComplete는 호출되지 않는다. } catch (e: Exception) { emitter.onError(e) } } }
private fun computation() : Int{ var num = 2 for(a in 0..20){ num *= 2 } return num }
2) MaybeObserver
fun createMaybeObserver( rxStatus3: MutableLiveData<String> ): MaybeObserver<Int> { return object : MaybeObserver<Int> { override fun onSubscribe(d: Disposable) { maybeDisposable = d } override fun onError(e: Throwable) {} override fun onSuccess(t: Int) { rxStatus3.postValue("onSuccess 호출되었습니다. 전달 받은 데이터는 $t 입니다.") } override fun onComplete() { rxStatus3.postValue("onComplete에 도달하였습니다.") Log.d("Maybe", "onComplete") } } }
Single과는 다르게 onComplete가 있어서 데이터가 있을 수도 있고 없을 수도 있는 상황에서 데이터가 없다면 onComplete를 활용해서 최정 결과를 나타낼 수 있다.
onSuccess가 데이터를 방출한다면 onComplete는 방출되지 않는다.
3) Schedulers 및 구독
maybe.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(maybeObserver)
4. Completable
- 설명 : Completable은 데이터를 방출하지 않고, 완료 또는 에러 신호만 방출합니다. 주로 작업의 완료 여부만 필요한 경우에 사용됩니다.
- 특징:
- 데이터 방출 없음.
- 작업의 성공 또는 실패만 알림.
- onComplete() 또는 onError(Throwable error) 이벤트를 가짐.
- 사용자의 프로필 정보를 데이터베이스에 저장하는 경우
코드 예제 :
더보기1) Completable
fun createCompletable(): Completable { return Completable.create { emitter -> try { if (!emitter.isDisposed) { if(saveDataInLocal("Toms")){ emitter.onComplete() // 1번만 호출되고 더이상 호출되지 않는다. } } } catch (e: Exception) { emitter.onError(e) } } }
private fun saveDataInLocal(username: String) : Boolean{ val nameList = mutableListOf("Tom", "John") if(nameList.contains(username)){ throw SQLiteConstraintException("Username '$username' already exists in the database.") } nameList.add(username) return true }
예제에서는 실제 LocalDB를 사용하지는 않았지만 비슷한 느낌을 주려고 했다.
List 안에 들어있는 Tom이나 John이 방출되면 Error로 가고
다른 이름이 들어가면 onComplete가 호출된다.
2) CompletableObserver
fun createCompletableObserver( rxStatus4: MutableLiveData<String> ): CompletableObserver { return object : CompletableObserver { override fun onSubscribe(d: Disposable) { completableDisposable = d } override fun onError(e: Throwable) { rxStatus4.postValue("onError 호출 되었습니다.") } override fun onComplete() { rxStatus4.postValue("onComplete 호출 되었습니다. 데이터가 정상적으로 처리되었습니다.") Log.d("Completable", "onComplete") } } }
3) Schedulers 및 구독
completable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(completableObserver)
5. Flowable
- 설명 : Flowable은 많은 양의 데이터 스트림을 다룰 때 사용되며, BackPressure 전략이 제공된다. 소비자가 생성된 데이터를 충분히 빠르게 처리하지 못할 때 발생하는 문제를 관리하는 메커니즘이다.
- 특징 :
- 대량의 데이터를 다룰 수 있다.
- BackPressure를 처리 할 수 있다.
- 데이터 흐름을 조절할 수 있다.
5-1) BackPressure(배압) 이란?
BackPressure는 데이터의 발행과 소비의 균형이 어긋날 때 발생한다.
예를 들어 Observable 데이터 스트림이 10만개의 데이터를 0.1초마다 계속 발행하고 Observer가 10초에 한 번씩만 소비한다면, 구독자가 소비를 다 못하는 상황에서 데이터 스트림에 계속해서 쌓이게 된다.이런식으로 데이터를 쌓는다면 결국 메모리가 OverFlow되서 OutOfMemoryException에러가 발생할 수 있다. 이것을 BackPressure라고 하는 것이다. 이때 Obserable 대신 Flowable을 사용해야한다.
Flowable.create({ emitter -> ... 생략 ... }, BackpressureStrategy.BUFFER)
5-2) 5가지 BackpressureStrategy
위에 처럼 Flowable을 만들면서 Backpressure를 어떤 것을 사용할지 정해야한다. Backpressure 전략은 5가지가 있다.
- BUFFER
- 모든 데이터를 버퍼에 저장하여, 소비자가 데이터를 처리할 준비가 될 때까지 기다린다. 버퍼의 크기는 기본적으로 제한이 없어 메모리 부족 위험이 있을 수 있으므로 주의가 필요하다.
- 버퍼가 너무 커질 경우 메모리 문제를 야기할 수 있으므로, 메모리 사용을 신중히 모니터링해야 합니다.
- DROP
- 소비자가 처리할 수 없는 데이터는 모두 삭제(drop)됩니다.
- 최신 데이터를 유지하면서 오래된 데이터를 무시해야 할 때 유용합니다.
- LATEST
- 최신 데이터만 유지하고 이전 데이터를 덮어씁니다.
- 소비자가 최신 상태만 필요로 할 때 유용합니다. 예를 들어, 사용자 위치 업데이트와 같은 상황에서 사용됩니다.
- MISSING
- 특별한 전략을 적용하지 않고 Flowable 사용자가 직접 백프레셔 처리를 해야 합니다.
- 사용자에게 맞춤형 백프레셔 처리를 맡기므로, 백프레셔 관련 예외가 발생할 수 있습니다.
- ERROR
- 소비자 처리 속도가 따라가지 못할 경우 MissingBackpressureException을 발생시킵니다.
- 모든 데이터를 처리할 수 있어야 하는 상황에서 사용하며, 백프레셔 문제 발생 시 오류로 처리합니다.
5-3) Observable vs Flowable
Observable 사용 권장 상황
- 1,000개 미만의 데이터가 발행되는 경우
- GUI 프로그래밍 시 (마우스 이벤트, 터치 이벤트 등 → debounce() 등으로 제어 가능)
- Java Stream API 를 사용하지 않는 경우
Flowable 사용 권장 상황
- 데이터가 10,000개 이상 처리되는 경우
- 디스크에서 파일을 읽어들이는 경우
- JDBC 를 통해 DB 쿼리 결과를 가져오는 경우
- 네트워크 I/O 작업을 하는 경우
- 다수의, 혹은 Non-Blocking 방식의 API 를 요청하는 경우
코드 예제 :
더보기1) Flowable
fun createFlowable(): Flowable<Int> { return Flowable.create({ emitter -> try { for (message in 1..5) { if (!emitter.isCancelled) { Thread.sleep(1000) // 1초 지연 emitter.onNext(message) } } emitter.onComplete() } catch (e: Exception) { emitter.onError(e) } }, BackpressureStrategy.BUFFER) }
onNext를 사용해서 데이터를 5번 방출 후에 onComplete 수행.
2) DisposableSubscriber
fun createDisposableSubscriber( rxStatus1: MutableLiveData<String>, rxTimer1: MutableLiveData<String> ): DisposableSubscriber<Int> { return object : DisposableSubscriber<Int>() { override fun onNext(t: Int) { rxStatus1.postValue("onNext 진행중입니다.") rxTimer1.postValue("Timer : $t") } override fun onError(t: Throwable) { rxStatus1.postValue("onError 도달하였습니다. error : ${t.message}") } override fun onComplete() { rxStatus1.postValue("onComplete에 도달하였습니다.") } } }
3) Schedulers 및 구독
val observer = flowableClass.createDisposableSubscriber(_rxStatus5, _rxTimer5) val flowable = flowableClass.createFlowable() flowable .subscribeOn(Schedulers.io()) // 백그라운드 작업은 IO 스레드에서 .observeOn(AndroidSchedulers.mainThread()) // 결과는 메인 스레드에서 관찰 .subscribe(observer)
'안드로이드 학습 > Android 기술면접 대비' 카테고리의 다른 글
LiveData (0) 2024.09.23 안드로이드 비동기 처리 2-3 (Operators : 연산자들) (0) 2024.08.29 안드로이드 Context (0) 2024.08.22 Handler와 Looper (0) 2024.08.20 Serializable, Parcelable (Intent) 그리고 Parcelize (0) 2024.08.08