Multiple ReadStream on single file

I have a large file that needs to be processed. The file contains a header and binary data that is split into multiple sections. The header describes the binary data sections, defining the offset and length of each section.

What I was thinking is to:

  • use fs.openSync() to get the file descriptor
  • create a ReadStream and read the header part (to get the binary data section offsets). The stream is opened with autoClose: false
  • close the "header" ReadStream
  • create multiple ReadStreams to read the binary data sections. Each stream will have its own start and end, and each stream is opened with autoClose: false (see the sketch below)
  • once everything is processed, close the file descriptor
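
For illustration, here is roughly what I have in mind for that step, once the header has been parsed (the section offsets and lengths below are made-up placeholders):

const fs = require("fs");

// hypothetical section table parsed from the header
const sections = [
  { offset: 1024, length: 2048 },
  { offset: 3072, length: 4096 },
];

const fd = fs.openSync("c:\\some\\large\\file.txt", "r");

// one stream per binary section, all sharing the same file descriptor
const sectionStreams = sections.map(({ offset, length }) =>
  fs.createReadStream(null, {
    fd,
    autoClose: false,
    start: offset,
    end: offset + length - 1, // end is inclusive
  })
);

// ...process each section stream, then close the descriptor manually
// fs.closeSync(fd);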

I was trying to test this concept and I'm running into an issue when trying to open a second stream (after the header stream is closed). The error is:

Uncaught Error Error: EBADF: bad file descriptor, read

which suggests that the file is no longer open for reading (even though all streams are created with autoClose: false).

Any idea how to keep the file descriptor open until it's manually closed?

const fs = require("fs");

(async function () {
  // open the file for reading
  const fd = fs.openSync("c:\\some\\large\\file.txt", "r");

  // initial stream that will extract the header info
  const initStream = fs.createReadStream(null, {
    fd,
    autoClose: false,
  });

  // header info data
  const headerContent = await extractHeaderContent(initStream);

  // for test purpose
  // try and extract the header again
  const testSecondStream1 = fs.createReadStream(null, {
    fd,
    autoClose: false,
  });

  const testHeaderContent = await extractHeaderContent(initStream);

  fs.closeSync(fd);
})();

// stream the data until the header data is retrieved
async function extractHeaderContent(initStream) {
  return new Promise((resolve, reject) => {
    let content = "";

    initStream.on("data", (chunk) => {
      if (chunk.indexOf("EndHeader") > -1) {
        content += chunk.toString();
        let d = content.split("EndHeader")[0] + "EndHeader";

        // once the header info is extracted - close the stream
        initStream.close(() => resolve(d));
      }

      content += chunk.toString();
    });
  });
}

CodePudding user response:

First off, this seems like over-optimization that is turning a simple issue into a complex one. Why not just open the file each time you want a new stream? No complication at all. Everything about opening the file will be cached in the OS since you just recently opened it, so subsequent opens shouldn't be a performance issue.

The simple way to solve this is to just open a new file handle for each stream, then call stream.destroy() when you're done with it.
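
For example, here is a minimal sketch of that approach (the section offsets and the doneWithThisSection() check are hypothetical placeholders):

const fs = require('fs');

// hypothetical section boundaries parsed from the header
const section = { offset: 1024, length: 2048 };

// each section gets its own stream, which opens its own file handle
const sectionStream = fs.createReadStream("c:\\some\\large\\file.txt", {
    start: section.offset,
    end: section.offset + section.length - 1,   // end is inclusive
});

sectionStream.on("data", (chunk) => {
    // ...process the chunk...
    if (doneWithThisSection(chunk)) {
        // destroys this stream's own file handle; nothing shared to clean up
        sectionStream.destroy();
    }
});

Because each stream owns its own handle, there is no coordination needed between the header stream and the section streams.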

Second off, when you call initStream.close(), that is going to close the file handle (you can see the close method's code here). autoClose only affects what happens when the stream reaches the end by itself, not what happens when you manually call .close().

So, one scheme you could use is to pause the stream and remove your data event handler. That will at least stop the readStream from doing any further reads. I looked into calling .destroy(), but it looks like that will also close the file handle.

And, FYI, the code in your question creates testSecondStream1 but doesn't use it. It passes initStream to both calls to extractHeaderContent(), which I assume was just a goof, not your intention (though it confused the heck out of me when I tried to run your code).

Here's an implementation of the complicated method that shares one file handle and works. I would not write my code this way, as it's too complicated and relies on some hacks to work properly:

const fs = require('fs');

(async function () {
    // open the file for reading
    const fd = fs.openSync("c:\\some\\large\\file.txt", "r");

    // initial stream that will extract the header info
    const initStream = fs.createReadStream(null, {
        fd,
        autoClose: false,
        autoDestroy: false,
    });

    // header info data
    const headerContent = await extractHeaderContent(initStream);

    // for test purpose
    // try and extract the header again
    const testSecondStream1 = fs.createReadStream(null, {
        fd,
        autoClose: false,
        autoDestroy: false,
        start: 0,
    });

    const testHeaderContent = await extractHeaderContent(testSecondStream1);

    fs.closeSync(fd);
})().then(() => {
    console.log("finished");
}).catch(err => {
    console.log(err);
});

// stream the data until the header data is retrieved
async function extractHeaderContent(rStream) {
    return new Promise((resolve, reject) => {
        let content = "";

        function processData(chunk) {
            content += chunk.toString();
            if (content.indexOf("EndHeader") > -1) {
                let d = content.split("EndHeader")[0] + "EndHeader";

                // once the header info is extracted
                //    stop the stream from flowing
                //    and unhook from it
                rStream.pause();
                rStream.off("data", processData);
                rStream.off("error", one rror);
                resolve(d);

                // stop the stream from trying to clean up for itself
                rStream.destroyed = true;
                rStream.fd = null;
                return;
            }
        }

        function onError(err) {
            reject(err);
        }

        rStream.on("data", processData).on("error", one rror);
    });
}

These are the main changes I made.

  1. Add start: 0 to the second stream to tell it to start reading at the beginning again. Streams apparently don't do that automatically, so if the fd was left at a different file offset by the previous read, that's where the second stream would start reading.

  2. Add autoDestroy: false to both streams. We want to prevent all possible ways the stream will attempt to close the file handle itself.

  3. Fix the second call to extractHeaderContent() to pass testSecondStream1, not initStream.

  4. .pause() the stream when we're done so it won't continue to read.

  5. Remove all our event handlers from the stream so it is eligible for garbage collection.

  6. Tell the stream that it is already destroyed. This is a bit of a hack, but apparently some code somewhere in the stream was trying to use the file handle after you had already called fs.closeSync(fd) which was causing an error after all your processing was done. I was not able to catch who was doing that, so telling the stream that it was already destroyed seems to bypass it.

  7. Add an error event handler that rejects the promise.

  8. Search for "EndHeader" in the accumulated content rather than only in the latest chunk. If the marker happens to span a chunk boundary, searching only the latest chunk won't find it.


Another Approach

It occurs to me, after solving this, that the big complication here is the readStream and its attempt to manage the fd for you. You don't need the readStream at all. You can use the promise interface for FileHandles and read chunks of the file yourself in a loop using await fileHandle.read(...). Then you can start reading over again at the beginning just by reusing the same FileHandle, and you don't have to hack into the stream to get it to stop managing the FileHandle for you.

Here's a simpler implementation that skips the streams entirely and just reads chunks yourself:

const fs = require('fs');
const fsp = fs.promises;

(async function () {

    const handle = await fsp.open("c:\\some\\large\\file.txt", "r");

    try {
        await extractheaderContent2(handle);
        await extractheaderContent2(handle);
    } finally {
        await handle.close();
    }

})().then(() => {
    console.log("finished");
}).catch(err => {
    console.log(err);
});

async function extractheaderContent2(handle) {
    const bufSize = 4096;
    const buffer = Buffer.alloc(bufSize);
    let content = "";

    let position = 0;
    let bytesRead = 0;

    do {
        ({ bytesRead } = await handle.read(buffer, 0, bufSize, position));
        position += bytesRead;
        // only convert the bytes that were actually read this pass
        content += buffer.toString("utf8", 0, bytesRead);
        if (content.indexOf("EndHeader") > -1) {
            return content.split("EndHeader")[0] + "EndHeader";
        }

    } while (bytesRead === bufSize);
    // got to end of file, but didn't find EndHeader
    return null;

}
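
Once the header has been parsed, the same FileHandle can be reused to pull each binary section out at its offset. Here is a minimal sketch of that (the offset and length values would come from your header parsing; they are placeholders here):

// read one binary section from the already-open FileHandle
async function readSection(handle, { offset, length }) {
    const buffer = Buffer.alloc(length);
    let bytesRead = 0;

    // loop because a single read() may return fewer bytes than requested
    while (bytesRead < length) {
        const { bytesRead: n } = await handle.read(
            buffer,
            bytesRead,              // where to write in our buffer
            length - bytesRead,     // how many bytes we still want
            offset + bytesRead      // absolute position in the file
        );
        if (n === 0) break;         // hit end of file early
        bytesRead += n;
    }

    return buffer.subarray(0, bytesRead);
}

// e.g. const sectionData = await readSection(handle, { offset: 1024, length: 2048 });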