I have a MySQL table with millions of rows. For each row I have to apply custom logic and write the modified data to another table.
Using knex.js I read the data with the stream() function.
Once I get the stream object I apply my logic in the data event handler. Everything works correctly, but at a certain point it stops without raising any error.
I tried pausing the stream before each update on the new table and resuming it after the update completes, but that did not solve the problem. If I put a limit on the query, e.g. 1000 rows, the script runs fine.
Sample code:
const readableStream = knex.select('*')
  .from('big_table')
  .stream()

readableStream.on('data', async (data) => {
  readableStream.pause() // pause the stream while this row is processed
  const toUpdate = applyLogic(data) // sync func
  const whereCond = getWhereCondition(data) // sync func
  try {
    await knex('to_update').where(whereCond).update(toUpdate)
    console.log('UPDATED')
  } catch (e) {
    console.log('ERROR', e)
  }
  readableStream.resume() // resume once, on both success and error
}).on('end', () => { // readable streams emit 'end', not 'finish'
  console.log('END')
}).on('error', (err) => {
  console.log('ERROR', err)
})
Thanks!
CodePudding user response:
I solved it.
The problem was not knex.js or the streams but my development environment. I use k3d to simulate the production environment on GCP, so to test my script locally I did a port-forward of the MySQL service.
It is still not clear to me why the connection drops, but when I run the script in a container that connects to the MySQL service directly, the algorithm works as I expect.
Thanks