How to update data in the BigQuery stream buffer?


My Workflow

  • Data gets streamed to BigQuery from Pub/Sub using a Cloud Function.

  • Data stays in the streaming buffer for about 90 minutes, therefore I cannot run an UPDATE statement.

  • I need to update the Result column before that time. Please help.

I receive data in "Pub/Sub" then a "Cloud functions" is triggered which inserts the data inside "BigQuery"

This is the code:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();


exports.telemetryToBigQuery = (data, context) => {

  if (!data.data) {
    // No payload on the Pub/Sub message, abort.
    throw new Error('No telemetry data was provided!');
  }

  //Data comes in as base64
  console.log(`raw data: ${data.data}`);

  //Data gets decoded from base64 to string
  const dataDataDecode = Buffer.from(data.data, 'base64').toString();


  // Collect the positions of every semicolon delimiter in the decoded message.
  var indexesSemicolons = [];

  for (var i = 0; i < dataDataDecode.length; i++) {
    if (dataDataDecode[i] === ";") {
      indexesSemicolons.push(i);
    }
  }

  // The message is only valid if it contains exactly 14 semicolons.
  if (indexesSemicolons.length == 14) {

    const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
    const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
    const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);

    async function insertRowsAsStream() {
      // Inserts the JSON objects into my_dataset:my_table.
      const datasetId = 'put your dataset here';
      const tableId = 'put table id here';
      const rows = [
        {
          Brand: brand,
          Model: model,
          Result: result

        }
      ];

      // Insert data into a table
      await bigquery
        .dataset(datasetId)
        .table(tableId)
        .insert(rows);
      console.log(`Inserted ${rows.length} rows`);
    }
    // Return the promise so the Cloud Function does not finish before the insert does.
    return insertRowsAsStream();
  } else {
    console.log("Invalid message");
    return;
  }
}

This data stays in the BigQuery streaming buffer for about 90 minutes, but I need to execute an UPDATE query which changes the Result column. This is not allowed and causes an error:

ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError

I need a way to update the Result column before the 90-minute buffer time has passed. Can you help me please?
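
For reference, a minimal sketch of the kind of statement that triggers this error when run from Node.js; the table name is taken from the error message above, the SET value and WHERE condition are only illustrative:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

// Sketch of the kind of DML that fails while the affected rows are still in
// the streaming buffer. The SET value and WHERE condition are illustrative.
async function updateResult() {
  const query = `
    UPDATE \`pti-tag-copy.ContainerData2.voorinfo\`
    SET Result = 'new result'
    WHERE Brand = 'some brand' AND Model = 'some model'`;
  await bigquery.query({ query });
}

// Rejects with the ApiError quoted above as long as the matched rows are
// still in the streaming buffer.
updateResult().catch(console.error);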

I read the following pages online

Life of a BigQuery stream

I read the answer to the following question. I think I understand the idea of what he is talking about, but I don't know how to execute it.

If I am correct, he is saying to stream my data into a temporary table and from there put it into a permanent table.

Stackoverflow DML Update bigQuery

CodePudding user response:

Yes, that's correct. While the data are in the streaming buffer, you can't use DML on them. The solution is to query the data in the streaming buffer and transform them into another table. It can be a temporary table, as you say, from which you sink the rows into a permanent table.
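
A minimal sketch of that pattern with the Node.js client, assuming a hypothetical staging table named voorinfo_staging that receives the streaming inserts; the table names and the new Result value are only illustrations. Selecting rows that are still in the streaming buffer is allowed, only UPDATE/DELETE on them is not:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

// Copy rows from the staging table (the streaming target) into the permanent
// table, rewriting Result on the way. Names and the Result expression are
// assumptions for illustration.
async function sinkStagingIntoPermanent() {
  const query = `
    INSERT INTO \`pti-tag-copy.ContainerData2.voorinfo\` (Brand, Model, Result)
    SELECT Brand, Model, 'corrected result'
    FROM \`pti-tag-copy.ContainerData2.voorinfo_staging\``;
  await bigquery.query({ query });
}

sinkStagingIntoPermanent().catch(console.error);

In practice you would also need a way to mark which staging rows have already been copied (for example a timestamp column filled in by the Cloud Function), so the same rows are not inserted twice.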

You can also consider the streamed data from Pub/Sub to be raw data that you want to keep, and then refine the data in other tables. It's a common data engineering pattern to have different layers of filtering and transformation up to the final, useful data (also called data marts).

CodePudding user response:

To answer your question: yes, it says that you should stream the data into a temporary table and copy it to another, permanent table, and on the original table you can enable an expiration time. This means that the table will be deleted after the expiration time has passed.
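
For the expiration part, one way to do it is to set expiration_timestamp when creating the table with DDL. A sketch, reusing the hypothetical voorinfo_staging table; the schema and the one-day interval are arbitrary choices for illustration:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

// Create the staging table with an expiration time so BigQuery deletes it
// automatically once that time has passed. Name, schema and the 1-day
// interval are assumptions for illustration.
async function createStagingTable() {
  const query = `
    CREATE TABLE IF NOT EXISTS \`pti-tag-copy.ContainerData2.voorinfo_staging\` (
      Brand STRING,
      Model STRING,
      Result STRING
    )
    OPTIONS (expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))`;
  await bigquery.query({ query });
}

createStagingTable().catch(console.error);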

Alternatively, you can change your filters so they do not touch data that could still be in the streaming buffer. If you are using a partitioned table, when you update your data you can add a WHERE clause that only matches rows older than 40 to 90 minutes, like:

WHERE _PARTITIONTIME < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE)
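
Put together, a sketch of such an UPDATE run from Node.js, assuming an ingestion-time partitioned table (whose pseudo-column is _PARTITIONTIME); the new Result value and the 40-minute cutoff are illustrative:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

// Only touch partitions old enough to have left the streaming buffer.
// Assumes an ingestion-time partitioned table; the SET value and the
// 40-minute interval are illustrative.
async function updateFlushedRows() {
  const query = `
    UPDATE \`pti-tag-copy.ContainerData2.voorinfo\`
    SET Result = 'corrected result'
    WHERE _PARTITIONTIME < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE)`;
  await bigquery.query({ query });
}

updateFlushedRows().catch(console.error);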