Environment: Node.js 17.2, Express 4.17
Task: Data arrives at a URL such as "/user-actions" from different servers at a rate of about 2 requests per second. The requests need to be aggregated and sent to another server once a second.
For example:
- Request #1: {userId: 1, action: "hitOne"}
- Request #2: {userId: 2, action: "hitFive"}
- Request #3: {userId:1, action: "hitFive"}
It is necessary to get 2 objects
const data = [{userId: 1, action: "hitOne"}, {userId: 2, action: "hitFive"}]
and
const data = [{userId: 1, action: "hitFive"}]
Each of these objects is sent to another server 1 time per second, something like this
http.post('http://newserver.url/user-actions', {data});
I was thinking of keeping a variable that accumulates everything arriving in the requests and sending its contents to the new server once a second on a timer. But something tells me that either there will be problems with the variable (for example, due to concurrent requests), so it won't always hold the data I expect, or the timer will misbehave.
How do I implement such a scenario correctly?
CodePudding user response:
So you're creating some sort of a proxy service. You have two potential issues:
- data persistence and
- retries and pending requests.
I think your best bet would be to do something like this:
- in this particular service (with the API route), you just receive requests, and store them somewhere like Redis or RabbitMQ or Amazon SQS.
- in another service, you deal with retries, posting etc.
Even if you don't split this into two services, you still want to put the data in a specialised storage service in cases like this. For example, if your process crashes, you lose whatever data you were holding in memory. It also simplifies all the management details. Things like storing, sorting what came first, and tracking which requests are pending are very easy to deal with using a RabbitMQ-type service.
But let's simplify things and hold them in memory. Now you have to deal with all these things yourself.
So here's a naive proxy service.
const axios = require('axios');
const axiosRetry = require('axios-retry');

const REQUEST_INTERVAL = 1000; // every second
const MAX_PARALLEL_REQUESTS = 3;

// retry each failed request up to three times
axiosRetry(axios, { retries: 3 });

const bucket = [];
let exportingInterval;
let currentRequestsCount = 0;

const logRequest = (payload) => bucket.push(payload);
const makeRequest = (payload) => axios.post('http://remote-service/user-actions', payload);

const sendData = () => {
  // first, make sure you don't make more than X parallel requests
  if (currentRequestsCount >= MAX_PARALLEL_REQUESTS) {
    return;
  }
  // clear the bucket
  const data = bucket.splice(0, bucket.length);
  if (!data.length) {
    return;
  }
  // send the data, make sure you handle the failure.
  currentRequestsCount = currentRequestsCount + 1;
  makeRequest(data)
    .then(() => {
      currentRequestsCount = currentRequestsCount - 1;
    })
    .catch(() => {
      // what to do now? All the retries failed.
      // Let's put everything back in the bucket, try in the next request.
      bucket.splice(bucket.length, 0, ...data);
      currentRequestsCount = currentRequestsCount - 1;
    });
};

const startExporting = () => {
  exportingInterval = setInterval(() => sendData(), REQUEST_INTERVAL);
};
const stopExporting = () => clearInterval(exportingInterval);

module.exports = {
  logRequest,
  startExporting,
  stopExporting,
};
You would use it like this:
const express = require('express');
const proxyService = require('./proxy-service');

const app = express();
app.use(express.json()); // parse JSON bodies so that req.body is populated

proxyService.startExporting();
// ...
app.post('/user-actions', (req, res) => {
  proxyService.logRequest(req.body);
  res.end();
});
Now, this is just a simple example:
- Make sure the retry policy is sound, so that you don't DoS the service you're sending the data to.
- Limit how many objects you send per call.
- Maybe that 1-second interval is not a good fit: what if sending the data takes longer than that?
- What if requests start piling up? My simple counter only caps parallel requests at 3; it may be more complicated than that.
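Two of the points above (capping the batch size, and sends that outlast the interval) can be sketched together. This is only one possible shape, not part of the original service: createBatcher, maxBatchSize, intervalMs and flushOnce are hypothetical names, and send stands for whatever function posts a batch to the remote server. Instead of setInterval, the next flush is scheduled with setTimeout only after the previous send has settled, so sends cannot overlap.

```javascript
// Hypothetical helper: the name and its options are illustrative only.
// `send` is assumed to be an async function that posts one batch.
function createBatcher({ send, maxBatchSize = 100, intervalMs = 1000 }) {
  const bucket = [];
  let timer = null;
  let running = false;

  // Send at most maxBatchSize items; returns how many were delivered.
  async function flushOnce() {
    const batch = bucket.splice(0, maxBatchSize);
    if (batch.length === 0) return 0;
    try {
      await send(batch);
      return batch.length;
    } catch (err) {
      // Put the items back at the front so their order is preserved.
      bucket.unshift(...batch);
      return 0;
    }
  }

  // Schedule the next flush only after the previous one has settled,
  // so a slow send never overlaps with the next one.
  function scheduleNext() {
    if (!running) return;
    timer = setTimeout(async () => {
      await flushOnce();
      scheduleNext();
    }, intervalMs);
  }

  return {
    add: (item) => bucket.push(item),
    size: () => bucket.length,
    flushOnce,
    start() {
      if (!running) {
        running = true;
        scheduleNext();
      }
    },
    stop() {
      running = false;
      clearTimeout(timer);
    },
  };
}
```

The trade-off is that batches go out at most once per interval plus the send time, rather than strictly once a second; whether that is acceptable depends on the receiving side.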
Also, the calls to startExporting and stopExporting should go in some common place: where you boot the app, and where you clean up on a graceful shutdown.
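As a rough illustration of that common place, here is a minimal sketch. wireLifecycle is a hypothetical helper, and flush is an assumed extra method that the module above does not export; it would send whatever is still in the bucket. The proc parameter defaults to the real process object and exists only so the wiring can be exercised without real signals.

```javascript
// Hypothetical wiring: `exporter` is any object exposing startExporting(),
// stopExporting() and, optionally, an async flush() (an assumption).
function wireLifecycle(exporter, proc = process) {
  let shuttingDown = false;

  // Boot: start the periodic export.
  exporter.startExporting();

  async function shutdown() {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    exporter.stopExporting(); // stop the timer first
    if (typeof exporter.flush === 'function') {
      await exporter.flush(); // assumed: sends what is left in memory
    }
    proc.exit(0);
  }

  // Clean up on the usual termination signals.
  proc.on('SIGTERM', shutdown);
  proc.on('SIGINT', shutdown);
  return shutdown;
}
```

With the module from this answer, the call would be wireLifecycle(proxyService) next to the code that creates the Express app.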
But it gives you an idea of how it can be done.
CodePudding user response:
It is a trade-off between timing and completeness of the data.
If you want to make sure you have all the data, you can use Promise.all(). Once both requests have been answered, you call the API to send the data on. This guarantees that the data is complete, but not that it is sent to the other server once a second.
let pr1 = request1();
let pr2 = request2();
const data = await Promise.all([pr1, pr2]); // must run inside an async function
requestToAnotherServer(data);
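One caveat with the snippet above: Promise.all rejects as soon as any of its inputs rejects, so a single failed request loses the results of the others. If partial data is acceptable, Promise.allSettled waits for every promise and reports each outcome. A minimal sketch, where collectFulfilled is a hypothetical helper name:

```javascript
// Hypothetical helper: waits for all promises and keeps only the
// values of those that fulfilled, in their original order.
async function collectFulfilled(promises) {
  const results = await Promise.allSettled(promises);
  return results
    .filter((result) => result.status === 'fulfilled')
    .map((result) => result.value);
}
```

It would be used as const data = await collectFulfilled([pr1, pr2]); inside an async function.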
If you want to make sure that the server sends data to the other server once a second, you can set a timer and, when it fires, send whatever the server has received so far. But this does not guarantee that the data is complete.
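let sendData = [];
setInterval(() => {
  // results are pushed whenever the requests resolve, which is after this tick
  request1().then((data) => { sendData.push(data); });
  request2().then((data) => { sendData.push(data); });
  // send what has arrived since the previous tick, then start a fresh array
  requestToAnotherServer(sendData);
  sendData = [];
}, 1000);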