dart.dev/tutorials/language/streams
What's the point?
- Stream 은 데이터의 비동기 시퀀스를 제공한다
- 데이터 시퀀스는 사용자 이벤트와 파일로부터 읽은 데이터가 포함된다
- Stream API 에서 await for 또는 listen() 을 사용해서 stream 을 처리할 수 있다
- 에러 대응하는 방법을 제공한다
- single subscription 과 broadcast 두 가지 종류가 있다
다트의 비동기 프로그래밍은 Future 와 Stream 클래스가 있다
- Future
: 즉시 완료되지 않는걸 나타내며 일반 함수가 결과를 리턴하는 경우,
비동기 함수는 결과를 포함한 Future 를 리턴한다.
결과값이 준비가 되면 Future 가 알려줄 것이다. - Stream
: 비동기 이벤트의 연속으로 비동기식 Iterable 과 같다.
요청 시 다음 이벤트를 가져오는 대신,
스트림은 이벤트가 준비되었을 때 이벤트가 있다는 것을 알려준다.
Receiving stream events
- 스트림은 여러가지 방법으로 만들어질 수 있지만 모두 같은 방식으로 사용될 수 있다
- 비동기 for loop (보통 await for 라고 부름) 는 for loop 와 같은 스트림의 이벤트를 Iterable 을 반복하는 것 처럼 반복한다.
ex1) int 스트림의 각 이벤트를 수신하고 더해서 합계를 Future 로 리턴하는 예제이다.
loop 문 내부가 끝나면 다음 이벤트가 도착하거나 스트림이 완료될 때까지 함수가 일시 중지 된다
// await for 를 사용하려면 async 키워드를 넣어야된다
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (var value in stream) {
sum += value;
}
return sum;
}
ex2) async* 함수를 사용해서 int 스트림을 생성해서 ex1 예제를 테스트 한다.
dartpad.dev/embed-inline.html?id=15d5ef986238c97dbc14
Error events
- 스트림은 더이상 이벤트가 없을 때 수행되며, 이벤트를 받는 코드는 새 이벤트가 도착했음을 알리는 것처럼 이벤트를 알린다
- await for 루프를 사용해서 이벤트를 읽을 때, 스트림이 완료되면! 루프는 중지된다
- 몇몇 경우에는 스트림이 완료되기 전에 에러가 발생한다
: 서버에서 파일 가져오는 동안 네트워크가 실패했거나 이벤트를 생성하는 코드가 버그가 있거나 등 - 스트림은 데이터 이벤트를 전달하는 것처럼 에러 이벤트도 전달할 수 있다
- 대부분 스트림은 첫 번째 에러가 발생하면 멈추는데,
둘 이상의 에러를 전달하는 스트림이과 에러 이벤트 이후 데이터를 전달하는 스트림이 있을 수 있다
(여기 문서에는 최대 한 개의 에러만 전달하는 스트림에 대해서만 설명한다고 한다) - await for 를 사용하여 스트림을 읽을 때 루프에서 에러가 발생하면, 루프도 종료된다
- try-catch 를 사용해서 에러를 잡을 수 있다
dartpad.dev/embed-inline.html?id=df7c1168a5c6b20fda2a76d6ff33a1da
Working with streams
- Stream 클래스에는 Iterable 처럼 스트림에서 일반적인 작업을 수행할 수 있게 돕는 메서드들이 있다
ex) lastWhere() 을 사용해서 스트림에서 마지막 양의 정수를 찾는다
Future<int> lastPositive(Stream<int> stream) =>
stream.lastWhere((x) => x >= 0);
Two kinds of Streams
Single subscription streams
- 가장 일반적인 스트림으로 이벤트 시퀀스가 포함된다
- 이벤트는 누락되지 않고 순서대로 전달되어야 한다
- 파일을 읽거나 웹 요청을 할 때 얻는 종류의 스트림이다
- 이런 스트림은 딱 한번만 들을 수 있고, 다시 시도하는건 초기 이벤트를 놓치는 것이므로 나머지 스트림은 의미가 없어진다
- 한 번 시작하면 데이터를 가져와서 청크로 제공한다
Broadcast streams
- 한 번에 하나씩 처리할 수 있다
- 언제든지 이 스트림을 들을 수 있고, 듣는 동안 시작되는 이벤트도 얻을 수 있다
- 두 개 이상의 리스너가 동시에 들을 수 있으며 이전 구독을 취소한 후 나중에 다시 들을 수도 있다
Methods that process a stream
- 스트림을 수행하고 결과를 리턴하는 메소드들 종류이다
- drain(), pipe() 를 제외한 모든 함수는 Iterable 과 비슷하다
- await for 루프와 함께 비동기 함수를 사용하면 쉽게 작성할 수 있다
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
Future<bool> contains(Object needle) async {
await for (var event in this) {
if (event == needle) return true;
}
return false;
}
Future forEach(void Function(T element) action) async {
await for (var event in this) {
action(event);
}
}
Future<List<T>> toList() async {
final result = <T>[];
await this.forEach(result.add);
return result;
}
Future<String> join([String separator = ""]) async =>
(await this.toList()).join(separator);
Methods that modify a stream
- 아래 스트림의 메서드는 원래 스트림을 기반으로 새로운 스트림을 리턴한다
- 각각 원래 스트림을 듣기 전에 누군가가 새로운 스트림을 들을 때까지 기다린다
- 아래 메소드들은 Iterable 을 다른 Iterable 로 변환한다
- 다 await for 루프랑 사용하면 비동기 함수를 쉽게 작성할 수 있다
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);
- 아래 asyncExpand(), asyncMap() 함수들은 expand(), map() 함수와 비슷한데 해당 함수 인수가 비동기 함수가 되도록 허용한다
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next) equals]);
- 아래 세 가지는 await for 루프가 수행할 수 없는 에러 처리를 다룬다
Stream<T> handleError(Function onError, {bool test(error)});
Stream<T> timeout(Duration timeLimit,
{void Function(EventSink<T> sink) onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
Stream<S> mapLogErrors<S, T>(
Stream<T> stream,
S Function(T event) convert,
) async* {
var streamWithoutErrors = stream.handleError((e) => log(e));
await for (var event in streamWithoutErrors) {
yield convert(event);
}
}
Thre transform() function
- 일반적인 map 은 들어오는 각 이벤트에 대해 하나의 값이 필요하다
- 그런데 I/O 스트림의 경우, output 이벤트를 생성하는데 여러 수신 이벤트가 필요할 수 있다
Reading and decoding a file
- 아래 예제는 파일을 읽고 스트림을 통해 두 개의 transforms 을 실행한다
- UTF8 에서 데이터를 변환 한 다음 LineSplitter 로 실행한다
import 'dart:convert';
import 'dart:io';
Future<void> main(List<String> args) async {
var file = File(args[0]);
var lines = utf8.decoder
.bind(file.openRead())
.transform(LineSplitter());
await for (var line in lines) {
if (!line.startsWith('#')) print(line);
}
}
The listen() method
- low-level 메소드로 모든 스트림 함수는 listen() 으로 정의된다
- 새 스트림 타입을 생성하려면 Stream 클래스를 확장하고 listen() 메서드를 구현하면 된다
- 스트림의 모든 메소드들은 동작하기 위해서는 listen() 을 호출한다
- listen() 메서드는 스트림에서 수신을 시작할 수 있고,
수신 시 이벤트를 생성하는 액티브 스트림인 StreamSubscription 객체가 반환된다 - 아래와 같이 각 이벤트, 에러 이벤트, 스트림 완료 일 때 콜백을 설정할 수 있다
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError});
'기타개발 > Flutter' 카테고리의 다른 글
Bloc Library - package:bloc (0) | 2020.09.19 |
---|---|
[Android vs Flutter] View vs Widget (0) | 2020.09.18 |
Flutter 문서 링크 모음 (0) | 2020.09.13 |
Flutter : StatefulWidget & StatelessWidget (0) | 2020.09.13 |
Dart # 11 : Callable classes, Isolates, Typedefs (0) | 2020.08.28 |
댓글