https://kotlinlang.org/docs/reference/coroutines/flow.html
1. Composing multiple flows
Zip
- 두 flow 값을 결합하며 둘 중 작은 개수에 맞춰 결과 개수도 리턴된다
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
/**
* 결과
*
* 1 -> one
* 2 -> two
* 3 -> three
*/
- 각 flow 에서 가장 최근에 방출된 값들을 결합하여 생성된 flow 를 리턴한다
- flow 의 방출 타이밍에 따라 결합된 결과 개수가 달라져 flow 의 초기 개수와 맞지 않는다
val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
println(it)
}
/**
* 결과
*
* "1a 2a 2b 2c"
*/
2. Flattening flows
- Flow 는 비동기식으로 받은 시퀀스를 나타내므로 각 값이 다른 값 시퀀스에 대한 요청을 트리거하는 상황을 만날 수 있다
- 예를 들어, 500ms 간격으로 두 문자열의 흐름을 리턴하는 함수가 있고, 세 개의 Int 타입의 flow 가 각각 requestFlow를 호출한다
- map 의 반환값은 Flow<Flow<String>> 이 되며 외부 Flow 만 처리하고 내부 Flow 는 처리하지 못하게된다
- 이런 경우 single flow 로 flatten 처리가 필요하며 Collection 이나 Sequence 에도 flatten 과 flatMap operator 가 있다
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
(1..3).asFlow().map { requestFlow(it) }
flatMapConcat
- 내부 flow 가 완료 될 때까지 기다렸다가 다음 flow 를 collect 한다 (순서 보장O)
- 1 ~3 까지 방출되면서 requestFlow 를 통해 또 flow 를 방출하기때문에 Flow<Flow<String>> 타입이며 flatMapConcat 이 Flow<String> 타입으로 만들어주어 collect 에서 출력된다
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
/**
* 결과
*
* 1: First at 128 ms from start
* 1: Second at 629 ms from start
* 2: First at 729 ms from start
* 2: Second at 1229 ms from start
* 3: First at 1330 ms from start
* 3: Second at 1830 ms from start
*/
flatMapMerge
- 모든 flow 를 동시에 collect 하고 값을 single flow 로 merge 하여 가능한 빨리 값을 방출한다 (순서 보장X)
- flatMapMerge 및 flattenMerge 연산자로 구현되며, 둘 다 동시에 collect 될 수 있는 flow 수를 제한하는 concurrency 파라미터를 넘길 수 있다 (기본값은 DEFAULT_CONCURRENCY)
- flatMapMerge 는 requestFlow (it) 을 순차적으로 호출하지만 결과값은 동시에 수집하므로 순차적인 map { requestFlow (it) } 을 먼저 수행 한 다음 flattenMerge 를 호출하는 것과 같다
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
/**
* 결과
*
* 1: First at 154 ms from start
* 2: First at 247 ms from start
* 3: First at 348 ms from start
* 1: Second at 651 ms from start
* 2: Second at 747 ms from start
* 3: Second at 849 ms from start
*/
flatMapLatest
- collectLatest 연산자와 유사한 방식으로, 새 flow 가 방출되는 즉시 이전 flow 의 collection 을 취소한다
- requestFlow(it) 부분 자체가 새 값으로 취소되며 try-catch 처리로 cancel 에 대한 처리를 할 수 있다
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
/**
* 결과
*
* 1: First at 149 ms from start
* 2: First at 283 ms from start
* 3: First at 384 ms from start
* 3: Second at 884 ms from start
*/
'기타개발 > Coroutines' 카테고리의 다른 글
Android Networking with coroutines Flow (0) | 2020.08.06 |
---|---|
Coroutines - Flow #4 : Exceptions, completions, cancellation (0) | 2020.07.27 |
Coroutines - Flow #2 : Buffering (0) | 2020.07.21 |
Coroutines - Flow #1 : Flow builders, operators, context (1) | 2020.07.09 |
댓글