How to break the loop for a stream in dart?

Time:03-03

I know that a listen can be aborted through its StreamSubscription. But for some reason, I cannot call listen on File.openRead(). How can I abort the stream's read operation?

import 'dart:io';
import 'dart:async';

class Reader {
  Stream<int> progess(File file) async* {
    var sum = 0;
    var fs = file.openRead();
    await for (var d in fs) {
      // consume d
      sum += d.length;
      yield sum;
    }
  }

  void cancel() {
    // How to abort the above loop without using StreamSubscription returned by listen().
  }
}


void main() async {
  var reader = Reader();
  var file = File.new("a.txt");
  reader.progess(file).listen((p) => print("$p"));
  // How to cancel it without 
  Future.delayed(Duration(seconds: 1), () { reader.cancel(); });
}

Answer:

You cannot cancel the stream subscription without calling cancel on the stream subscription.
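For reference, here is a minimal sketch of what cancelling through the subscription looks like. The names `fakeChunks` and `demoCancel` are hypothetical, and the periodic stream is only an assumed stand-in for file.openRead() so that the snippet runs without a file on disk.

```dart
import 'dart:async';

/// Stand-in for the reader in the question: yields a running byte total.
Stream<int> progress(Stream<List<int>> chunks) async* {
  var sum = 0;
  await for (final chunk in chunks) {
    sum += chunk.length;
    yield sum;
  }
}

/// Hypothetical source: emits a 3-byte chunk every 10 ms, forever.
Stream<List<int>> fakeChunks() =>
    Stream.periodic(const Duration(milliseconds: 10), (_) => [0, 0, 0]);

/// Listens, keeps the subscription, and cancels it once 9 bytes were reported.
Future<List<int>> demoCancel() async {
  final seen = <int>[];
  final stopped = Completer<void>();
  late StreamSubscription<int> sub;
  sub = progress(fakeChunks()).listen((p) {
    seen.add(p);
    if (p >= 9) {
      // Cancelling here also ends the generator's inner `await for`,
      // which in turn cancels the underlying chunk stream.
      sub.cancel();
      stopped.complete();
    }
  });
  await stopped.future;
  return seen; // [3, 6, 9]
}
```

Calling `await demoCancel()` from main should return after three events. No "done" event is delivered to the listener, because a cancel is not a close.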

You might be able to interrupt the stream producer in some other way, using a "side channel" to ask it to stop producing values. That's not a stream cancel, more like a premature stream close.

Example:

class Reader {
  bool _cancelled = false;
  Stream<int> progess(File file) async* {
    var sum = 0;
    var fs = file.openRead();
    await for (var d in fs) {
      // consume d
      sum += d.length;
      if (_cancelled) return; // <---
      yield sum;
    }
  }

  void cancel() {
    _cancelled = true;
  }
}
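To illustrate how the flag behaves from the consumer's side, here is a small sketch under stated assumptions: `FlagReader` and `demoFlag` are hypothetical names, and a periodic in-memory stream replaces file.openRead() so the snippet needs no file.

```dart
import 'dart:async';

/// Same idea as the Reader above, but fed by any chunk stream.
class FlagReader {
  bool _cancelled = false;

  Stream<int> progress(Stream<List<int>> chunks) async* {
    var sum = 0;
    await for (final chunk in chunks) {
      sum += chunk.length;
      // Returning ends the generator, which closes the stream normally.
      if (_cancelled) return;
      yield sum;
    }
  }

  void cancel() => _cancelled = true;
}

/// Consumes progress values and raises the flag once 6 bytes were reported.
Future<List<int>> demoFlag() async {
  final reader = FlagReader();
  // Assumed stand-in source: a 3-byte chunk every 10 ms, forever.
  final chunks =
      Stream.periodic(const Duration(milliseconds: 10), (_) => [0, 0, 0]);
  final seen = <int>[];
  // This loop finishes because the producer closes the stream itself.
  await for (final p in reader.progress(chunks)) {
    seen.add(p);
    if (p >= 6) reader.cancel();
  }
  return seen; // [3, 6]
}
```

Because the stream is closed rather than cancelled, the consuming `await for` completes and an `onDone` callback would fire. The flag is only checked when the next chunk arrives, so a source that stalls will not notice the request until it produces another event.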

Another option is to create a general stream wrapper that can interrupt the stream, perhaps something like:

import "dart:async";
class CancelableStream<T> extends Stream<T> {
  final Stream<T> _source;
  final Set<_CancelableStreamSubscription<T>> _subscriptions = {};
  CancelableStream(Stream<T> source) : _source = source;
  @override
  StreamSubscription<T> listen(
       onData, {onError, onDone, cancelOnError}) {
    var sub = _source.listen(onData, 
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
    var canSub = _CancelableStreamSubscription<T>(sub, this, cancelOnError ?? false);
    _subscriptions.add(canSub);
    return canSub;
  }
  void cancelAll() {
    while (_subscriptions.isNotEmpty) {
      _subscriptions.first.cancel();
    }
  }
}

class _CancelableStreamSubscription<T> implements StreamSubscription<T> {
  final bool _cancelOnError;
  final StreamSubscription<T> _source;
  final CancelableStream<T> _owner;
  _CancelableStreamSubscription(
      this._source, this._owner, this._cancelOnError);
  @override
  Future<void> cancel() {
    _owner._subscriptions.remove(this);
    return _source.cancel();
  }
  @override
  void onData(f) => _source.onData(f);
  @override
  void onError(f) {
    if (!_cancelOnError) {
      _source.onError(f);
    } else {
      _source.onError((Object e, StackTrace s) {
        _owner._subscriptions.remove(this);
        if (f is void Function(Object, StackTrace)) {
          f(e, s);
        } else {
          f?.call(e);
        } 
      });
    }
  }
  @override
  bool get isPaused => _source.isPaused;
  @override
  void onDone(f) => _source.onDone(() {
     _owner._subscriptions.remove(this);
     f?.call();
  });
  @override
  void pause([resumeFuture]) => _source.pause(resumeFuture);
  @override
  void resume() => _source.resume();
  @override
  Future<E> asFuture<E>([E? value]) => _source.asFuture(value);
}

You can then use it like:

void main() async {
  Stream<int> foo() async* {
    yield 1;
    yield 2;
    yield 3;
    yield 4;
  }
  var s = CancelableStream<int>(foo());
  await for (var x in s) {
    print(x);
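    // After cancelAll() no further events arrive, not even "done", so this
    // loop never completes normally; the program ends once nothing is pending.
    if (x == 2) s.cancelAll();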
  }
}
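If a premature close is acceptable instead of a true cancel, the built-in `Stream.takeWhile` can give a similar effect with less code. This is a different technique from the wrapper class, sketched here under assumptions: `StopFlag`, `guard`, and `demoTakeWhile` are hypothetical names introduced only for illustration.

```dart
import 'dart:async';

/// Hypothetical helper: a flag plus a way to wrap streams that honor it.
class StopFlag {
  bool stopped = false;

  void stop() => stopped = true;

  /// Returns a stream that closes at the first event seen after stop().
  Stream<T> guard<T>(Stream<T> source) => source.takeWhile((_) => !stopped);
}

/// Reads 1..4 but requests a stop after seeing 2.
Future<List<int>> demoTakeWhile() async {
  final flag = StopFlag();
  final seen = <int>[];
  await for (final x in flag.guard(Stream.fromIterable([1, 2, 3, 4]))) {
    seen.add(x);
    if (x == 2) flag.stop();
  }
  // The loop ends normally because takeWhile closes the stream.
  return seen; // [1, 2]
}
```

Unlike cancelAll(), this delivers a "done" event, so an `await for` over the guarded stream completes. As with the boolean flag, the stop only takes effect when the source emits its next event.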