들어가기전에 Flow 문서를 보고 정리한 포스팅이며 1, 3, 4번은 필수로 학습이 필요하다.
-> Flow #1 : Flow builders, operators, context
-> Flow #2 : Buffering
-> Flow #3 : Composing and flattening flows
-> Flow #4 : Exceptions, completions, cancellation
요즘 구글 가이드라인으로 coroutines 이 필수적으로 등장하고 있어서
기존에 서버 API 통신을 위해 사용하던 Rx 를 coroutines Flow 를 사용하는 패턴으로 변경해보려 한다.
기본 구조
- MVVM 구조와 서버 API 통신을 위해 Retrofit2 의 조합을 전제로 해본다.
- 크게 App module 과 서버 통신을 담당하는 Domain module 로 구분되어 있다 (아직은,, 추후 좀 더 clean 하게 data module 도!)
App module |
Domain module |
||
View | ViewModel | Repository | Service |
1. 서버 통신 관련 이벤트 발생 -> 6. 데이터 Observing |
2. 서버 API 데이터 요청 -> <- 5. View 업데이트 |
3. 서버 API 호출 및 응답 반환 -> <- 4. 데이터 응답 반환 |
4. 서버 API 정의 (Retrofit) |
- App module : viewModelScope 를 위한 viewModel-ktx dependency 추가
- Domain module : 서버 통신을 위해 retrofit2, okhttp, coroutines dependency 추가
// App module build.gradle
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"
// Domain module build.gradle
implementation "com.squareup.retrofit2:retrofit:$retrofit_version"
implementation "com.squareup.retrofit2:adapter-rxjava2:$retrofit_version"
implementation "com.squareup.retrofit2:converter-gson:$retrofit_version"
implementation "com.squareup.okhttp3:logging-interceptor:3.14.0"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7"
Rx vs Coroutines
앱 정보를 가져오는 API 를 호출하는 로직을 간단하게 Rx 와 Coroutines 를 비교해본다.
1. AppInfoService.kt
- Coroutines 로 변경하면 suspend function 으로 사용하게되며 리턴 타입을 바로 해당 데이터의 타입으로 받을 수 있다
- 참고로 suspend function 이 실행되면 해당 함수가 끝날 때까지 스레드를 block 시키지 않고 지연시킨다
- 따라서 해당 함수가 호출되자마자 context switching 이 일어나지 않고 지연된 후 특정 시점에 처리되기 때문에 block 에 비해 상대적으로 가벼운 비용이 든다고 한다
// Rx
interface AppInfoService {
@GET("/something/appinfo")
fun getAppInfo(): Observable<AppInfo>
}
// Coroutines
interface AppInfoService {
@GET("/something/appinfo")
suspend fun getAppInfo(): AppInfo
}
2. BaseRepository.kt
- 각 Repository 에서 상속받을 수 있는 abstract class 이다
- API 를 호출 후 데이터 반환받아 처리하는 로직을 포함하는 함수들이 구현되어 있다
- flow 를 사용하여 구현하였으며 아직은 experimental api 로 되어있어 아래와 같은 어노테이션이 옵션이 지정되어있다
- @FlowPreview
- @ExperimentalCoroutinesApi
2-1. Rx
Single API 호출
- 리턴 타입이 Observable 이며 API 실행 후 subscribeOn, observeOn 으로 각 scheduler 를 지정해주었다
- API 에서 HTTP Exception 같은 에러가 발생하면 onErrorResumeNext 로 throwable 을 캐치하여 error observable 을 방출한다
// Rx
protected fun <T> requestAPI(api: Observable<T>,
onSuccess: (data: T?) -> Unit,
onError: (errorModel: ErrorModel) -> Unit): Disposable {
return getDefaultObservable(api).subscribe({ onSuccess(it) }, { onError(createError(it)) })
}
private fun <T> getDefaultObservable(api: Observable<T>): Observable<T> {
return api.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext { throwable: Throwable ->
Observable.error(throwable)
}
}
- 위 Rx 예제와 기본적인 구조를 비슷하게 구현하여 flow 를 활용한 requestAPI 를 구현할 것이다
- 다른점은 API 를 block 으로 받는데, 이유는 뒤에 나올 mutiple API 호출과 관련되어 있다!
Multiple API 호출
- 일단 기본 요구사항은 여러 API 호출 시 하나라도 에러가 발생하면 멈추지 않고 다음 API 를 호출해야한다. 그리고 에러가 발생한 API 는 View 에서 에러 표시만 해주면 된다 라는 전제를 갖고 구현했다
- 위 Single API 호출의 getDefaultObservable() 과 다른점은 onErrorResumeNext 에서 에러 캐치 시 Observable.Error 가 아닌 앞 API 에서 에러가 발생해도 멈추지 않고 다음 API 를 실행하기 위해 Observable.just 로 방출하는 점이다
- requestZipAPI 를 살펴보면 Observables.zip 을 사용하여 여러 API 를 호출하고 있다
- 여기서 단점은 API 수에 따라 requestZipAPI 를 만들어서 사용하여 비효율적인 문제가 발생한다
- flow 로 변경하게 된 핵심 이유가 바로 여러 API 를 sequential 또는 parallel 하게 호출 시 위 단점을 없애서 좀 더 편하게 사용해보기 위해서이다
// Rx
protected fun <T1, T2> requestZipAPI(
api1: Observable<T1>,
api2: Observable<T2>,
onSuccess: (apiResult1: ApiResult, apiResult2: ApiResult) -> Unit,
onError: (errorModel: ErrorModel) -> Unit
): Disposable {
return Observables.zip(getDefaultZipObservableWithError(api1),
getDefaultZipObservableWithError(api2)
).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ (apiResult1, apiResult2) ->
onSuccess(apiResult1, apiResult2)
}, {
onError(createErrorModel(it))
})
}
private fun <T> getDefaultZipObservable(api: Observable<T>): Observable<ApiResult> {
return api.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map<ApiResult> {
ApiResult.Success(it)
}.onErrorResumeNext { throwable: Throwable ->
Observable.just(ApiResult.Error(createErrorModel(throwable)))
}
}
- Rx 에서 구현한 여러 API 호출 당시 parallel 에 대한 니즈가 없어서 sequential 호출만 구현을 했는데, flow 로 변경하면서 두 가지를 구현해본다
2-2. Flow
Single & Sequential API 호출
- flatMapConcat operator 를 사용하여 구현했으며, onResult 에 데이터 반환 시 순서를 위해 withIndex() 를 사용했다
- onResult() 에서 데이터를 List 형태로 반환하는 점이 좀 마음에 걸려 추후 좀 더 개선해볼 예정이다
- flowAPI() 를 살펴보면 catch 에서 다시 emit 을 해주고 있는데, 앞 선 API 에서 에러가 발생해도 다음 API 를 실행하기 위해서다
// Coroutines
private fun <T> flowAPI(api: IndexedValue<suspend () -> T>) =
flowAPI(api.value).map {
it.apply { key = api.index }
}.catch { e ->
emit(createError(e).apply { key = api.index })
}
private fun <T> flowSequentialAPIs(apis: Array<out (suspend () -> T)>) =
apis.asFlow().withIndex().flatMapConcat {
flowAPI(it)
}.flowOn(Dispatchers.IO)
suspend fun <T> requestSequentialAPIs(vararg apis: (suspend () -> T),
onResult: (result: MutableList<ApiResult<T>>) -> Unit) {
val resultList = mutableListOf<ApiResult<T>>()
flowSequentialAPIs(apis).onStart {
// TODO
}.onCompletion {
onResult(resultList)
}.collect {
resultList.add(it)
}
}
Parallel API 호출
- flatMapMerge operator 를 사용하여 먼저 수집되는대로 onResult 에 데이터를 넘겨주고 있다
- 여기서도 index 로 구분하고 있는데 처음에는 데이터 타입으로 구분할까 했지만 여러 API 의 데이터 타입이 다 동일 할 경우를 생각해서 일단은 index 로 구분하였다
// Coroutines
private fun <T> flowParallelAPIs(apis: Array<out (suspend () -> T)>) =
apis.asFlow().withIndex().flatMapMerge {
flowAPI(it)
}.flowOn(Dispatchers.IO)
suspend fun <T> requestParallelAPIs(vararg apis: (suspend () -> T),
onResult: (result: ApiResult<T>) -> Unit) {
flowParallelAPIs(apis).onStart {
// TODO
}.onCompletion {
// TODO
}.collect {
onResult(it)
}
}
3. AppInfoRepository.kt
- 위 BaseRepository 를 상속 받아 사용하는 각 Repository 에서는 아래와 같이 requestAPI 를 호출해주면 된다
// Rx
object AppInfoRepository : BaseRepository() {
fun getAppInfo(onSuccess: (data: AppInfo?) -> Unit,
onError: (error: ApiResult.Error) -> Unit) : Disposable {
return requestAPI(ApiFactory.appInfoService.getAppInfo(), {
onSuccess(it)
}, {
onError(it)
})
}
}
// Coroutines
object AppInfoRepository : BaseRepository() {
suspend fun getAppInfo(onSuccess: (data: AppInfo?) -> Unit,
onError: (error: ApiResult.Error) -> Unit) {
requestAPI({ ApiFactory.appInfoService.getAppInfo() }, onSuccess, onError)
}
}
4. AppInfoViewModel.kt
- Rx 의 경우 disposable 에 추가해주고 BaseViewModel 에서 onCleared() 가 불릴 때 disposable.clear() 를 해주고 있다
- Couroutines 의 경우 viewModelScope 에서 launch 한다
// Rx
disposable += appInfoRepository.getAppInfo(onSuccess = { ... }, onError = { ... })
// Coroutines
viewModelScope.launch {
appInfoRepository.getAppInfo(onSuccess = { ... }, onError = { ... })
}
개선사항
- sequential API 를 combine 으로 해보자
- multiple API 의 onResult() 에서 각 API 의 데이터를 잘 반환 해보자
- onStart 와 onComplete 을 활용하여 loading progress 를 컨트롤 해보자
'기타개발 > Coroutines' 카테고리의 다른 글
Coroutines - Flow #4 : Exceptions, completions, cancellation (0) | 2020.07.27 |
---|---|
Coroutines - Flow #3 : Composing and flattening flows (0) | 2020.07.27 |
Coroutines - Flow #2 : Buffering (0) | 2020.07.21 |
Coroutines - Flow #1 : Flow builders, operators, context (1) | 2020.07.09 |
댓글