본문 바로가기
기타개발/Flutter

Streams

by 궝테스트 2020. 9. 19.

dart.dev/tutorials/language/streams

 

Asynchronous programming: streams

Learn how to consume single-subscriber and broadcast streams.

dart.dev

What's the point?

  • Stream 은 데이터의 비동기 시퀀스를 제공한다
  • 데이터 시퀀스는 사용자 이벤트와 파일로부터 읽은 데이터가 포함된다
  • Stream API 에서 await for 또는 listen() 을 사용해서 stream 을 처리할 수 있다
  • 에러 대응하는 방법을 제공한다
  • single subscription 과 broadcast 두 가지 종류가 있다

 

다트의 비동기 프로그래밍은 Future 와 Stream 클래스가 있다

  • Future
    : 즉시 완료되지 않는걸 나타내며 일반 함수가 결과를 리턴하는 경우,
      비동기 함수는 결과를 포함한 Future 를 리턴한다.
      결과값이 준비가 되면 Future 가 알려줄 것이다.
  • Stream
    : 비동기 이벤트의 연속으로 비동기식 Iterable 과 같다.
      요청 시 다음 이벤트를 가져오는 대신,
      스트림은 이벤트가 준비되었을 때 이벤트가 있다는 것을 알려준다.

 

Receiving stream events

  • 스트림은 여러가지 방법으로 만들어질 수 있지만 모두 같은 방식으로 사용될 수 있다
  • 비동기 for loop (보통 await for 라고 부름) 는 for loop 와 같은 스트림의 이벤트를 Iterable 을 반복하는 것 처럼 반복한다.

ex1) int 스트림의 각 이벤트를 수신하고 더해서 합계를 Future 로 리턴하는 예제이다.
       loop 문 내부가 끝나면 다음 이벤트가 도착하거나 스트림이 완료될 때까지 함수가 일시 중지 된다

// await for 를 사용하려면 async 키워드를 넣어야된다
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

ex2) async* 함수를 사용해서 int 스트림을 생성해서 ex1 예제를 테스트 한다.
        dartpad.dev/embed-inline.html?id=15d5ef986238c97dbc14

 

Error events

  • 스트림은 더이상 이벤트가 없을 때 수행되며, 이벤트를 받는 코드는 새 이벤트가 도착했음을 알리는 것처럼 이벤트를 알린다
  • await for 루프를 사용해서 이벤트를 읽을 때, 스트림이 완료되면! 루프는 중지된다
  • 몇몇 경우에는 스트림이 완료되기 전에 에러가 발생한다
    : 서버에서 파일 가져오는 동안 네트워크가 실패했거나 이벤트를 생성하는 코드가 버그가 있거나 등
  • 스트림은 데이터 이벤트를 전달하는 것처럼 에러 이벤트도 전달할 수 있다
  • 대부분 스트림은 첫 번째 에러가 발생하면 멈추는데,
    둘 이상의 에러를 전달하는 스트림이과 에러 이벤트 이후 데이터를 전달하는 스트림이 있을 수 있다
    (여기 문서에는 최대 한 개의 에러만 전달하는 스트림에 대해서만 설명한다고 한다)
  • await for 를 사용하여 스트림을 읽을 때 루프에서 에러가 발생하면, 루프도 종료된다
  • try-catch 를 사용해서 에러를 잡을 수 있다
    dartpad.dev/embed-inline.html?id=df7c1168a5c6b20fda2a76d6ff33a1da

 

Working with streams

  • Stream 클래스에는 Iterable 처럼 스트림에서 일반적인 작업을 수행할 수 있게 돕는 메서드들이 있다
    ex) lastWhere() 을 사용해서 스트림에서 마지막 양의 정수를 찾는다
Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

 

Two kinds of Streams

Single subscription streams

  • 가장 일반적인 스트림으로 이벤트 시퀀스가 포함된다
  • 이벤트는 누락되지 않고 순서대로 전달되어야 한다
  • 파일을 읽거나 웹 요청을 할 때 얻는 종류의 스트림이다
  • 이런 스트림은 딱 한번만 들을 수 있고, 다시 시도하는건 초기 이벤트를 놓치는 것이므로 나머지 스트림은 의미가 없어진다
  • 한 번 시작하면 데이터를 가져와서 청크로 제공한다

Broadcast streams

  • 한 번에 하나씩 처리할 수 있다
  • 언제든지 이 스트림을 들을 수 있고, 듣는 동안 시작되는 이벤트도 얻을 수 있다
  • 두 개 이상의 리스너가 동시에 들을 수 있으며 이전 구독을 취소한 후 나중에 다시 들을 수도 있다

 

Methods that process a stream

  • 스트림을 수행하고 결과를 리턴하는 메소드들 종류이다
  • drain(), pipe() 를 제외한 모든 함수는 Iterable 과 비슷하다
  • await for 루프와 함께 비동기 함수를 사용하면 쉽게 작성할 수 있다
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

 

Methods that modify a stream

  • 아래 스트림의 메서드는 원래 스트림을 기반으로 새로운 스트림을 리턴한다
  • 각각 원래 스트림을 듣기 전에 누군가가 새로운 스트림을 들을 때까지 기다린다
  • 아래 메소드들은 Iterable 을 다른 Iterable 로 변환한다
  • 다 await for 루프랑 사용하면 비동기 함수를 쉽게 작성할 수 있다
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

 

  • 아래 asyncExpand(), asyncMap() 함수들은 expand(), map()  함수와 비슷한데 해당 함수 인수가 비동기 함수가 되도록 허용한다
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next) equals]);

 

  • 아래 세 가지는 await for 루프가 수행할 수 없는 에러 처리를 다룬다
Stream<T> handleError(Function onError, {bool test(error)});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink) onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (var event in streamWithoutErrors) {
    yield convert(event);
  }
}


Thre transform() function

  • 일반적인 map 은 들어오는 각 이벤트에 대해 하나의 값이 필요하다
  • 그런데 I/O 스트림의 경우, output 이벤트를 생성하는데 여러 수신 이벤트가 필요할 수 있다

Reading and decoding a file

  • 아래 예제는 파일을 읽고 스트림을 통해 두 개의 transforms 을 실행한다
  • UTF8 에서 데이터를 변환 한 다음 LineSplitter 로 실행한다
import 'dart:convert';
import 'dart:io';

Future<void> main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(LineSplitter());
  await for (var line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

 

The listen() method

  • low-level 메소드로 모든 스트림 함수는 listen() 으로 정의된다
  • 새 스트림 타입을 생성하려면 Stream 클래스를 확장하고 listen() 메서드를 구현하면 된다
  • 스트림의 모든 메소드들은 동작하기 위해서는 listen() 을 호출한다
  • listen() 메서드는 스트림에서 수신을 시작할 수 있고,
    수신 시 이벤트를 생성하는 액티브 스트림인 StreamSubscription 객체가 반환된다
  • 아래와 같이 각 이벤트, 에러 이벤트, 스트림 완료 일 때 콜백을 설정할 수 있다
StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

댓글