Home > Enterprise >  Dart equivalent of BlockingCollection
Dart equivalent of BlockingCollection

Time:02-24

I'm currently migrating an App's logic code from C# to Dart and I'm looking for a similiar collection type in Dart to C#s BlockingCollection. I basically want a queue where i can iterate infinitely. If the queue is empty it just waits until a new element is added.

Is that possible in Dart?

Best

CodePudding user response:

You can use a StreamController.

Here I translated the first C# example for BlockingCollection

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static async Task Main()
    {
        await AddTakeDemo.BC_AddTakeCompleteAdding();
    }
}
class AddTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.Take()
    //      BlockingCollection<T>.CompleteAdding()
    public static async Task BC_AddTakeCompleteAdding()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Spin up a Task to populate the BlockingCollection
            Task t1 = Task.Run(() =>
            {
                bc.Add(1);
                bc.Add(2);
                bc.Add(3);
                bc.CompleteAdding();
            });

            // Spin up a Task to consume the BlockingCollection
            Task t2 = Task.Run(() =>
            {
                try
                {
                    // Consume consume the BlockingCollection
                    while (true) Console.WriteLine(bc.Take());
                }
                catch (InvalidOperationException)
                {
                    // An InvalidOperationException means that Take() was called on a completed collection
                    Console.WriteLine("That's All!");
                }
            });

            await Task.WhenAll(t1, t2);
        }
    }
}

to dart using a StreamController instead of BlockingCollection, and Future instead of Task.

import 'dart:async';

Future<void> main() async {
  await addTakeCompleteAdding();
}

// Demonstrates:
//      StreamController<T>.add()
//      StreamController<T>.stream
//      StreamController<T>.close()
Future<void> addTakeCompleteAdding() async {
  StreamController<int> bc = StreamController<int>();

  // Spin up a Future to populate the StreamController
  Future<void> t1 = Future(() {
    bc.add(1);
    bc.add(2);
    bc.add(3);
    bc.close();
  });

  // Spin up a Future to consume the StreamController
  Future<void> t2 = Future(() async {
    // Consume consume the StreamController
    await for (final element in bc.stream) {
      print(element);
    }
    // Exits the loop when the stream is completed/closed
    print("That's All!");
  });

  await Future.wait([t1, t2]);
}

That said, the StreamController differs a bit from BlockingCollection in that it is not a queue. A Stream in dart by default, can only have one subscription, unless you create a broadcast stream. Stream is more like an async enumerable in C#.

If you really need a queue data structure you can use the async package, which has a StreamQueue class that you can use to wrap the stream from the StreamController.

Here is the above code modified to use a StreamQueue:

import 'dart:async';
import 'package:async/async.dart';

Future<void> main() async {
  await addTakeCompleteAdding();
}

// Demonstrates:
//      StreamController<T>.add()
//      StreamController<T>.stream
//      StreamController<T>.close()
//      StreamQueue<T>.next
Future<void> addTakeCompleteAdding() async {
  StreamController<int> bc = StreamController<int>();
  StreamQueue<int> queue = StreamQueue<int>(bc.stream);

  // Spin up a Future to populate the StreamController
  Future<void> t1 = Future(() {
    bc.add(1);
    bc.add(2);
    bc.add(3);
    bc.close();
  });

  // Spin up a Future to consume the StreamQueue
  Future<void> t2 = Future(() async {
    try {
      while (true) {
        // Consume consume the StreamQueue
        print(await queue.next);
      }
    } on StateError catch (e) {
      // A StateError means that next was called on a completed collection
      print("That's all!");
    }
  });

  await Future.wait([t1, t2]);
}

CodePudding user response:

You can also write your own queue, based on futures instead of a stream:

import "dart:async" show Completer;
import "dart:collection" show Queue;

abstract class BlockingQueue<T> {
  factory BlockingQueue() = _BlockingQueue;
  Future<T> removeNext();
  void add(T value);
}

class _BlockingQueue<T> implements BlockingQueue<T> {
  final Queue<T> _writes = Queue();
  final Queue<Completer<T>> _reads = Queue();
  Future<T> removeNext() {
    if (_writes.isNotEmpty) return Future.value(_writes.removeFirst());
    var completer = Completer<T>();
    _reads.add(completer);
    return completer.future;
  }
  void add(T value) {
    if (_reads.isNotEmpty) {
      _reads.removeFirst().complete(value);
    } else {
      _writes.add(value);
    }
  }
}

You can also consider a double-blocking queue, where the add method also "blocks" if there is no-one to accept the value yet. It's not even that hard,.

import "dart:async" show Completer;
import "dart:collection" show Queue;

abstract class BlockingQueue<T> {
  factory BlockingQueue() = _BlockingQueue;
  Future<T> removeNext();
  Future<void> add(T value);
}

class _BlockingQueue<T> implements BlockingQueue<T> {
  final Queue<T> _writes = Queue();
  final Queue<Completer<T>> _completers = Queue();
  Future<T> removeNext() {

    if (_writes.isNotEmpty) {
      assert(_completers.isNotEmpty);
      var completer = _completers.removeFirst();
      completer.complete(_writes.removeFirst());
      return completer.future;
    }
    var completer = Completer<T>();
    _completers.add(completer);
    return completer.future;
  }
  Future<void> add(T value) {
    if (_writes.isEmpty && _completers.isNotEmpty) {
      var completer = _completers.removeFirst();
      completer.complete(value);
      return completer.future;
    }
    var completer = Completer<T>();
    _completers.add(completer);
    _writes.add(value);
    return completer.future;
  }
}

That said, if you want to use a for (... in ...)-like loop, you probably do want to go with a Stream and use await for (... in theStream).

  •  Tags:  
  • dart
  • Related