본문 바로가기
기타개발/Coroutines

Coroutines - Flow #2 : Buffering

by 궝테스트 2020. 7. 21.

https://kotlinlang.org/docs/reference/coroutines/flow.html

 

Asynchronous Flow - Kotlin Programming Language

 

kotlinlang.org

 

1. Buffer

  • 일반적으로 flow 는 아래 예제처럼 순차적이며 모든 연산자의 코드가 동일한 코루틴에서 실행됨을 의미한다
flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .collect { println("2$it") }
    

/**
 * 결과
 *
 * Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
 */
  • 따라서 연산자 내 코드를 실행하는 데 상당한 시간이 걸리게되면 총 실행 시간은 모든 연산자 실행 시간의 만큼 늘어난다
  • buffer 연산자는 지정된 용량의 channel 을 통해 flow 를 방출하는데, 적용되는 flow 에 대해 실행 중에 별도의 코루틴에서 collector 를 실행한다
flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .buffer()  // <--------------- buffer between onEach and collect
    .collect { println("2$it") }
    
    
/**
 * 결과
 *
 * P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }
 * 
 *                       |
 *                       | channel               // buffer()
 *                       V
 * 
 * Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
 */
  • 위 예제는 두 개의 코루틴을 사용한것으로 위 코드를 호출하는 코루틴 Q는 collect 를 실행하고 buffer 이전의 코드는 Q 와 동시에 별도의 새로운 코루틴 P에서 실행된다
  • 코루틴 P 사이에서 방출 된 요소를 코루틴 Q로 보내기 위해 코루틴 사이에 channel 이 사용되며, 소비자 코루틴 Q가 따라 올 때까지 생산자 코루틴 P를 일시 중단합니다 (capacity 매개 변수는이 버퍼의 크기를 정의한다)
  • 다른 코루틴에서 flow 의 다른 부분을 실행하면 flow 를 collection 하는 데 걸리는 전체 시간, 특히 장기 실행 비동기 작업이 관련된 경우 전체 시간 관점에서 도움이 될 수 있다

Operator fusion

  • channelFlow, flowOn, buffer, produceIn 및 broadcastIn의 인접 애플리케이션은 항상 융합되어 있으므로 올바르게 구성된 하나의 채널 만 실행된다
  • 명시적으로 buffer 용량을 지정할 수 있으며 buffer() 또는 buffer(Channel.BUFFERED) 호출보다 우선한다
  • 지정된 buffer 크기를 가진 여러 요청은 요청 된 buffer 크기의 합으로 burrer 를 생성한다
  • buffer 의 실제 구현은 channel 을 생성하기 위해 코루틴 빌더를 사용하여 produce 를 작성하고 이를 소비하기 위해 consumeEach 에 작성한다
fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
    coroutineScope { // limit the scope of concurrent producer coroutine
        val channel = produce(capacity = capacity) {
            collect { send(it) } // send all to channel
        }
        // emit all received values
        channel.consumeEach { emit(it) }
    }
}


2. Conflation

  • flow 가 연산의 부분 결과 또는 연산 상태의 업데이트를 나타내는 경우 각 값을 처리해야 할 필요가 없는 대신 가장 최근 값은 처리해야한다
  • 이 경우 collector 가 처리하기에 너무 느린 경우 conflate 연산자를 사용하여 중간 값을 건너 뛸 수 있다.
val time = measureTimeMillis {
    foo()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")


/**
 * 결과
 *
 * 1
 * 3
 * Collected in 758 ms
 */


3. Processing the latest value

  • emitter 와 collector 가 모두 느릴 때 처리 속도를 높이는 방법으로 방출 된 값을 drop 하여 수행한다
  • 다른 방법은 느린 이전 collector 를 취소하고 새 값이 방출 될 때마다 다시 collector 를 시작하는 것이다.
  • xxxLatest operator : 기존 동작 취소하고 다시 시작하여 최근 값 처리

댓글