Home > database >  How do I trim zeros at the end of a custom ReadOnlySequence implementation?
How do I trim zeros at the end of a custom ReadOnlySequence implementation?

Time:04-21

The web socket server expects JSON strings. _webSocket.SendAsync sends ReadOnlyMemory<byte>. The issue is that sequence.First has trailing zeros. It results in an invalid JSON message because of the trailing zeros. The question is how do I trim them?

Usage

var request = new JsonRpcRequest<object>
{
    JsonRpc = "2.0",
    Id = 1,
    Method = "public/get_instruments",
    Params = @params
};

var message = JsonSerializer.Serialize(request);

using var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
Encoding.UTF8.GetEncoder().Convert(message, buffer.Memory.Span, true, out _, out _, out _);

var seq = new OwnedMemorySequence<byte>();
seq.Append(buffer);

var msg = new ChannelWebSocket.Message
{
    MessageType = WebSocketMessageType.Text,
    Payload = seq
};

await client.Output.WriteAsync(msg).ConfigureAwait(false);

Code

private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    await foreach (var message in _output.Reader.ReadAllAsync())
    {
        var sequence = message.Payload.ReadOnlySequence;
        if (sequence.IsEmpty)
            continue;

        while (!sequence.IsSingleSegment)
        {
            await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
            sequence = sequence.Slice(sequence.First.Length);
        }

        await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
        message.Payload.Dispose();
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}

public sealed class Message
{
    public WebSocketMessageType MessageType { get; set; }
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;
}

public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    public ReadOnlySequence<T> ReadOnlySequence => _sequence.ReadOnlySequence;

    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        _sequence.Append(memoryOwner.Memory);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);
    }

    public void Dispose()
    {
        _disposable.Dispose();
    }
}

public static class MemoryOwnerSliceExtensions
{
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        if (start == 0 && length == owner.Memory.Length)
            return owner;
        return new SliceOwner<T>(owner, start, length);
    }

    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        if (start == 0)
            return owner;
        return new SliceOwner<T>(owner, start);
    }

    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            Memory = _owner.Memory.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            Memory = _owner.Memory[start..];
        }

        public Memory<T> Memory { get; }

        public void Dispose()
        {
            _owner.Dispose();
        }
    }
}

public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    public ReadOnlySequence<T> ReadOnlySequence => CreateReadOnlySequence(0, _tail?.Memory.Length ?? 0);

    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail == null)
            _head = _tail = new MemorySegment(buffer, 0);
        else
            _tail = _tail.Append(buffer);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _tail == null ? new ReadOnlySequence<T>(Array.Empty<T>()) : new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            Memory = memory;
            RunningIndex = runningIndex;
        }

        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            var next = new MemorySegment(nextMemory, RunningIndex   Memory.Length);
            Next = next;
            return next;
        }
    }
}

CodePudding user response:

You don't need to trim. You just need to slice your memory accordingly.

Pay attention to MemoryPool<T>.Rent (quote from the doc) returning "[...] a memory block capable capable of holding at least minBufferSize elements of T." The important bit here is "at least", meaning the returned memory is allowed to be larger than the requested size.

All you need to do is to create a slice of the requested size from the memory if it happens to be larger than the requested size before adding it to OwnedMemorySequence<T>_sequence member .

  • Related