본문 바로가기
기타개발

Coroutines - Channels

by 궝테스트 2020. 7. 30.

https://kotlinlang.org/docs/reference/coroutines/channels.html

 

Channels - Kotlin Programming Language

 

kotlinlang.org

 

1. Basics

  • Deffered value 는 코루틴 간 단일 값을 전송하고 Channel 은 값 스트림을 전송하는 방법을 제공한다
  • Channel 은 개념적으로 BlockingQueue 와 매우 유사하지만 put, take 대신 suspend sendreceive 가 있다
val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")

/*
 * 결과
 *
 * 1
 * 4
 * 9
 * 16
 * 25
 * Done!
 */

send

abstract suspend fun send(element: E): Unit (source)
  • element 를 채널로 전송하여 channel 의 버퍼가 가득 찼거나 존재하지 않는 경우 호출자를 일시 중단하거나, close 호출로 channel 이 닫힌 경우 예외가 발생한다 (isColosedForSendChannel)
  • send function 이 일시 중단 된 후 channel 을 닫아도 suspend send 는 중단되지 않는다
  • send() 가 suspend 된 후 channel 을 닫아도 send() 호출은 중단되지 않는다
  • channel 을 닫는 것은 channel 을 통해 'close token' 을 보내는 것과 같다
  • channel 을 통해 전송 된 모든 element 는 선입선출(FIFO) 방식으로 보내지며, 전송 된 element 는 close token 전에 receiver 에게 전달된다
  • send() 는 취소 가능하며, send() 가 suspended 일 때 현재 코루틴의 job 이 취소되거나 완료되면 즉시 CancellationException 로 다시 시작된다
  • send() 의 취소는 atomic 하여 CancellationException 이 발생하면 element 가 해당 channel 로 전송되지 않았음을 의미한다

receive

abstract suspend fun receive(): E (source)
  • channel 이 비어 있지 않으면 해당 channel 에서 element 를 검색해서 제거하거나 channel 이 비어있는 동안 호출자를 일시 중단 또는 channel 이 receive 를 위해 닫힌 경우 ClosedReceiveChannelException 을 발생시킨다
  • 예외로 인해 channel 이 닫힌 경우 실패한 channel 이 되며 close() 에 대한 예외를 발생시킨다
  • receive() 는 취소 가능하며, receive() 가 suspended 일 때 현재 코루틴의 job 이 취소되거나 완료되면 즉시 CancellationException 로 다시 시작된다
  • receive() 의 취소는 atomic 하여 CancellationException 이 발생하면 element 가 해당 channel 로 받아오지 않았음을 의미한다

 

2. Closing and iteration over channels

  • queue 와 달리 channel 은 더이상 element 가 없을 때 close 할 수 있다
  • close 는 channel 에 close token 을 보내며 receiver 는 이 토큰이 수신되면 for-loop 가 중지된다
  • close 하기 전에 보낸 element 는 수신이 보장된다

 

3. Building channel producers

  • 코루틴이 sequence 한 element 를 생성하는 패턴은 동시 코드에서 발견되는 생산자-소비자 패턴의 일부이다
  • producer
    • 생산자 측에서 쉽게 수행할 수 있도록하는 코루틴 빌더
    • 새로운 코루틴을 시작하여 채널로 값을 send 하여 스트림을 생성한다.
    • ReceiveChannel 을 리턴하며 receive 에 참조할 수 있다.
    • CoroutineScope 와 SendChannel 을 implements 하는 ProducerScope 인터페이스를 higher-order function 으로 사용할 수 있다.
    • SendChannel 을 구현하고 있어 바로 send 를 사용할 수 있다.
  • consumerEach : 소비자 측의 for 루프 대체하는 확장 함수
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")

// 결과
1
4
9
16
25
Done!

 

4. Pipelines

  • 사전적 의미
    • 실행중인 명령어의 실행이 끝나기 전에 다른 명령어의 실행을 시작하는 연산 방법
    • 연산 과정을 단계별로 구분하여 각 단계가 중첩되어 동시에 수행된다
  • 하나의 코루틴이 일련의 값 들(무한도 가능)의 스트림을 send 하는 동시에, 다른 코루틴(들) 이 스트림을 receive 하여 어떤 작업을 한 후 다시 send 하는 패턴
  • 아래의 예제는 따로 코루틴의 scope 를 지정하지 않아서 기본적으로 Dispatchers.Default 를 사용한다
    • Dispatchers.Default : JVM 의 공유 스레드 풀로 동작하며, 스레드 최대 수 = CPU 코어 수 (최소 2개)
    • 멀티코어 환경에서는 효율적
  • 예제 코드 2 : 무한한 일련 숫자 출력
    • 위 예제 코드의 모든 코루틴은 runBlocking scope 에서 launch 되었기 때문에, CoroutineContext 의 확장함수인 cancelChildren() 으로 해당 context 내의 모든 children 을 취소할 수 있다.
    • 위 예제 코드를 코루틴이 아닌 일반 함수로 대체하여 구현 할 수 있다.
      • produce → buildIterator
      • send → yield
      • receive → next
      • receiveChannel → Iterator
      • 위 함수로 각각 대체 후 coroutine scope 를 제거하면 runBlocking 도 필요하지 않게 됨

 

5. Fade-out

  • 여러개의 코루틴이 같은 채널로부터 receive 할 수 있다.
  • 예제 코드
    • ReceiveChannel 을 리턴하는 produce 로 cancel 가능
    • 위 예제 코드에서 consumeEach 가 아닌 for-loop 로 채널을 순회하는 이유는, 
      • for-loop : 여러개의 코루틴 중 하나가 실패하더라도 나머지 코루틴들은 연산 가능
      • consumeEach : 실패한 코루틴으로부터 취소 이벤트를 전달 받고 나머지 코루틴들도 종료된다.

 

6. Fade-in

  • 여러개의 코루틴이 같은 채널로 send 할 수 있다.
  • 예제 코드

 

7. Buffered channels

  • 위 내용의 채널은 버퍼가 없었기 때문에 send 와 receive 가 만나야했다,, (랑데뷰)
  • send 가 호출되면 receive 가 나타날 때 까지 send 는 block
  • receive 가 호출되면 send 가 나타날 때 까지 receive 는 block
  • 하지만 사실! channel 과 produce 는 capacity 라는 optional 파라미터를 전달할 수 있으며, 버퍼 사이즈를 의미한다. 
  • 버퍼 사이즈만큼 send 를 하고 block (최대 capacity 가 정해져있는 BlockingQueue 와 비슷)
  • 예제 코드

 

8. Channels are fair

  • 여러개의 코루틴에서 채널로의 send, receive 는 FIFO 방식이며 순서가 보장된다.
  • 예제 코드
  • 사용되는 Executor 특성에 따라 unfair 한 경우도 있다 : this issue

 

9. Ticker channels

  • 랑데뷰 채널
  • 마지막 receive 이후에 주어진 delay 만큼 시간이 지나면 Unit 을 채널로 send 한다.
  • 특정 시간 기반으로 produce 해야하는 Pipelines 같은 곳에 응용될 수 있다.
  • 이 채널을 만들기 위해 ticker 메소드 사용하며 더이상 필요 없을 경우 ReceiveChannel 의 cancel 호출하면 된다.
  •  TickerMode
    • FIXED_PERIOD : 기본 모드이며, 중간에 지연이 생겨도 receive 한 시간 간격과 상관없이 주어진 delay 에 따라 생성된 Unit 을 반환한다.
    • FIXED_DELAY : 중간에 지연이 생기면 그 시간이 지난 후 주어진 delay 에 따라 생성된 Unit 을 반환한다.
  • 예제 코드

'기타개발' 카테고리의 다른 글

RxJava - What's different in 3.0  (0) 2020.06.23
Docker 개념  (0) 2020.03.23

댓글