Home > Net >  How to extend Dart sockets BroadcastStream buffer size of 1024 bytes?
How to extend Dart sockets BroadcastStream buffer size of 1024 bytes?

Time:08-04

IMPORTANT EDIT

After further investigating, I found out that the packet size is in fact much larger than the stated 1024 bytes, the 1024 bytes were just the limit of the standard out I was using (android studio / flutter).

Some of the packets received are now up to ~27 000 bytes large, however that is nowhere near the actually transmitted size of over 10x that.

I am trying to send singular packets of up to 5 MB in length over a Socket connection in Dart. For this, I am using the following code:

Socket socket = await Socket.connect(globals.serverUrl, globals.serverPort);
Stream<Uint8List> stream = socket?.asBroadcastStream();
Uint8List? response = await stream?.first;
String responseString = String.fromCharCodes(response);

Note that my Server is running Java while the Client is using Dart.

After sending the data packet from the Server to the Client, it successfully receives the first 1024 bytes exactly of the packet, and the rest is nowhere to be found, even after reading stream.first multiple times, they continuously read the newly sent packet and not the remaining bytes of the old packet.

So my question is, how do I require the Socket stream to read ALL bytes of the packet until finished, and not just the first 1024?

EDIT:

The received packet on the client is parsed using:

String? decrypt(String cipherText, String keyString) {
  final key = Key.fromUtf8(keyString);
  final iv = IV.fromBase64(cipherText.split(":")[1]);
  final encrypter = Encrypter(AES(key, mode: AESMode.cbc, padding: null));
  final encrypted = Encrypted.fromBase64(cipherText.split(":")[0]);

  final decrypted = encrypter.decrypt(encrypted, iv: iv);

  globals.log.i("DECRYPTED: $decrypted");
  return decrypted;
}

The error that I am getting stems from getting the IV, since the message is cut off at 1024 bytes and the ":" appears much later in the String.

CodePudding user response:

The problem is that the Dart socket split messages bigger than 1024 bytes into multiple packets of 1024 bytes. So there's some approaches you can use to combine them together in the client:

By extending Socket class

I do not believe this is a right solution:

  • Hard to extend since it's a platform implementation (you can see the sdk implementation of dart:io almost any class method is external).
  • Hard to maintain.
  • Since it depends on custom platform implementations you need to do it on multiple platforms.
  • It's easy to create undocumented memory leaks.

Let me know if you still prefer this approach I'll do a further research.

By using Stream<T>.reduce function

The problem with this approach in your context is that Sockets do not emit a done event when a message is sent by using socket.write('Your message').

So unless you're using a socket to send a single message this function can't help you cause it will return a Future<T> that will never complete (only when the socket connection is closed).

By emitting a EOF message from the server

This is a solution I found even not so elegant, improvements are welcome.

The idea is to concatenate all client received packets into a single one and stop receiving when a pre-determined termination (EOF) string is received.

Implementation

Below is a server implementation that emits a message of 5mb followed by a message:end string every time a new client is connected.

import 'dart:io';

Future<void> main() async {
  final ServerSocket serverSocket =
      await ServerSocket.bind(InternetAddress.anyIPv4, 5050);

  final Stream<Socket> serverStream = serverSocket.asBroadcastStream();

  serverStream.listen((client) async {
    print(
        'New client connected: ${client.address}:${client.port} ${client.done} Remote address: ${client.remoteAddress}');

    const int k1byte = 8;
    const int k2bytes = k1byte * 2;
    const int k1kb = k1byte * 1000;
    const int k1mb = k1kb * 1000;
    const int k5mb = k1mb * 5;

    // Create a 5mb string that follows: '1000.....0001'
    final String k1mbMessage = '1${createStringOf(k5mb - k2bytes, '0')}1';

    client.write(k1mbMessage);
    client.write('message:end');
  });

  print('Listening on: ${serverSocket.address} ${serverSocket.port}');
}

String createStringOf(int size, [String char = ' ']) {
  // https://api.dart.dev/stable/2.17.3/dart-core/String-class.html it says:
  // > A sequence of UTF-16 code units.
  // And from https://www.ibm.com/docs/en/db2-for-zos/12?topic=unicode-utfs says:
  // > UTF-16 is based on 16-bit code units. Each character is encoded as at least 2 bytes.
  int dartStringEncodingSize = 2;

  assert(size >= dartStringEncodingSize && size.isEven,
      '''Dart char contains 2 bytes so we can only create Strings (with exact size) of even N bytes''');
  assert(char.length == 1, '''[char] must be a single char String''');

  int charCount = size ~/ dartStringEncodingSize;

  return char * charCount;
}

And here we can see a client implementation where we use 'our own reduce' function that combine all packets while the termination string is not found.

import 'dart:io';

Future<void> main() async {
  final Socket server = await Socket.connect('localhost', 5050);

  final Stream<String> serverSocket =
      server.asBroadcastStream().map(String.fromCharCodes); // Map to String by default

  const kMessageEof = 'message:end';

  String message = '';

  await for (String packet in serverSocket) {
    // If you are using [message] as a List of bytes (Uint8List):
    // message = [...Uint8List.fromList(message), ...Uint8List(packet)]
    message  = packet;

    // Do not compare directly packet == kMessageEof
    // cause it can be 'broken' into multiple packets:
    // -> 00000 (packet 1)
    // -> 00000 (packet 2)
    // -> 00mes (packet 3)
    // -> sage: (packet 4)
    // -> end   (packet 5)
    if (message.endsWith(kMessageEof)) {
      // remove termination string
      message = message.replaceRange(
        message.length - kMessageEof.length,
        message.length,
        '',
      );
    }

    print('Received: $message'); // Prints '1000000......0000001'
  }
}

You can make it more generic if you want by using an extension:

import 'dart:io';

/// This was created since the native [reduce] says:
/// > When this stream is done, the returned future is completed with the value at that time.
///
/// The problem is that socket connections does not emits the [done] event after
/// each message but after the socket disconnection.
///
/// So here is a implementation that combines [reduce] and [takeWhile].
extension ReduceWhile<T> on Stream<T> {
  Future<T> reduceWhile({
    required T Function(T previous, T element) combine,
    required bool Function(T) combineWhile,
    T? initialValue,
  }) async {
    T initial = initialValue ?? await first;

    await for (T element in this) {
      initial = combine(initial, element);
      if (!combineWhile(initial)) break;
    }

    return initial;
  }
}

Future<void> main() async {
  final Socket server = await Socket.connect('localhost', 5050);

  final Stream<String> serverSocket =
      server.asBroadcastStream().map(String.fromCharCodes);

  const kMessageEof = 'message:end';

  // Reduce with a condition [combineWhile]
  String message = await serverSocket.reduceWhile(
    combine: (previous, element) => '$previous$element',
    combineWhile: (message) => !message.endsWith(kMessageEof),
  );

  // Remove termination string
  message = message.replaceRange(
    message.length - kMessageEof.length,
    message.length,
    '',
  );

  print('Received: $message');
}

Since the socket itself doesn't send the done event the way I found to reduce all packets into a single one was by emitting 'our own done event'.

  • Related