I know that listening can be aborted through the `StreamSubscription` returned by `listen()`.
But for some reason, I cannot call `listen()` on the stream returned by `File.openRead()`. How can I abort the stream's read operation?
import 'dart:io';
import 'dart:async';
/// Reads a file and reports how many bytes have been read so far.
class Reader {
  /// Streams the running total of bytes read from [file].
  ///
  /// One value is emitted for every chunk delivered by [File.openRead].
  Stream<int> progess(File file) async* {
    var sum = 0;
    await for (final chunk in file.openRead()) {
      // consume chunk
      // Accumulate instead of overwriting, so each emitted value is the
      // total read so far rather than the size of the latest chunk.
      sum += chunk.length;
      yield sum;
    }
  }

  /// Placeholder for the cancellation hook the question asks about.
  void cancel() {
    // How to abort the above loop without using StreamSubscription returned by listen().
  }
}
/// Starts reading `a.txt`, prints each progress value, and asks the reader
/// to cancel after one second.
void main() async {
  var reader = Reader();
  var file = File("a.txt");
  reader.progess(file).listen((p) => print("$p"));
  // How to cancel it without keeping the StreamSubscription returned by listen()?
  Future.delayed(Duration(seconds: 1), () {
    reader.cancel();
  });
}
CodePudding user response:
You cannot cancel the stream subscription without calling cancel
on the stream subscription.
You might be able to interrupt the stream producer in some other way, using a "side channel" to ask it to stop producing values. That's not a stream cancel, more like a premature stream close.
Example:
/// Reads a file and reports progress, with a side channel to stop early.
///
/// [cancel] does not cancel any stream subscription; it only asks the
/// generator in [progess] to finish at its next chunk.
class Reader {
  /// Set by [cancel] and checked once per chunk by [progess].
  ///
  /// The flag is never reset, so once [cancel] has been called every stream
  /// from [progess] ends at its next chunk.
  bool _cancelled = false;

  /// Streams the running total of bytes read from [file].
  ///
  /// The stream closes early, without an error, once [cancel] has been
  /// called.
  Stream<int> progess(File file) async* {
    var sum = 0;
    await for (final chunk in file.openRead()) {
      // Returning leaves the `await for`, which cancels the subscription on
      // the file stream; check before doing any work on the chunk.
      if (_cancelled) return;
      // consume chunk
      // Accumulate instead of overwriting, so each emitted value is the
      // total read so far rather than the size of the latest chunk.
      sum += chunk.length;
      yield sum;
    }
  }

  /// Asks any running [progess] stream to stop at its next chunk.
  void cancel() {
    _cancelled = true;
  }
}
Another option is to create a general stream wrapper which can interrupt the stream. Maybe something like
import"dart:async";
/// A stream wrapper whose active subscriptions can all be cancelled at once.
///
/// Every call to [listen] subscribes to the wrapped source stream and tracks
/// the resulting subscription until it is cancelled or finishes.
class CancelableStream<T> extends Stream<T> {
  /// The stream that listeners are actually subscribed to.
  final Stream<T> _source;

  /// Subscriptions created by [listen] that are still tracked.
  final Set<_CancelableStreamSubscription<T>> _subscriptions = {};

  /// Wraps [source] so that its listeners can be cancelled with [cancelAll].
  CancelableStream(Stream<T> source) : _source = source;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    var sub = _source.listen(onData, cancelOnError: cancelOnError);
    var canSub =
        _CancelableStreamSubscription<T>(sub, this, cancelOnError ?? false);
    _subscriptions.add(canSub);
    // Install the done/error handlers through the wrapper so that a
    // subscription which ends on its own is dropped from [_subscriptions].
    // No event is delivered before listen() returns, so setting them here is
    // equivalent to passing them to the source's listen().
    canSub.onDone(onDone);
    // Without an error handler, leave errors unhandled as the source would.
    if (onError != null) canSub.onError(onError);
    return canSub;
  }

  /// Cancels every subscription that is still tracked.
  ///
  /// Cancelled listeners receive no further events, including no done event.
  void cancelAll() {
    // Each cancel() removes the subscription from the set, so the loop
    // terminates once the set is drained.
    while (_subscriptions.isNotEmpty) {
      _subscriptions.first.cancel();
    }
  }
}
/// A subscription handed out by [CancelableStream.listen].
///
/// Forwards every operation to the subscription on the source stream and
/// removes itself from its owner's tracking set once it is cancelled, done,
/// or (with cancel-on-error) has received an error.
class _CancelableStreamSubscription<T> implements StreamSubscription<T> {
  /// Whether the source subscription was created with `cancelOnError: true`.
  final bool _cancelOnError;

  /// The subscription on the wrapped stream.
  final StreamSubscription<T> _source;

  /// The stream that tracks this subscription.
  final CancelableStream<T> _owner;

  _CancelableStreamSubscription(
      this._source, this._owner, this._cancelOnError);

  @override
  Future<void> cancel() {
    _owner._subscriptions.remove(this);
    return _source.cancel();
  }

  @override
  void onData(void Function(T data)? handleData) =>
      _source.onData(handleData);

  @override
  void onError(Function? handleError) {
    if (!_cancelOnError) {
      _source.onError(handleError);
      return;
    }
    // With cancel-on-error the first error ends the subscription, so stop
    // tracking it before forwarding the error.
    _source.onError((Object error, StackTrace stackTrace) {
      _owner._subscriptions.remove(this);
      // Error handlers may take either (error, stackTrace) or just (error).
      if (handleError is void Function(Object, StackTrace)) {
        handleError(error, stackTrace);
      } else {
        handleError?.call(error);
      }
    });
  }

  @override
  bool get isPaused => _source.isPaused;

  @override
  void onDone(void Function()? handleDone) => _source.onDone(() {
        _owner._subscriptions.remove(this);
        handleDone?.call();
      });

  @override
  void pause([Future<void>? resumeSignal]) => _source.pause(resumeSignal);

  // Must invoke resume(); a bare `_source.resume` is only a tear-off and
  // would leave the subscription paused.
  @override
  void resume() => _source.resume();

  @override
  Future<E> asFuture<E>([E? value]) =>
      // asFuture() replaces the done/error handlers on the source, so do the
      // bookkeeping when the returned future completes instead.
      _source.asFuture<E>(value).whenComplete(() {
        _owner._subscriptions.remove(this);
      });
}
You can then use it like:
/// Demonstrates interrupting an `await for` loop through [CancelableStream].
void main() async {
  // Emits the integers 1 through 4, one at a time.
  Stream<int> foo() async* {
    for (var value = 1; value <= 4; value++) {
      yield value;
    }
  }

  final cancelable = CancelableStream<int>(foo());
  await for (final value in cancelable) {
    print(value);
    // Cancelling every listener also cancels the subscription that drives
    // this loop, so no further values are delivered after 2.
    if (value == 2) {
      cancelable.cancelAll();
    }
  }
}