https://kotlinlang.org/docs/reference/coroutines/flow.html
- Suspend function : 비동기로 단일 값을 반환
- Kotlin Flows : 비동기로 여러 값을 반환
1. Representing multiple values
Collection
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
Sequence : Collection 의 결과와 같지만, 100ms 간격으로 값을 출력하며 메인 스레드 blocking 한다.
fun foo(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
Suspend function : 비동기 코드로 foo 함수를 suspend 지정하면, 메인 스레드를 blogcking 하지 않고 작업을 수행하고 결과를 목록으로 반환 할 수 있습니다. 값은 1초 간격으로 출력한다.
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
Flows : 위 예제들 처럼 리턴 타입이 List<Int> 일 경우, 모든 값을 한 번에 반환 할 수 있다. 비동기로 계산되는 값의 스트림을 나타 내기 위해 동기식으로 계산 된 값에 대해 Sequence <Int> 처럼 Flow <Int> 형식을 사용할 수 있다.
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// 메인 스레드가 차단되었는지 확인하기 위해 launch block 으로 실행
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
/* 결과
*
* I'm not blocked 1
* 1
* I'm not blocked 2
* 2
* I'm not blocked 3
* 3
*
*/
중간에 I'm not blocked 가 찍힌걸로 보아 메인 스레드가 차단되지 않았음을 확인할 수 있다.
만약 foo() 함수의 flow block 에서 delay 를 Thread.sleep 으로 바꿀 경우, 메인 스레드가 차단되는 것을 확인할 수 있다.
- flow { ... } : Flow 타입에 대한 builder 함수
- flow builder block 내부 코드는 suspend 할 수 있다
- 위 foo() 함수에 suspend 한정자가 필요없다
- 값은 emit() 이란 함수로 내보낸다
- 값은 collect() 란 함수로 가져온다
2. Flows are cold
- Flow 는 Sequence 처럼 cold stream 이다
- Flow builder 내부 코드는 collect() 가 호출되어야 실행된다
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
/*
* 결과
*
* Calling foo...
* Calling collect...
* Flow started
* 1
* 2
* 3
* Calling collect again...
* Flow started
* 1
* 2
* 3
*
*/
foo() 함수가 suspend 로 지정되지 않은 이유이고 collect() 를 또 호출하면 처음부터 값을 다시 내보낸다. (Flow started 가 찍힌걸로 확인 가능)
3. Flow cancellation
- Flow 는 일반적인 coroutines 의 취소 로직을 따르지만 Flow 자체적으로는 취소가 없다
- delay 같이 취소 가능한 suspend function 일 때 취소되며 그 외에는 취소되지 않는다
- withTimeoutOrNull block 에서 flow 가 실행될 때 제한 시간 250ms 에 취소되는 예제이다
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
/*
* 결과
*
* Emitting 1
* 1
* Emitting 2
* 2
* Done
*
*/
4. Flow builders
- flow { ... } block 은 기본적인 flow builder 이다
- flowOf() : 고정된 값 세트를 내보내는 flow builder
- .asFlow() : 확장 함수로 Collection 과 Sequence 를 flow 로 변환하는 builder
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
5. Intermediate flow operators
- Flow 는 Collection 및 Sequence 와 마찬가지로 operator 로 변환할 수 있다
- 중간 operator 는 upstream flow 에 적용되어 downstream flow 를 반환한다
- operator 는 flow 와 마찬가지로 cold stream 이다
- operator 는 suspend function 이 아니다
- Sequence 와 차이점은 map, filter 같은 operator block 내에 suspend function 을 호출할 수 있다는점이다
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
suspend function 으로 구현된 long-running 동작인 경우에도 들어오는 요청의 flow 를 map operator 를 사용하여 결과를 매핑할 수 있다.
6. Transform operator
- flow 변환 연산자 중 가장 일반적인 연산자가 transform 이다.
- map 및 filter 와 같은 간단한 변환 또는 더 복잡한 변환을 구현하는 데 사용할 수 있다.
- transform 연산자를 사용하면 임의의 횟수만큼 임의의 값을 방출 할 수 있습니다.
- ex) transform 을 사용하면 장기 실행 비동기 요청을 수행하기 전에 문자열을 생성하고 응답을 따라갈 수 있습니다.
- 1~3 범위중에서 하나 당 두 번 emit 하도록 작성한 코드로 순차적으로 방출하며 collect 로 값을 가져왔다.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
7. Size-limiting operators
- take 와 같은 Size-limiting 중간 연산자를 사용하여 해당 제한에 도달하면 flow 실행을 취소할 수 있다.
- take : count 수 만큼 flow 를 반환 후 flow 는 취소되며, count가 양수가 아닌 경우 IllegalArgumentException이 발생한다.
- 코루틴에서 취소는 항상 예외를 발생시켜 수행되므로 취소시 모든 리소스 관리 기능 (try {...} finally {...} 블록과 같은)이 정상적으로 작동한다.
fun numbers(): Flow<Int> = flow {
try { // try-catch block 은 flow 내부에 위치
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
/**
* 결과
* 1
* 2
* Finally in numbers
*/
8. Terminal flow operators
terminal 연산자는 flow collection 을 시작하는 suspend function 이다.
collect 연산자는 가장 기본적인 연산자이지만 다른 terminal 연산자가 있으므로 더 쉽게 만들 수 있다.
- toList / toSet : flow 를 다양한 collection 으로 변환
- first
- flow 에서 방출 된 첫 번째 요소를 반환 후 collection 을 취소하는 terminal 연산자
- flow 가 비어 있으면 NoSuchElementException을 발생한다
- 만약 predicate 를 인자로 넘겨주고 있다면, predicate 와 일치하는 첫 번째 요소를 반환 후 collection 을 취소하며, predicate 와 일치하는 요소가 없으면 NoSuchElementException을 발생한다
- reduce
- 첫 번째 요소부터 각 요소에 연산을 적용하며 누적 후 최종 값 리턴한다
- flow 가 비어 있으면 NoSuchElementException을 발생한다
- fold : initial 값을 받아 각 요소에 연산을 적용하며 누적 후 최종 값 리턴한다
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
// 결과 : 55
9. Flows are sequential
- 여러 flow 에서 작동하는 특별한 operators 를 사용하지 않으면 flow 의 각 개별 collection 이 순차적으로 수행된다
- collection 은 terminal 연산자를 호출하는 코루틴에서 직접 수행되며 새로운 코루틴을 생성해서 수행되지 않는다
- 각각의 방출 된 값은 모든 중간 연산자에 의해 업스트림에서 다운 스트림으로 처리 된 후 terminal 연산자에게 전달된다.
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
/**
* 결과
*
* Filter 1
* Filter 2
* Map 2
* Collect string 2
* Filter 3
* Filter 4
* Map 4
* Collect string 4
* Filter 5
*/
10. Flow context
- Context Preservation : flow collection 은 항상 호출한 코루틴의 context 에서 발생한다
- 예를 들어, foo() 의 경우 구현 세부 사항에 관계없이 foo() 를 호출한 코루틴 context 에서 실행된다
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> { // main thread 사용중
foo().collect { value -> log("Collected $value") }
}
/**
* 결과
*
* [main @coroutine#1] Started foo flow
* [main @coroutine#1] Collected 1
* [main @coroutine#1] Collected 2
* [main @coroutine#1] Collected 3
*/
Wrong emission with Context
- CPU 를 오래 사용하는 코드는 Dispatchers.Default context 에서 실행해야 할 수 있으며, UI 업데이트 코드는 Dispatchers.Main context 에서 실행해야 한다
- 일반적으로 withContext는 Kotlin 코루틴을 사용하여 코드의 context 를 변경하는 데 사용되지만 flow {...} builder 의 코드는 Context Preservation 특성을 준수해야하며 다른 context 에서 방출 할 수 없다
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
flowOn operator
- flowOn : flow 의 context 를 변경하는 올바른 방법
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
/**
* 결과
*
* [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
* [main @coroutine#1] Collected 1
* [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
* [main @coroutine#1] Collected 2
* [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
* [main @coroutine#1] Collected 3
*/
- 메인 스레드에서 collection 이 발생하는 동안 백그라운드 스레드에서 flow {...} 이 어떻게 작동하는지 보면, collection 1 이 발생하고 동시에 다른 스레드인 백그라운드 스레드에서는 방출이 발생한다.
- flowOn 연산자는 context 에서 CoroutineDispatcher 를 변경해야 할 때 업스트림 플로우에 대해 다른 코루틴을 생성한다.
11. Launching flow
- launchIn 은 terminal operator 로 colloec() 를 대신 사용하며, 별도의 코루틴에서 flow collect 를 실행한다
- launchIn 에 매개 변수로 flow collect 를 실행 할 CoroutineScope 를 지정한다
flow
.onEach { value -> updateUi(value) }
.onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
.catch { cause -> LOG.error("Exception: $cause") }
.launchIn(uiScope)
'기타개발 > Coroutines' 카테고리의 다른 글
Android Networking with coroutines Flow (0) | 2020.08.06 |
---|---|
Coroutines - Flow #4 : Exceptions, completions, cancellation (0) | 2020.07.27 |
Coroutines - Flow #3 : Composing and flattening flows (0) | 2020.07.27 |
Coroutines - Flow #2 : Buffering (0) | 2020.07.21 |
댓글