본문 바로가기
기타개발/Coroutines

Coroutines - Flow #3 : Composing and flattening flows

by 궝테스트 2020. 7. 27.

https://kotlinlang.org/docs/reference/coroutines/flow.html

 

Asynchronous Flow - Kotlin Programming Language

 

kotlinlang.org

 

1. Composing multiple flows 

Zip

  • 두 flow 값을 결합하며 둘 중 작은 개수에 맞춰 결과 개수도 리턴된다
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print
    
/**
 * 결과
 * 
 * 1 -> one
 * 2 -> two
 * 3 -> three
 */

Combine

  • 각 flow 에서 가장 최근에 방출된 값들을 결합하여 생성된 flow 를 리턴한다
  • flow 의 방출 타이밍에 따라 결합된 결과 개수가 달라져 flow 의 초기 개수와 맞지 않는다
val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
    println(it)
}

/**
 * 결과
 * 
 *  "1a 2a 2b 2c"
 */

 

2. Flattening flows

  • Flow 는 비동기식으로 받은 시퀀스를 나타내므로 각 값이 다른 값 시퀀스에 대한 요청을 트리거하는 상황을 만날 수 있다
  • 예를 들어, 500ms 간격으로 두 문자열의 흐름을 리턴하는 함수가 있고, 세 개의 Int 타입의 flow 가 각각 requestFlow를 호출한다
  • map 의 반환값은 Flow<Flow<String>> 이 되며 외부 Flow 만 처리하고 내부 Flow 는 처리하지 못하게된다
  • 이런 경우 single flow 로 flatten 처리가 필요하며 Collection 이나 Sequence 에도 flatten 과 flatMap operator 가 있다
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}
(1..3).asFlow().map { requestFlow(it) }


flatMapConcat

  • 내부 flow 가 완료 될 때까지 기다렸다가 다음 flow 를 collect 한다 (순서 보장O)
  • 1 ~3 까지 방출되면서 requestFlow 를 통해 또 flow 를 방출하기때문에 Flow<Flow<String>> 타입이며 flatMapConcat 이 Flow<String> 타입으로 만들어주어 collect 에서 출력된다
val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapConcat { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
    
    
/**
 * 결과
 *
 * 1: First at 128 ms from start
 * 1: Second at 629 ms from start
 * 2: First at 729 ms from start
 * 2: Second at 1229 ms from start
 * 3: First at 1330 ms from start
 * 3: Second at 1830 ms from start
 */


flatMapMerge

  • 모든 flow 를 동시에 collect 하고 값을 single flow 로 merge 하여 가능한 빨리 값을 방출한다 (순서 보장X)
  • flatMapMerge 및 flattenMerge 연산자로 구현되며, 둘 다 동시에 collect 될 수 있는 flow 수를 제한하는 concurrency 파라미터를 넘길 수 있다 (기본값은 DEFAULT_CONCURRENCY)
  • flatMapMerge 는 requestFlow (it) 을 순차적으로 호출하지만 결과값은 동시에 수집하므로 순차적인 map { requestFlow (it) } 을 먼저 수행 한 다음 flattenMerge 를 호출하는 것과 같다
val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapMerge { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
    
/**
 * 결과
 *
 * 1: First at 154 ms from start
 * 2: First at 247 ms from start
 * 3: First at 348 ms from start
 * 1: Second at 651 ms from start
 * 2: Second at 747 ms from start
 * 3: Second at 849 ms from start
 */


flatMapLatest

  • collectLatest 연산자와 유사한 방식으로, 새 flow 가 방출되는 즉시 이전 flow 의 collection 을 취소한다
  • requestFlow(it) 부분 자체가 새 값으로 취소되며 try-catch 처리로 cancel 에 대한 처리를 할 수 있다
val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapLatest { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
    
/**
 * 결과
 *
 * 1: First at 149 ms from start
 * 2: First at 283 ms from start
 * 3: First at 384 ms from start
 * 3: Second at 884 ms from start
 */

댓글