https://kotlinlang.org/docs/reference/coroutines/flow.html
1. Buffer
- 일반적으로 flow 는 아래 예제처럼 순차적이며 모든 연산자의 코드가 동일한 코루틴에서 실행됨을 의미한다
flowOf("A", "B", "C")
.onEach { println("1$it") }
.collect { println("2$it") }
/**
* 결과
*
* Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
*/
- 따라서 연산자 내 코드를 실행하는 데 상당한 시간이 걸리게되면 총 실행 시간은 모든 연산자 실행 시간의 만큼 늘어난다
- buffer 연산자는 지정된 용량의 channel 을 통해 flow 를 방출하는데, 적용되는 flow 에 대해 실행 중에 별도의 코루틴에서 collector 를 실행한다
flowOf("A", "B", "C")
.onEach { println("1$it") }
.buffer() // <--------------- buffer between onEach and collect
.collect { println("2$it") }
/**
* 결과
*
* P : -->-- [1A] -- [1B] -- [1C] ---------->-- // flowOf(...).onEach { ... }
*
* |
* | channel // buffer()
* V
*
* Q : -->---------- [2A] -- [2B] -- [2C] -->-- // collect
*/
- 위 예제는 두 개의 코루틴을 사용한것으로 위 코드를 호출하는 코루틴 Q는 collect 를 실행하고 buffer 이전의 코드는 Q 와 동시에 별도의 새로운 코루틴 P에서 실행된다
- 코루틴 P 사이에서 방출 된 요소를 코루틴 Q로 보내기 위해 코루틴 사이에 channel 이 사용되며, 소비자 코루틴 Q가 따라 올 때까지 생산자 코루틴 P를 일시 중단합니다 (capacity 매개 변수는이 버퍼의 크기를 정의한다)
- 다른 코루틴에서 flow 의 다른 부분을 실행하면 flow 를 collection 하는 데 걸리는 전체 시간, 특히 장기 실행 비동기 작업이 관련된 경우 전체 시간 관점에서 도움이 될 수 있다
Operator fusion
- channelFlow, flowOn, buffer, produceIn 및 broadcastIn의 인접 애플리케이션은 항상 융합되어 있으므로 올바르게 구성된 하나의 채널 만 실행된다
- 명시적으로 buffer 용량을 지정할 수 있으며 buffer() 또는 buffer(Channel.BUFFERED) 호출보다 우선한다
- 지정된 buffer 크기를 가진 여러 요청은 요청 된 buffer 크기의 합으로 burrer 를 생성한다
- buffer 의 실제 구현은 channel 을 생성하기 위해 코루틴 빌더를 사용하여 produce 를 작성하고 이를 소비하기 위해 consumeEach 에 작성한다
fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
coroutineScope { // limit the scope of concurrent producer coroutine
val channel = produce(capacity = capacity) {
collect { send(it) } // send all to channel
}
// emit all received values
channel.consumeEach { emit(it) }
}
}
2. Conflation
- flow 가 연산의 부분 결과 또는 연산 상태의 업데이트를 나타내는 경우 각 값을 처리해야 할 필요가 없는 대신 가장 최근 값은 처리해야한다
- 이 경우 collector 가 처리하기에 너무 느린 경우 conflate 연산자를 사용하여 중간 값을 건너 뛸 수 있다.
val time = measureTimeMillis {
foo()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
/**
* 결과
*
* 1
* 3
* Collected in 758 ms
*/
3. Processing the latest value
- emitter 와 collector 가 모두 느릴 때 처리 속도를 높이는 방법으로 방출 된 값을 drop 하여 수행한다
- 다른 방법은 느린 이전 collector 를 취소하고 새 값이 방출 될 때마다 다시 collector 를 시작하는 것이다.
- xxxLatest operator : 기존 동작 취소하고 다시 시작하여 최근 값 처리
'기타개발 > Coroutines' 카테고리의 다른 글
Android Networking with coroutines Flow (0) | 2020.08.06 |
---|---|
Coroutines - Flow #4 : Exceptions, completions, cancellation (0) | 2020.07.27 |
Coroutines - Flow #3 : Composing and flattening flows (0) | 2020.07.27 |
Coroutines - Flow #1 : Flow builders, operators, context (1) | 2020.07.09 |
댓글