How can I await a stream (or any other event queue) multiple times?
I tried Stream.first
& Stream.single
, both doesn't work.
What I want to do:
//next is a fake member
Future<void> xxx() async {
final eventStream = foo();
await eventStream.next; // wait for first event
//do some work
await eventStream.next; // wait for second event
//do some other differnet work
await eventStream.next; // wait for 3rd event
// another differnet work
return;
}
equvalent to:
Future<void> xxx() async {
final eventStream = foo();
int i=0;
await for (final _ in eventStream){
if(i==0);//do some work
else if(i==1);//do some other differnet work
else if(i==2){;break;}//another differnet work
i;
}
return;
}
CodePudding user response:
Try StreamQueue from package:async
.
var q = StreamQueue(eventStream);
var v1 = await q.next;
var v2 = await q.next;
var v3 = await q.next;
// other work.
await q.cancel(); // If not listened to completely yet.
CodePudding user response:
I end up with a custom class
class CompleterQueue<T> {
final _buf = <Completer<T>>[];
var _iWrite = 0;
var _iRead = 0;
int get length => _buf.length;
Future<T> next() {
if (_iRead == _buf.length) {
_buf.add(Completer<T>());
}
final fut = _buf[_iRead].future;
_iRead = 1;
_cleanup();
return fut;
}
void add(T val) {
if (_iWrite == _buf.length) {
final prm = Completer<T>();
_buf.add(prm);
}
_buf[_iWrite].complete(val);
_iWrite = 1;
_cleanup();
}
void _cleanup() {
final minI = _iWrite < _iRead ? _iWrite : _iRead;
if (minI > 0) {
_buf.removeRange(0, minI);
_iWrite -= minI;
_iRead -= minI;
}
}
}