ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 안드로이드 Coroutine Flow - 1 (Flow란?)
    안드로이드 학습/Android 기술면접 대비 2024. 11. 27. 13:48

    안드로이드 App Architecture를 학습하면서 여러 프로젝트를 살펴봤을때 Data Layer(Repository) 부분에서 Coroutine과 함께 flow를 사용하는 하여 데이터를 처리해주는 것을 보았다. 

     

    그래서 이번에 flow가 무엇인지 학습해보고 개인 프로젝트에 적용해보려고 한다.  

    1. Flow란?

    Flow는 Coroutine 라이브러리 내에서 제공하는 비동기 데이터 스트림입니다. Flow는 데이터 스트림을 비동기적으로 처리하게 해주고 또한 연속적인 데이터 흐름을 처리하기에 적합한 도구이다. 

     

    Flow는 Coroutine을 기반으로 빌드되며 여러 값을 제공할 수 있다. Flow는 비동기 적으로 계산되는 데이터 스트림 개념이다. 내보낼 값과 동일한 유형이어야 한다. ( Flow<Int>는 integer value를 내보낸다는 의미.)

     

    데이터 스트림은 다음과 같은 3가지 항목이 있다.

    • 생산자 : 스트림에 추가되는 데이터 생산, 코루틴을 사용해서 Flow에서 비동기적으로 데이터 생산 가능
    • 중개자 : 스트림에 내보내는 값이나 스트림 자체를 수정할 수 있다.
    • 소비자 : 스트림의 값을 사용한다.

    그림 1. 데이터 스트림 관련 항목: 소비자, 중개자(선택사항), 생산자

     

    Case 1) 일반적으로 Repository가 UI 데이터 생산자(Producer)다. 그리고 UI는 소비자(Consumer) 최종적으로 유저에게 표시되는 데이터이다.

     

    Case 2) 때로는 UI Layer가 사용자 입력 이벤트 생산자이고, 다른 layer에서 이를 소비한다.  그리고 중간 계층은 데이터를 변경하여 다음 계층의 요구 사항에 맞게 조정하는 중재자 역할을 한다.

     

    2. flow 사용 방법

    flow를 만들기 위해서는 flow builder API를 사용해야한다. flow 빌더 함수는 emit 함수를 사용하여 새 값을 수동으로 데이터 스트림에 내보낼 수 있는 새 flow을 만듭니다.

     

    아래의 예제는 develop 사이트에 있는 예제이다. (링크)

     

    2-1) flow 만들기

    class NewsRemoteDataSource(
        private val newsApi: NewsApi,
        private val refreshIntervalMs: Long = 5000
    ) {
        val latestNews: Flow<List<ArticleHeadline>> = flow {
            while(true) {
                val latestNews = newsApi.fetchLatestNews()
                emit(latestNews) // Emits the result of the request to the flow
                delay(refreshIntervalMs) // Suspends the coroutine for some time
            }
        }
    }
    
    // Interface that provides a way to make network requests with suspend functions
    interface NewsApi {
        suspend fun fetchLatestNews(): List<ArticleHeadline>
    }

    NewsRemoteDataSource 생산자(Producer) 역할을 맡고있다. flow는 코루틴 내에서 실행된다. 비동기 API의 이점을 있지만 제한사항도 있다.

    • 생산자가 coroutine 안에 있기 때문에, suspend function을 부를떄, 생산자는 suspend function이 끝날때 까지  멈춰있는다. 위에 예에서는 생산자는 fetchLatestnexs 네트워크 요청이 완료 되고 나서 그 결과를 스트림으로 내보낸다.
    • flow 빌더 안에서, 생산자는 다른 CoroutineContext 값을 emit할 수 없다. 그래서 새 코루틴을 만들거나 withContext 블록을 사용하여 다른 CoroutineContext에서 emit을 호출하지 말아야 한다.  이런 경우 callbackFlow같은 다른 흐름 빌더를 사용할 수 있다. 

    2-2) stream 수정

    중개자는 중간 연산자를 사용해서 값을 소비하지 않고도 데이터 스트림을 수정할 수 있다. (이런 부분은 RxJava와 비슷한 것 같다.)

    class NewsRepository(
        private val newsRemoteDataSource: NewsRemoteDataSource,
        private val userData: UserData
    ) {
        /**
         * Returns the favorite latest news applying transformations on the flow.
         * These operations are lazy and don't trigger the flow. They just transform
         * the current value emitted by the flow at that point in time.
         */
        val favoriteLatestNews: Flow<List<ArticleHeadline>> =
            newsRemoteDataSource.latestNews
                // Intermediate operation to filter the list of favorite topics
                .map { news -> news.filter { userData.isFavoriteTopic(it) } }
                // Intermediate operation to save the latest news in the cache
                .onEach { news -> saveInCache(news) }
    }

     

    2-3) flow에서 collecting

    터미널 연산자(terminal operator)를 사용하여 값 수신 대기를 시작하는 stream을 trigger한다. 

    값이 방출될 때마다 스트림의 모든 값을 가져오려면 collect를 사용한다. 

     

    collect는 suspend 함수이기 때문에 코루틴 내에서 실행되어야 합니다. 이 함수는 람다를 매개변수로 받아, 새로운 값이 방출될 때마다 호출됩니다. 또한 suspend 함수이므로, collect를 호출하는 코루틴은 Flow가 종료될 때까지 일시 중단될 수 있습니다.

    class LatestNewsViewModel(
        private val newsRepository: NewsRepository
    ) : ViewModel() {
    
        init {
            viewModelScope.launch {
                // Trigger the flow and consume its elements using collect
                newsRepository.favoriteLatestNews.collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
            }
        }
    }

     

    Flow를 collect하면 생산자가 활성화되어 최신 뉴스를 새로고침하고, 일정한 간격으로 네트워크 요청의 결과를 방출합니다. 생산자는 while(true) 루프를 통해 항상 활성 상태로 유지되며, ViewModel이 제거되고 viewModelScope가 취소되면 데이터 스트림이 종료됩니다.

     

    Flow 수집이 중지될 수 있는 경우는 다음과 같습니다:

    1. 수집하는 코루틴이 취소된 경우
    2. 생산자가 항목 방출을 완료한 경우

    Flow는 다른 중간 연산자가 지정되지 않는 한 Cold(콜드)하고 Lazy(지연 실행)하다.

    1. Cold (콜드):
      Flow는 콜드 스트림입니다. 즉, Flow를 collect하기 전까지는 데이터가 생성되지 않습니다. Flow가 collect될 때마다 데이터 생산이 시작되며, 그때마다 새로운 데이터를 생성합니다.
    2. Lazy (지연 실행):
      Flow는 Lazy하게 실행됩니다. 즉, Flow의 데이터 방출은 collect가 호출될 때까지 시작되지 않으며, 데이터 방출이 필요할 때만 실행됩니다. 이 특성 덕분에 Flow는 필요할 때만 비동기 작업을 실행하여 자원을 절약합니다.

    2-4) Exception

    ViewModel 부분에서 catch라는 중간 연산자를 사용해 예외 처리가 가능하다.

    class LatestNewsViewModel(
        private val newsRepository: NewsRepository
    ) : ViewModel() {
    
        init {
            viewModelScope.launch {
                newsRepository.favoriteLatestNews
                    // Intermediate catch operator. If an exception is thrown,
                    // catch and update the UI
                    .catch { exception -> notifyError(exception) }
                    .collect { favoriteNews ->
                        // Update View with the latest favorite news
                    }
            }
        }
    }

     

    위와 같이 활용한다면 collect 람다는 호출되지 않는다.

     

    이렇게 활용하는 방법도 있고 emit을 통해 내보내고 collect 람다가 실행되게 하는 방법도 있다. 

    class NewsRepository(...) {
        val favoriteLatestNews: Flow<List<ArticleHeadline>> =
            newsRemoteDataSource.latestNews
                .map { news -> news.filter { userData.isFavoriteTopic(it) } }
                .onEach { news -> saveInCache(news) }
                // If an error happens, emit the last cached values
                .catch { exception -> emit(lastCachedNews()) }
    }

     

    Flow를 처리하는 과정에서 catch를 활용하고 emit으로 내보낸다면 collect 람다를 호출하면서 예외를 활용할 수 있다.

    2-5) 다른 CoroutineContext에서 실행하기

    기본적으로 flow builder의 생산자는 collect 하는 CoroutineContext에서 수행된다. 앞에서 언급한 것처럼 다른 CoroutineContext에서 값을 emit 할 수 없다. 

     

    이것이 필요한 경우라면 flowOn을 사용해야한다. flowOn은 upstream flow의 CoroutineContext를 변경한다. 즉, 생산자 및 중간 연산자가 flowOn 위에 있다면 flowOn의 CoroutineContext가 적용된다. 

    class NewsRepository(
        private val newsRemoteDataSource: NewsRemoteDataSource,
        private val userData: UserData,
        private val defaultDispatcher: CoroutineDispatcher
    ) {
        val favoriteLatestNews: Flow<List<ArticleHeadline>> =
            newsRemoteDataSource.latestNews
                .map { news -> // Executes on the default dispatcher
                    news.filter { userData.isFavoriteTopic(it) }
                }
                .onEach { news -> // Executes on the default dispatcher
                    saveInCache(news)
                }
                // flowOn affects the upstream flow ↑
                .flowOn(defaultDispatcher)
                // the downstream flow ↓ is not affected
                .catch { exception -> // Executes in the consumer's context
                    emit(lastCachedNews())
                }
    }

     

    위에 예에서 보면 map(), onEach() 는 flowOn() 안에 있는 defaultDispatcher를 사용하고, catch 연산자와 소비자는 viewModelScope에 사용되는 Dispatchers.Main에서 실행된다.

     

    DataSource Layer는 I/O 작업을 수행하기 때문에, I/O에 최적화된 Dispatcher를 사용해야 한다. 

    class NewsRemoteDataSource(
        ...,
        private val ioDispatcher: CoroutineDispatcher
    ) {
        val latestNews: Flow<List<ArticleHeadline>> = flow {
            // Executes on the IO dispatcher
            ...
        }
            .flowOn(ioDispatcher)
    }

    Jetpack 라이브러리의 Flow

    Flow는 다수의 Jetpack 라이브러리와 함께 사용 가능하다. Flow는 실시간 데이터 업데이트 및 무제한 데이터 스트림에 적합하다.

     

    Flow와 Room을 사용하여 데이터베이스의 변경을 알림 받을 수 있다. 

    @Dao
    abstract class ExampleDao {
        @Query("SELECT * FROM Example")
        abstract fun getExamples(): Flow<List<Example>>
    }

     

    Example 테이블이 변경될 떄마다 데이터베이스의 새 항목이 포함된 새목록이 내보내진다.

     

    3. Flow 사용의 장단점 :

    장점 :

    • 1. 비동기 데이터 스트림을 쉽게 처리하도로 지원
    • 2. 생산자와 소비자의 속도 차이를 관리하는 Backpressure를 지원
    • 3. 다양한 연산자 지원 (RxJava와 비슷한 측면인듯)
    • 4. collect가 호출되기 전까지 실행되지 않게 하여 필요할 떄만 데이터를 받아올 수 있다. 
    • 5. 에러 및 재시도 로직을 쉽게 처리 하도록 구현되어 있다. 

    단점 :

    • 1. Flow는 스스로 안드로이드 lifecycle 인식을 못한다. 
      • Activity나 Fragment의 생명주기에서 종료가 되면 같이 데이터 수집도 종료되어야 하지만 lifecycle을 인식하지 못해 메모리 누수가 될 수 있다. 
    • 2. Flow는 데이터 소비자가 collect하기 전까지 데이터를 발행하지 않는 Cold Stream 방식이다.
      • 하나의 Flow 빌더에 여러개의 소비자가 collect를 요청하면 하나의 collect 마다 데이터를 호출하기 때문에 비용적으로 비효율적이다. 

    단점 해결:

    • Flow는 스스로 안드로이드 lifecycle 인식을 못한다 (lifecycleScope 사용)
    viewLifecycleOwner.lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            launch {
            	... 내용 ...
            }
            
            launch {
                ... 내용 ...
            }
        }
    }
    • Flow는 데이터 소비자가 collect하기 전까지 데이터를 발행하지 않는 Cold Stream 방식이다 (StateFlow 사용) 

     

     

Designed by Tistory.