I am reading from a stream that contains a CRLF after every X bytes (in my case X is 2033 because the file is generated with bcp; I set X to 4 in the sample code). I would like to transform this stream into another stream without the CRLFs. The new stream will then be deserialized as XML.
I can do this easily, and it works fine, in the following way:
using System.IO;
using System.Text;
using System.Xml.Linq;
/// <summary>
/// Copies a chunked text stream into a new in-memory stream, dropping the two-character
/// separator that follows every full chunk of <see cref="BcpChunkSize"/> characters.
/// </summary>
public class CREliminator
{
    /// <summary>Number of characters in each chunk that precedes a separator.</summary>
    public const int BcpChunkSize = 4;

    // Length of the separator (CR + LF) that is discarded after each full chunk.
    private const int SeparatorLength = 2;

    /// <summary>
    /// Reads <paramref name="reader"/> to the end and returns a stream holding the same
    /// text without the separators. The whole result is buffered in memory.
    /// </summary>
    /// <param name="reader">Source of the chunked text.</param>
    /// <returns>A <see cref="MemoryStream"/> positioned at its start.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="reader"/> is null.</exception>
    public Stream Run(StreamReader reader)
    {
        if (reader is null)
        {
            throw new System.ArgumentNullException(nameof(reader));
        }

        var output = new MemoryStream();
        // The writer is intentionally not disposed: disposing it would also close
        // 'output', which is handed back to the caller.
        var writer = new StreamWriter(output);
        var chunk = new char[BcpChunkSize];
        // Scratch buffer for the separator; allocated once instead of per iteration.
        var separator = new char[SeparatorLength];

        do
        {
            var charsRead = reader.ReadBlock(chunk, 0, BcpChunkSize);
            // Write straight from the buffer; slicing with a range would copy the array.
            writer.Write(chunk, 0, charsRead);

            // Only a full chunk is followed by a separator; a short chunk is the tail.
            if (charsRead == BcpChunkSize)
            {
                reader.ReadBlock(separator, 0, SeparatorLength);
            }
        } while (!reader.EndOfStream);

        // A single flush is enough: nothing reads 'output' before this point.
        writer.Flush();
        output.Position = 0;
        return output;
    }
}
/// <summary>
/// Checks that <c>CREliminator</c> turns a CRLF-chunked XML payload back into
/// a document that <see cref="XDocument"/> can load.
/// </summary>
public class UnitTest1
{
    /// <summary>
    /// Feeds a payload split into 4-character chunks by CRLF and expects the loaded
    /// document to print as the same payload with every CRLF removed.
    /// </summary>
    [Fact]
    public void Test1()
    {
        // arrange: chunked payload and the text it should collapse to
        const string CRLF = "\r\n";
        string chunkedXml = $"<doc{CRLF}umen{CRLF}t>A<{CRLF}/doc{CRLF}umen{CRLF}t>";
        string expectedXml = chunkedXml.Replace(CRLF, "");

        // arrange: expose the payload as a UTF-8 text reader
        byte[] payloadBytes = Encoding.UTF8.GetBytes(chunkedXml);
        StreamReader payloadReader = new StreamReader(new MemoryStream(payloadBytes));

        // act
        CREliminator eliminator = new CREliminator();
        Stream filtered = eliminator.Run(payloadReader);

        // assert
        XDocument document = XDocument.Load(filtered);
        Assert.Equal(expectedXml, document.ToString());
    }
}
But this code loads the whole stream into memory before returning the new stream.
My question is: how can I do this lazily? I mean, processing the source stream only as some consumer reads from the new stream.
Thanks.
CodePudding user response:
Only the Stream.Read method needs a real implementation:
- Read from the source stream in fixed-size chunks.
- Skip the 2 bytes (CR LF) that follow every chunk.
using System.Buffers;
using System.Text;
// Demo: build a payload of 4-byte chunks, each followed by CR LF, and read it back
// through CrLfFilteringStream so the separators disappear.
var sourceString = string.Concat(
    Enumerable.Range(1, 10).Select(_ => "Foo \r\nBar \r\nBaz!\r\n"));
Console.WriteLine($"Source: {sourceString}");

// No BOM is emitted, so the byte layout is exactly "chunk + CR LF" from the first byte.
var encoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
var sourceBytes = encoding.GetBytes(sourceString);

using var stream = new MemoryStream(sourceBytes);
using var filter = new CrLfFilteringStream(stream, 4);
using var reader = new StreamReader(filter, encoding);

var res = reader.ReadToEnd();
Console.WriteLine($"Result: {res}");
/// <summary>
/// Read-only, forward-only stream wrapper that drops the two-byte separator (CR LF)
/// following every <c>chunkSize</c> bytes of the wrapped stream. Bytes are pulled from
/// the source only as callers read, so the payload is never buffered in full.
/// </summary>
/// <remarks>
/// The wrapped stream is not disposed by this class. The separator bytes are skipped
/// without being inspected.
/// </remarks>
public class CrLfFilteringStream : Stream
{
    // Number of bytes discarded after every full chunk (CR + LF).
    private const int SeparatorLength = 2;

    private readonly Stream _stream;
    private readonly int _chunkSize;
    // Rented from the shared pool; may be longer than _chunkSize, so only the first
    // _chunkLength bytes are ever meaningful.
    private readonly byte[] _chunk;
    private int _chunkPosition;
    private int _chunkLength;
    private bool _endOfSource;
    private bool _disposed;

    /// <summary>Creates a filter over <paramref name="stream"/>.</summary>
    /// <param name="stream">Source stream containing chunk + separator sequences.</param>
    /// <param name="chunkSize">Number of payload bytes between separators; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is zero or negative.</exception>
    public CrLfFilteringStream(Stream stream, int chunkSize)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
        if (chunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, "Chunk size must be positive.");
        }

        _chunkSize = chunkSize;
        _chunk = ArrayPool<byte>.Shared.Rent(chunkSize);
        // _chunkPosition == _chunkLength == 0, so the first Read triggers a refill.
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> filtered bytes into <paramref name="buffer"/>
    /// starting at <paramref name="offset"/>.
    /// </summary>
    /// <returns>The number of bytes copied; 0 once the source is exhausted.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer is null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the buffer length.");
        }
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(CrLfFilteringStream));
        }

        var copied = 0;
        while (copied < count)
        {
            // Current chunk fully consumed: fetch the next one, or stop at end of source.
            if (_chunkPosition >= _chunkLength && !TryFillChunk())
            {
                break;
            }

            // Never copy more than what is left in the chunk or what the caller asked for.
            var available = _chunkLength - _chunkPosition;
            var toCopy = Math.Min(available, count - copied);
            Array.Copy(_chunk, _chunkPosition, buffer, offset + copied, toCopy);
            _chunkPosition += toCopy;
            copied += toCopy;
        }

        return copied;
    }

    // Loads the next chunk and discards the separator that follows it.
    // Returns false when no more payload bytes are available.
    private bool TryFillChunk()
    {
        _chunkPosition = 0;
        _chunkLength = 0;
        if (_endOfSource)
        {
            return false;
        }

        // Stream.Read may return fewer bytes than requested, so keep reading until the
        // chunk is complete; otherwise the separator position would drift.
        while (_chunkLength < _chunkSize)
        {
            var read = _stream.Read(_chunk, _chunkLength, _chunkSize - _chunkLength);
            if (read == 0)
            {
                _endOfSource = true;
                break;
            }
            _chunkLength += read;
        }

        // A short chunk means the source ended, so there is no separator to skip.
        if (!_endOfSource)
        {
            SkipSeparator();
        }

        return _chunkLength > 0;
    }

    // Consumes exactly SeparatorLength bytes from the source, or fewer if it ends first.
    private void SkipSeparator()
    {
        Span<byte> sink = stackalloc byte[SeparatorLength];
        var skipped = 0;
        while (skipped < SeparatorLength)
        {
            var read = _stream.Read(sink.Slice(skipped));
            if (read == 0)
            {
                _endOfSource = true;
                return;
            }
            skipped += read;
        }
    }

    /// <summary>Does nothing: the stream is read-only, so there is nothing to flush.</summary>
    public override void Flush()
    {
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override bool CanRead => !_disposed;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <summary>Returns the rented chunk buffer to the pool exactly once.</summary>
    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            _disposed = true;
            if (disposing)
            {
                // Guarded by _disposed so a second Dispose cannot return the array twice.
                ArrayPool<byte>.Shared.Return(_chunk);
            }
        }
        base.Dispose(disposing);
    }
}
This code rents a single chunk-sized buffer from ArrayPool
and does not allocate anything else (besides CrLfFilteringStream
instance).