본문 바로가기
기타개발/Coroutines

Coroutines - Flow #1 : Flow builders, operators, context

by 궝테스트 2020. 7. 9.

https://kotlinlang.org/docs/reference/coroutines/flow.html

 

Asynchronous Flow - Kotlin Programming Language

 

kotlinlang.org

  • Suspend function : 비동기로 단일 값을 반환
  • Kotlin Flows : 비동기로 여러 값을 반환

1. Representing multiple values

Collection

fun foo(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    foo().forEach { value -> println(value) } 
}

Sequence : Collection 의 결과와 같지만, 100ms 간격으로 값을 출력하며 메인 스레드 blocking 한다.

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) } 
}

Suspend function : 비동기 코드로 foo 함수를 suspend 지정하면, 메인 스레드를 blogcking 하지 않고 작업을 수행하고 결과를 목록으로 반환 할 수 있습니다. 값은 1초 간격으로 출력한다.

suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) } 
}

Flows : 위 예제들 처럼 리턴 타입이 List<Int> 일 경우, 모든 값을 한 번에 반환 할 수 있다. 비동기로 계산되는 값의 스트림을 나타 내기 위해 동기식으로 계산 된 값에 대해 Sequence <Int> 처럼 Flow <Int> 형식을 사용할 수 있다.

fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // 메인 스레드가 차단되었는지 확인하기 위해 launch block 으로 실행
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) } 
}



/* 결과
 *
 * I'm not blocked 1
 * 1
 * I'm not blocked 2
 * 2
 * I'm not blocked 3
 * 3
 *
 */

 

중간에 I'm not blocked 가 찍힌걸로 보아 메인 스레드가 차단되지 않았음을 확인할 수 있다.
만약 foo() 함수의  flow block 에서 delay 를 Thread.sleep 으로 바꿀 경우, 메인 스레드가 차단되는 것을 확인할 수 있다.

  • flow { ... } : Flow 타입에 대한 builder 함수
  • flow builder block 내부 코드는 suspend 할 수 있다
  • 위 foo() 함수에 suspend 한정자가 필요없다
  • 값은 emit() 이란 함수로 내보낸다
  • 값은 collect() 란 함수로 가져온다

2. Flows are cold

  • Flow 는 Sequence 처럼 cold stream 이다
  • Flow builder 내부 코드는 collect() 가 호출되어야 실행된다
fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}


/*
 * 결과
 *
 * Calling foo...
 * Calling collect...
 * Flow started
 * 1
 * 2
 * 3
 * Calling collect again...
 * Flow started
 * 1
 * 2
 * 3
 *
 */

foo() 함수가 suspend 로 지정되지 않은 이유이고 collect() 를 또 호출하면 처음부터 값을 다시 내보낸다. (Flow started 가 찍힌걸로 확인 가능)

3. Flow cancellation

  • Flow 는 일반적인 coroutines 의 취소 로직을 따르지만 Flow 자체적으로는 취소가 없다
  • delay 같이 취소 가능한 suspend function 일 때 취소되며 그 외에는 취소되지 않는다
  • withTimeoutOrNull block 에서 flow 가 실행될 때 제한 시간 250ms 에 취소되는 예제이다
fun foo(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        foo().collect { value -> println(value) } 
    }
    println("Done")
}

/*
 * 결과
 *
 * Emitting 1
 * 1
 * Emitting 2
 * 2
 * Done
 *
 */


4. Flow builders

  • flow { ... } block 은 기본적인 flow builder 이다
  • flowOf() : 고정된 값 세트를 내보내는 flow builder
  • .asFlow() : 확장 함수로 Collection 과 Sequence 를 flow 로 변환하는 builder
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }

 

5. Intermediate flow operators

  • Flow 는 Collection 및 Sequence 와 마찬가지로 operator 로 변환할 수 있다
  • 중간 operator 는 upstream flow 에 적용되어 downstream flow 를 반환한다
    • operator 는 flow 와 마찬가지로 cold stream 이다
    • operator 는 suspend function 이 아니다
  • Sequence 와 차이점은 map, filter 같은 operator block 내에 suspend function 을 호출할 수 있다는점이다
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

suspend function 으로 구현된 long-running 동작인 경우에도 들어오는 요청의 flow 를 map operator 를 사용하여 결과를 매핑할 수 있다.

 

6. Transform operator

  • flow 변환 연산자 중 가장 일반적인 연산자가 transform 이다.
  • map 및 filter 와 같은 간단한 변환 또는 더 복잡한 변환을 구현하는 데 사용할 수 있다.
  • transform 연산자를 사용하면 임의의 횟수만큼 임의의 값을 방출 할 수 있습니다.
  • ex) transform 을 사용하면 장기 실행 비동기 요청을 수행하기 전에 문자열을 생성하고 응답을 따라갈 수 있습니다.
  • 1~3 범위중에서 하나 당 두 번 emit 하도록 작성한 코드로 순차적으로 방출하며 collect 로 값을 가져왔다.
(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }

 

7. Size-limiting operators

  • take 와 같은 Size-limiting 중간 연산자를 사용하여 해당 제한에 도달하면 flow 실행을 취소할 수 있다.
  • take : count 수 만큼 flow 를 반환 후 flow 는 취소되며, count가 양수가 아닌 경우 IllegalArgumentException이 발생한다.
  • 코루틴에서 취소는 항상 예외를 발생시켜 수행되므로 취소시 모든 리소스 관리 기능 (try {...} finally {...} 블록과 같은)이 정상적으로 작동한다.
fun numbers(): Flow<Int> = flow {
    try { // try-catch block 은 flow 내부에 위치  
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}            


/**
 * 결과
 * 1
 * 2
 * Finally in numbers
 */

 

8. Terminal flow operators

terminal 연산자는 flow collection 을 시작하는 suspend function 이다.
collect 연산자는 가장 기본적인 연산자이지만 다른 terminal 연산자가 있으므로 더 쉽게 만들 수 있다.

  • toList / toSet : flow 를 다양한 collection 으로 변환
  • first
    • flow 에서 방출 된 첫 번째 요소를 반환 후 collection 을 취소하는 terminal 연산자
    • flow 가 비어 있으면 NoSuchElementException을 발생한다
    • 만약 predicate 를 인자로 넘겨주고 있다면, predicate 와 일치하는 첫 번째 요소를 반환 후 collection 을 취소하며, predicate 와 일치하는 요소가 없으면 NoSuchElementException을 발생한다
  • reduce
    • 첫 번째 요소부터 각 요소에 연산을 적용하며 누적 후 최종 값 리턴한다
    • flow 가 비어 있으면 NoSuchElementException을 발생한다
  • fold : initial 값을 받아 각 요소에 연산을 적용하며 누적 후 최종 값 리턴한다
val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5                           
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)

// 결과 : 55

 

9. Flows are sequential

  • 여러 flow 에서 작동하는 특별한 operators 를 사용하지 않으면 flow 의 각 개별 collection 이 순차적으로 수행된다
  • collection 은 terminal 연산자를 호출하는 코루틴에서 직접 수행되며 새로운 코루틴을 생성해서 수행되지 않는다
  • 각각의 방출 된 값은 모든 중간 연산자에 의해 업스트림에서 다운 스트림으로 처리 된 후 terminal 연산자에게 전달된다.
(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }   
    
/**
 * 결과
 *
 * Filter 1
 * Filter 2
 * Map 2
 * Collect string 2
 * Filter 3
 * Filter 4
 * Map 4
 * Collect string 4
 * Filter 5
 */

 

10. Flow context

  • Context Preservation : flow collection 은 항상 호출한 코루틴의 context 에서 발생한다
  • 예를 들어, foo() 의 경우 구현 세부 사항에 관계없이 foo() 를 호출한 코루틴 context 에서 실행된다
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> { // main thread 사용중
    foo().collect { value -> log("Collected $value") } 
}        


/**
 * 결과
 *
 * [main @coroutine#1] Started foo flow
 * [main @coroutine#1] Collected 1
 * [main @coroutine#1] Collected 2
 * [main @coroutine#1] Collected 3
 */

 

Wrong emission with Context

  • CPU 를 오래 사용하는 코드는 Dispatchers.Default context 에서 실행해야 할 수 있으며, UI 업데이트 코드는 Dispatchers.Main context 에서 실행해야 한다
  • 일반적으로 withContext는 Kotlin 코루틴을 사용하여 코드의 context 를 변경하는 데 사용되지만 flow {...} builder 의 코드는 Context Preservation 특성을 준수해야하며 다른 context 에서 방출 할 수 없다
fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) } 
}       


flowOn operator

  • flowOn : flow 의 context 를 변경하는 올바른 방법
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value") 
    } 
}   


/**
 * 결과
 *
 * [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
 * [main @coroutine#1] Collected 1
 * [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
 * [main @coroutine#1] Collected 2
 * [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
 * [main @coroutine#1] Collected 3
 */
  • 메인 스레드에서 collection 이 발생하는 동안 백그라운드 스레드에서 flow {...} 이 어떻게 작동하는지 보면, collection 1 이 발생하고 동시에 다른 스레드인 백그라운드 스레드에서는 방출이 발생한다.
  • flowOn 연산자는 context 에서 CoroutineDispatcher 를 변경해야 할 때 업스트림 플로우에 대해 다른 코루틴을 생성한다.

 

11. Launching flow

  • launchIn 은 terminal operator 로 colloec() 를 대신 사용하며, 별도의 코루틴에서 flow collect 를 실행한다
  • launchIn 에 매개 변수로 flow collect 를 실행 할 CoroutineScope 를 지정한다
flow
    .onEach { value -> updateUi(value) }
    .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
    .catch { cause -> LOG.error("Exception: $cause") }
    .launchIn(uiScope)

댓글