Downloading large files using nodejs piped stream causes huge memory usage and OOM Error

Time:11-09

I am using Node.js to download large files (around 300 MB) from a server and pipe the response into a file write stream. As far as I understand pipes in Node.js, the data flow is managed by Node and I don't have to handle drain and other events myself. The issue is that the memory usage of the Docker container where my application runs grows by about the size of the file being downloaded (i.e. it seems the whole file is being held in memory). This memory usage persists even after I delete the file in the container. The code used for creating the request and piping it is attached below for reference. The code works, but it causes performance problems such as huge memory/CPU usage and crashes with an OOM error. I can't figure out what I am doing wrong.

let req = request({
      url: firmwareURL,
      maxAttempts: 5,
      retryDelay: 5000,
      retryStrategy: request.RetryStrategies.HTTPOrNetworkError});

    // 1. Perform server request
    req.on('response', (res) => {
      console.log(methodName, 'Download response statusCode:', res.statusCode);
      if (res.statusCode === 200) {
        abortOperation = false;
        isStarted = "yes";
        // 1.1 Create local file stream if the file is found on url, with a larger highWaterMark for bigger chunks
        // basepath + basefirmware folder + firmware name + file extension
        fileStoragePath = `${firmwareDirectory}/${ip}`;
        console.log("filestoragepath is", fileStoragePath);
        fileName = `${firmwareVersion}.${firmwareURL.split(".").pop()}`;
        // temporarily store the file
        tempFile = `${fileStoragePath}/${fileName}`;
        console.log("tempfile is", tempFile);
        writestream = fs.createWriteStream(tempFile, {
          highWaterMark: Math.pow(2, 20)
        }); // 1 MB buffer, can be increased
        writestream.on('error', function (err) {
          // on error
          console.log('Error while creating a file write stream' + err);
          abortOperation = true;
          isStarted = "no";
          _deleteProgressPointer(ip);
        });
        // 1.2 Get content length of the current response
        size = parseInt(res.headers['content-length'], 10);
        console.log(methodName, 'File size is:', size);
        req.pipe(writestream);
      } else {
        // 1.3 Ignore next request events on failure
        console.log(methodName, 'File not found on server. res.statusCode:', res.statusCode);
        abortOperation = true;
        isStarted = "no";
        _deleteProgressPointer(ip);
      }
    });
    // 3. In case of error ignore next request events
    req.on('error', (error) => {
      console.log(methodName, 'File not found on server:', error);
      abortOperation = true;
      isStarted = "no";
      _deleteProgressPointer(ip);
    });
    // 4. After stream is received close the connection
    req.on('end', () => {
      if (!abortOperation) {
        if (null !== writestream) {
          writestream.end();
          writestream.on('finish', function () {
            console.log(methodName, `File successfully downloaded for device ${ip} of firmware version ${firmwareVersion}`);
            try {

              // file extraction/storage operation
              // further check whether the file extension is valid or not
              if (ALLOWED_EXTENSION.includes(firmwareURL.split(".").pop())) {
                try {
                  //req.unpipe(writestream);
                  fileio.removeFile(tempFile); // deleting downloaded file to avoid storage issues
                  console.log("upgrade ended");
                  return upgradeOp;
                } catch (error) {
                  console.log(`Error while renaming file: ${tempFile}`);
                }
              } else {
                console.log(methodName, ` Not a valid file extension: ${tempFile}`);
                fileio.removeFile(tempFile);
                console.log(methodName, ` Invalid: ${tempFile} removed`);
              }
              // delete the progress pointer
              _deleteProgressPointer(ip);

            } catch (error) {
              // delete the progress pointer
              _deleteProgressPointer(ip);
              console.log(methodName, `Error during read/write operation :${error}`);
            }
          });
        }
      }
    });


Answer:

The problem is that you are using the requestretry package, which does not really support streaming. It always calls request with a callback and returns a promise that is resolved with the full response. The request library reads the entire response body when such a callback is provided, so the complete response is buffered in memory. This is not what you want.

I don't see a way to do streaming-only with requestretry, so you should use the request package directly (or, given its deprecation, one of its successor libraries) and handle the retry logic yourself.
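
As a rough illustration of handling the retry logic yourself, here is a minimal sketch that uses only Node's built-in http/https modules and stream.pipeline instead of request. The names openResponse and downloadWithRetry are hypothetical helpers, not part of any library, and the maxAttempts and retryDelay options only mirror the names from the question. Unlike HTTPOrNetworkError, this sketch simply retries on any failure (including non-200 responses) and restarts the download from the beginning on each attempt.

```javascript
const fs = require('fs');
const http = require('http');
const https = require('https');
const { pipeline } = require('stream/promises');

// Hypothetical helper: resolves with the response stream on HTTP 200,
// rejects on any other status code or on a connection error.
function openResponse(url) {
  return new Promise((resolve, reject) => {
    const client = url.startsWith('https:') ? https : http;
    const req = client.get(url, (res) => {
      if (res.statusCode === 200) {
        resolve(res);
      } else {
        res.resume(); // discard the body so the socket is released
        const err = new Error(`Unexpected status code ${res.statusCode}`);
        err.statusCode = res.statusCode;
        reject(err);
      }
    });
    req.on('error', reject);
  });
}

// Hypothetical helper: streams `url` into `destPath` without buffering the
// whole body in memory. Resolves with the attempt number that succeeded.
async function downloadWithRetry(url, destPath, { maxAttempts = 5, retryDelay = 5000 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const res = await openResponse(url);
      // pipeline() handles backpressure and destroys both streams on error.
      // The default 'w' flag truncates the file, so a retry starts over.
      await pipeline(res, fs.createWriteStream(destPath));
      return attempt;
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        await new Promise((resolve) => setTimeout(resolve, retryDelay));
      }
    }
  }
  throw lastError;
}
```

With something like this, the call in the question would become roughly `await downloadWithRetry(firmwareURL, tempFile)`, and the extension check and cleanup can run after the promise resolves. Only one chunk at a time (bounded by the streams' highWaterMark) should then be held in memory.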
