본문 바로가기
기타개발/Coroutines

Coroutines - Flow #4 : Exceptions, completions, cancellation

by 궝테스트 2020. 7. 27.

https://kotlinlang.org/docs/reference/coroutines/flow.html

 

Asynchronous Flow - Kotlin Programming Language

 

kotlinlang.org

 

1. Flow exceptions

연산자 내부의 emitter 또는 코드에서 예외가 발생하면 flow collection 이 예외로 완료 할 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            


/**
 * 결과
 *
 * Emitting 1
 * 1
 * Emitting 2
 * 2
 * Caught java.lang.IllegalStateException: Collected 2
 */


Everything is caught

  • try-catch 문이 collect 를 감싸고 있기 때문에 방출뿐만 아니라 중간 operator 에서 발생한 예외도 잡아낼 수 있다
fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            


/**
 * 결과
 *
 * Emitting 1
 * string 1
 * Emitting 2
 * Caught java.lang.IllegalStateException: Crashed on 2
 */

 

 

2. Exception transparency

  • emitter 에 의해 발생한 예외 처리를 위한 캡슐화가 필요하다
  • flow 는 예외에 투명해야하며 try-catch 블록 내부에서 flow {...} 빌더의 값을 방출하는 것은 예외 투명성을 위반하는 것이다
  • collector 에서 예외가 발생할 경우 이전 예제처럼 try-catch를 사용하여 잡을 수 있고, emitter 는 예외 투명성을 유지하기 위해 예외 처리를 캡슐화 할 수있는 catch 연산자를 사용한다
    • catch 연산자 내부에는 예외를 대응하는 코드를 넣으면 되며 throw를 사용하여 예외를 다시 발생시킬 수 있다
    • catch 내부에는 emit 을 사용하여 예외를 다른 값의 방출로 바꿀 수 있다
    • 다른 코드로 예외를 무시, 로그 출력 등을 할 수 있다

Transparent catch

  • Exeception transparency 를 고려한 catch 중간 연산자는 업스트림 예외만 잡는다 (즉, catch 위의 모든 연산자에서 예외는 아님)
  • 아래 예제처럼 collect {...} (catch 아래에 있음) 의 블록에서 예외가 발생하면 catch {} 를 타지 않고 이스케이프된다
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}            


Catching declaratively

  • 아래 예제처럼 onEach 에서 예외를 발생 시키는 코드를 넣고 catch 연산자 앞에 두면 예외 발생 시 catch {} 를 타게된다
simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()

 

3. Flow Completion

  • flow collect 가 완전히 끝났을 때 호출되는 onCompletion 라는 중간 연산자가 있다
simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }
  • 아래 예제처럼 catch {} 위에 onCompletion {} 을 작성하면, throwable 이 발생했을 경우 해당 throwable 값을 받을 수 있다
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            

 

4. Flow Cancellation checks

  • 편의를 위해 flow builder 는 방출된 각 값의 취소에 대한 추가 ensureActive 검사를 수행한다
  • flow {...} 에서 방출되는 중 루프를 취소 할 수 있음을 의미한다
fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}


/**
 * 결과
 *
 * Emitting 1
 * 1
 * Emitting 2
 * 2
 * Emitting 3
 * 3
 * Emitting 4
 * Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
 *  at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1587) 
 *  at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:217) 
 *  at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:215)
 */


Making busy flow cancellable

  • busy loop 일 경우 명시적으로 취소를 해주어야 한다
  • .onEach {currentCoroutineContext (). ensureActive ()} 를 추가 할 수 있지만 동일한 연산자인 cancellable() 이 제공된다
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

댓글