Home > Net >  Lazily transform data in C# Stream
Lazily transform data in C# Stream

Time:09-21

I read from a stream that contains a CRLF every X bytes (in my case X is 2033 because the file is generated with bcp; I set X to 4 in the sample code). I would like to transform it into another stream without these CRLFs. The new stream will be deserialized as XML.

I can do it easily, and it works well, like this:

using System;
using System.IO;
using System.Text;
using System.Xml.Linq;

/// <summary>
/// Copies text from a <see cref="StreamReader"/> into a new in-memory stream,
/// dropping the two characters that follow every full chunk of characters
/// (the CRLF that bcp inserts). The separator characters are skipped without
/// being inspected.
/// </summary>
public class CREliminator
{
    /// <summary>Default number of characters between separators.</summary>
    public const int BcpChunkSize = 4;

    // Number of characters skipped after every full chunk ("\r\n").
    private const int SeparatorLength = 2;

    /// <summary>
    /// Reads <paramref name="reader"/> to the end and returns its content with
    /// the separator after each full chunk removed.
    /// </summary>
    /// <param name="reader">Source text.</param>
    /// <param name="chunkSize">
    /// Number of payload characters between separators; defaults to <see cref="BcpChunkSize"/>.
    /// </param>
    /// <returns>
    /// A <see cref="MemoryStream"/>, positioned at 0, holding the filtered text encoded as UTF-8 without a BOM.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is not positive.</exception>
    public Stream Run(StreamReader reader, int chunkSize = BcpChunkSize)
    {
        if (reader is null)
        {
            throw new ArgumentNullException(nameof(reader));
        }
        if (chunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, "Chunk size must be positive.");
        }

        var output = new MemoryStream();

        // leaveOpen: disposing the writer (which flushes it) must not close the stream we return.
        using (var writer = new StreamWriter(output, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false), 1024, leaveOpen: true))
        {
            // Both buffers are reused across iterations instead of being reallocated per chunk.
            var chunk = new char[chunkSize];
            var separator = new char[SeparatorLength];
            do
            {
                var charsRead = reader.ReadBlock(chunk, 0, chunkSize);
                writer.Write(chunk, 0, charsRead);

                // A separator only follows a complete chunk; a short block is the tail of the input.
                if (charsRead == chunkSize)
                {
                    reader.ReadBlock(separator, 0, SeparatorLength);
                }
            } while (!reader.EndOfStream);
        }

        output.Position = 0;
        return output;
    }
}

public class UnitTest1
{
    /// <summary>
    /// Splits an XML document with a CRLF after every four characters, runs it
    /// through <see cref="CREliminator"/>, and checks that the parsed result
    /// matches the original text with every CRLF removed.
    /// </summary>
    [Fact]
    public void Test1()
    {
        // Arrange: "<document>A</document>" broken into 4-character pieces.
        var newline = "\r\n";
        var input = string.Join(newline, "<doc", "umen", "t>A<", "/doc", "umen", "t>");
        var expected = input.Replace(newline, "");
        var source = new StreamReader(new MemoryStream(Encoding.UTF8.GetBytes(input)));

        // Act
        var cleaned = new CREliminator().Run(source);

        // Assert
        var document = XDocument.Load(cleaned);
        Assert.Equal(expected, document.ToString());
    }
}

But this code loads the whole stream into memory before returning the new stream.

My question is: how can I do this lazily? That is, process the source stream only as some consumer reads from the new stream.

Thanks.

Answer:

Only the Stream.Read method needs to be implemented:

  • Read from source stream in chunks
  • Skip 2 bytes after every chunk
using System.Buffers;
using System.Text;

// Demo: build a payload with a CRLF after every 4 bytes, then read it back
// through CrLfFilteringStream, which drops those separators as it is read.
var sourceString = string.Concat(
    Enumerable.Range(1, 10).Select(_ => "Foo \r\nBar \r\nBaz!\r\n"));
Console.WriteLine($"Source: {sourceString}");

// The same encoding is used to produce the bytes and to decode the filtered result.
var encoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
var sourceBytes = encoding.GetBytes(sourceString);

using var stream = new MemoryStream(sourceBytes);
using var filter = new CrLfFilteringStream(stream, 4);
using var reader = new StreamReader(filter, encoding);

var res = reader.ReadToEnd();
Console.WriteLine($"Result: {res}");

/// <summary>
/// Read-only stream wrapper that removes, as data is read, the two bytes that
/// follow every full chunk of <c>chunkSize</c> bytes in the underlying stream
/// (the CRLF that bcp inserts). The skipped bytes are not inspected.
/// Disposing this stream does not dispose the wrapped stream.
/// </summary>
public class CrLfFilteringStream : Stream
{
    // Number of bytes skipped after every full chunk ("\r\n").
    private const int SeparatorLength = 2;

    private readonly Stream _stream;
    private readonly int _chunkSize;
    // Rented from ArrayPool; it may be longer than _chunkSize, so only the
    // first _chunkLength bytes are meaningful at any time.
    private readonly byte[] _chunk;
    // Index of the next unread byte in _chunk.
    private int _chunkPosition;
    // Number of valid bytes currently held in _chunk.
    private int _chunkLength;
    // Set once the underlying stream has reported end of data.
    private bool _endOfStream;
    private bool _disposed;

    /// <summary>Wraps <paramref name="stream"/>.</summary>
    /// <param name="stream">Readable source stream.</param>
    /// <param name="chunkSize">Number of payload bytes between separators.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="stream"/> is not readable.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is not positive.</exception>
    public CrLfFilteringStream(Stream stream, int chunkSize)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
        if (!stream.CanRead)
        {
            throw new ArgumentException("Stream must be readable.", nameof(stream));
        }
        if (chunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, "Chunk size must be positive.");
        }

        _chunkSize = chunkSize;
        _chunk = ArrayPool<byte>.Shared.Rent(chunkSize);
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> filtered bytes into
    /// <paramref name="buffer"/> starting at <paramref name="offset"/>.
    /// </summary>
    /// <returns>The number of bytes copied; 0 at end of stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer is null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0 || count > buffer.Length - offset)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(CrLfFilteringStream));
        }

        var copied = 0;
        while (copied < count)
        {
            // Refill only once every valid byte of the current chunk has been handed out.
            if (_chunkPosition >= _chunkLength && !TryFillChunk())
            {
                break;
            }

            var n = Math.Min(_chunkLength - _chunkPosition, count - copied);
            Array.Copy(_chunk, _chunkPosition, buffer, offset + copied, n);
            copied += n;
            _chunkPosition += n;
        }

        return copied;
    }

    // Loads the next chunk into _chunk and skips the separator that follows it.
    // Returns false when no more payload bytes are available.
    private bool TryFillChunk()
    {
        if (_endOfStream)
        {
            return false;
        }

        _chunkPosition = 0;
        _chunkLength = ReadFully(_chunk.AsSpan(0, _chunkSize));

        if (_chunkLength < _chunkSize)
        {
            // ReadFully only stops short when the source is exhausted, so no separator follows.
            _endOfStream = true;
        }
        else
        {
            Span<byte> separator = stackalloc byte[SeparatorLength];
            ReadFully(separator);
        }

        return _chunkLength > 0;
    }

    // Stream.Read may return fewer bytes than requested before end of stream,
    // so keep reading until target is full or the source returns 0 bytes.
    private int ReadFully(Span<byte> target)
    {
        var total = 0;
        while (total < target.Length)
        {
            var read = _stream.Read(target.Slice(total));
            if (read == 0)
            {
                break;
            }
            total += read;
        }
        return total;
    }

    // Flushing a read-only stream is valid and must do nothing.
    public override void Flush()
    {
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override bool CanRead => !_disposed;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            _disposed = true;
            // Guarded so a second Dispose never returns the same array to the shared pool twice.
            ArrayPool<byte>.Shared.Return(_chunk);
        }

        base.Dispose(disposing);
    }
}

This code rents a single chunk-sized buffer from ArrayPool and does not allocate anything else (besides the CrLfFilteringStream instance itself).

  •  Tags:  
  • c#
  • Related