본문 바로가기
기타개발/Coroutines

Android Networking with coroutines Flow

by 궝테스트 2020. 8. 6.

들어가기전에 Flow 문서를 보고 정리한 포스팅이며 1, 3, 4번은 필수로 학습이 필요하다.
-> Flow #1 : Flow builders, operators, context
-> Flow #2 : Buffering
-> Flow #3 : Composing and flattening flows
-> Flow #4 : Exceptions, completions, cancellation

요즘 구글 가이드라인으로 coroutines 이 필수적으로 등장하고 있어서
기존에 서버 API 통신을 위해 사용하던 Rx 를 coroutines Flow 를 사용하는 패턴으로 변경해보려 한다.

기본 구조

  • MVVM 구조와 서버 API 통신을 위해 Retrofit2 의 조합을 전제로 해본다.
  • 크게 App module 과 서버 통신을 담당하는 Domain module 로 구분되어 있다 (아직은,, 추후 좀 더 clean 하게 data module 도!)
App module
Domain module
View ViewModel Repository Service
1. 서버 통신 관련 이벤트 발생 ->

6. 데이터 Observing
2. 서버 API 데이터 요청 ->

<- 5. View 업데이트
3. 서버 API 호출 및 응답 반환 ->

<- 4. 데이터 응답 반환
4. 서버 API 정의 (Retrofit)


  • App module : viewModelScope 를 위한 viewModel-ktx dependency 추가
  • Domain module : 서버 통신을 위해 retrofit2, okhttp, coroutines dependency 추가
// App module build.gradle
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"


// Domain module build.gradle
implementation "com.squareup.retrofit2:retrofit:$retrofit_version"
implementation "com.squareup.retrofit2:adapter-rxjava2:$retrofit_version"
implementation "com.squareup.retrofit2:converter-gson:$retrofit_version"
implementation "com.squareup.okhttp3:logging-interceptor:3.14.0"

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7"

 

Rx vs Coroutines

앱 정보를 가져오는 API 를 호출하는 로직을 간단하게 Rx 와 Coroutines 를 비교해본다.

1. AppInfoService.kt

  • Coroutines 로 변경하면 suspend function 으로 사용하게되며 리턴 타입을 바로 해당 데이터의 타입으로 받을 수 있다
  • 참고로 suspend function 이 실행되면 해당 함수가 끝날 때까지 스레드를 block 시키지 않고 지연시킨다
  • 따라서 해당 함수가 호출되자마자 context switching 이 일어나지 않고 지연된 후 특정 시점에 처리되기 때문에 block 에 비해 상대적으로 가벼운 비용이 든다고 한다
// Rx
interface AppInfoService {

    @GET("/something/appinfo")
    fun getAppInfo(): Observable<AppInfo>

}

// Coroutines
interface AppInfoService {

    @GET("/something/appinfo")
    suspend fun getAppInfo(): AppInfo

}

 

2. BaseRepository.kt

  • 각 Repository 에서 상속받을 수 있는 abstract class 이다
  • API 를 호출 후 데이터 반환받아 처리하는 로직을 포함하는 함수들이 구현되어 있다
  • flow 를 사용하여 구현하였으며 아직은 experimental api 로 되어있어 아래와 같은 어노테이션이 옵션이 지정되어있다
    • @FlowPreview
    • @ExperimentalCoroutinesApi

2-1. Rx

Single API 호출

  • 리턴 타입이 Observable 이며 API 실행 후 subscribeOn, observeOn 으로 각 scheduler 를 지정해주었다
  • API 에서 HTTP Exception 같은 에러가 발생하면 onErrorResumeNext 로 throwable 을 캐치하여 error observable 을 방출한다
// Rx
protected fun <T> requestAPI(api: Observable<T>,
                             onSuccess: (data: T?) -> Unit,
                             onError: (errorModel: ErrorModel) -> Unit): Disposable {
	return getDefaultObservable(api).subscribe({ onSuccess(it) }, { onError(createError(it)) })
}

private fun <T> getDefaultObservable(api: Observable<T>): Observable<T> {
	return api.subscribeOn(Schedulers.io())
    		  .observeOn(AndroidSchedulers.mainThread())
              .onErrorResumeNext { throwable: Throwable ->
              		Observable.error(throwable)
              }
}
  • 위 Rx 예제와 기본적인 구조를 비슷하게 구현하여 flow 를 활용한 requestAPI 를 구현할 것이다
  • 다른점은 API 를 block 으로 받는데, 이유는 뒤에 나올 mutiple API 호출과 관련되어 있다!

Multiple API 호출

  • 일단 기본 요구사항은 여러 API 호출 시 하나라도 에러가 발생하면 멈추지 않고 다음 API 를 호출해야한다. 그리고 에러가 발생한 API 는 View 에서 에러 표시만 해주면 된다 라는 전제를 갖고 구현했다
  • 위 Single API 호출의 getDefaultObservable() 과 다른점은 onErrorResumeNext 에서 에러 캐치 시 Observable.Error 가 아닌 앞 API 에서 에러가 발생해도 멈추지 않고 다음 API 를 실행하기 위해 Observable.just 로 방출하는 점이다
  • requestZipAPI 를 살펴보면 Observables.zip 을 사용하여 여러 API 를 호출하고 있다
  • 여기서 단점은 API 수에 따라 requestZipAPI 를 만들어서 사용하여 비효율적인 문제가 발생한다
  • flow 로 변경하게 된 핵심 이유가 바로 여러 API 를 sequential 또는 parallel 하게 호출 시 위 단점을 없애서 좀 더 편하게 사용해보기 위해서이다
// Rx
protected fun <T1, T2> requestZipAPI(
        api1: Observable<T1>,
        api2: Observable<T2>,
        onSuccess: (apiResult1: ApiResult, apiResult2: ApiResult) -> Unit,
        onError: (errorModel: ErrorModel) -> Unit
    ): Disposable {
    	return Observables.zip(getDefaultZipObservableWithError(api1),
            getDefaultZipObservableWithError(api2)
        ).subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe({ (apiResult1, apiResult2) ->
            onSuccess(apiResult1, apiResult2)
         }, {
            onError(createErrorModel(it))
         })
}
    
private fun <T> getDefaultZipObservable(api: Observable<T>): Observable<ApiResult> {
	return api.subscribeOn(Schedulers.io())
    		  .observeOn(AndroidSchedulers.mainThread())
              .map<ApiResult> {
              		ApiResult.Success(it)
              }.onErrorResumeNext { throwable: Throwable ->
              		Observable.just(ApiResult.Error(createErrorModel(throwable)))
              }
}
  • Rx 에서 구현한 여러 API 호출 당시 parallel 에 대한 니즈가 없어서 sequential 호출만 구현을 했는데, flow 로 변경하면서 두 가지를 구현해본다

2-2. Flow

Single & Sequential API 호출

  • flatMapConcat operator 를 사용하여 구현했으며, onResult 에 데이터 반환 시 순서를 위해 withIndex() 를 사용했다
  • onResult() 에서 데이터를 List 형태로 반환하는 점이 좀 마음에 걸려 추후 좀 더 개선해볼 예정이다
  • flowAPI() 를 살펴보면 catch 에서 다시 emit 을 해주고 있는데, 앞 선 API 에서 에러가 발생해도 다음 API 를 실행하기 위해서다
// Coroutines
private fun <T> flowAPI(api: IndexedValue<suspend () -> T>) = 
	flowAPI(api.value).map {
		it.apply { key = api.index }
	}.catch { e ->
		emit(createError(e).apply { key = api.index })
	}

private fun <T> flowSequentialAPIs(apis: Array<out (suspend () -> T)>) = 
	apis.asFlow().withIndex().flatMapConcat {
		flowAPI(it)
	}.flowOn(Dispatchers.IO)
    
suspend fun <T> requestSequentialAPIs(vararg apis: (suspend () -> T), 
					onResult: (result: MutableList<ApiResult<T>>) -> Unit) {
	val resultList = mutableListOf<ApiResult<T>>()
    flowSequentialAPIs(apis).onStart {
		// TODO
	}.onCompletion {
		onResult(resultList)
	}.collect {
		resultList.add(it)
	}
}


Parallel API 호출

  • flatMapMerge operator 를 사용하여 먼저 수집되는대로 onResult 에 데이터를 넘겨주고 있다
  • 여기서도 index 로 구분하고 있는데 처음에는 데이터 타입으로 구분할까 했지만 여러 API 의 데이터 타입이 다 동일 할 경우를 생각해서 일단은 index 로 구분하였다
// Coroutines
private fun <T> flowParallelAPIs(apis: Array<out (suspend () -> T)>) = 
	apis.asFlow().withIndex().flatMapMerge {
		flowAPI(it)
	}.flowOn(Dispatchers.IO)

suspend fun <T> requestParallelAPIs(vararg apis: (suspend () -> T), 
					onResult: (result: ApiResult<T>) -> Unit) {
	flowParallelAPIs(apis).onStart {
		// TODO
	}.onCompletion {
		// TODO
	}.collect {
		onResult(it)
	}
}


3. AppInfoRepository.kt

  • 위 BaseRepository 를 상속 받아 사용하는 각 Repository 에서는 아래와 같이 requestAPI 를 호출해주면 된다
// Rx
object AppInfoRepository : BaseRepository() {

    fun getAppInfo(onSuccess: (data: AppInfo?) -> Unit, 
                   onError: (error: ApiResult.Error) -> Unit) : Disposable {
        return requestAPI(ApiFactory.appInfoService.getAppInfo(), {
        	onSuccess(it)
        }, {
        	onError(it)
        })
    }

}

// Coroutines
object AppInfoRepository : BaseRepository() {

    suspend fun getAppInfo(onSuccess: (data: AppInfo?) -> Unit, 
                           onError: (error: ApiResult.Error) -> Unit) {
        requestAPI({ ApiFactory.appInfoService.getAppInfo() }, onSuccess, onError)
    }

}


4.  AppInfoViewModel.kt

  • Rx 의 경우 disposable 에 추가해주고 BaseViewModel 에서 onCleared() 가 불릴 때 disposable.clear() 를 해주고 있다
  • Couroutines 의 경우 viewModelScope 에서 launch 한다
// Rx
disposable += appInfoRepository.getAppInfo(onSuccess = { ... }, onError = { ... })

// Coroutines
viewModelScope.launch {
	appInfoRepository.getAppInfo(onSuccess = { ... }, onError = { ... })
}

 

개선사항

  • sequential API 를 combine 으로 해보자
  • multiple API 의 onResult() 에서 각 API 의 데이터를 잘 반환 해보자
  • onStart 와 onComplete 을 활용하여 loading progress 를 컨트롤 해보자

댓글