I have a scenario where I need to send 1M messages to a server using a blocking API. The API does not accept batch requests, so I have to send the 1M messages one by one.
Instead of using one thread, I am thinking of using multiple threads to send them.
The caller has to wait for all 1M messages to be sent before proceeding.
My implementation is as follows:
public class MySender {
    private final MyPublisher myPublisher;
    private final ExecutorService threadPool;
    private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();

    public MySender(final MyPublisher myPublisher,
                    ExecutorService threadPool) {
        this.myPublisher = myPublisher;
        this.threadPool = threadPool;
    }

    public void send(final MyData event) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
        List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
        futureList.add(future);
    }

    public void notifySendComplete(final String id) {
        if (!jobMap.containsKey(id)) {
            return;
        }
        jobMap.get(id).forEach(CompletableFuture::join);
        jobMap.remove(id);
    }

    private void doSubmit(final MyData event) {
        try {
            ....
            myPublisher.send(event);
            ....
        } catch (Exception e) {
            // log error
        }
    }
}
The client can then use the class like this:
myInputList.forEach(input -> {
    MyData event = createData(input);
    mySender.send(event);
});
mySender.notifySendComplete(id); // id: the value event.getID() returns for the events sent above
I think this implementation will work, but the problem is obvious: it has to hold 1M CompletableFuture objects in the map, and they are not eligible for garbage collection until notifySendComplete removes them.
Is that a big problem? If so, are there any better approaches?
Restrictions:
- The thread pool cannot be shut down.
- I could implement this with CountDownLatch, but it is not allowed in my project.
CodePudding user response:
Is it a big problem?
It depends. If memory is not a constraint, no. If you want to be efficient with your memory use, then yes.
If so, are there any better approaches?
Forget the map; you are doing nothing valuable with it. Instead, use an integer variable (let's call it pending) to keep track of how many messages have been queued and are still waiting to be sent. Later, your notifySendComplete method checks whether any messages are still pending and, if there are, sleeps/waits until there are none left.
To summarize:
- send should increment the variable by one.
- doSubmit should decrement the variable by one.
- notifySendComplete should check the value of the variable and return only when it is zero. To wait, put the thread to sleep, then check the value again, and repeat until the variable reaches zero.
To keep the variable consistent between threads, Java offers several options:
- you can use an AtomicInteger
- you can use synchronized methods
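The counter idea described above could look roughly like the sketch below. It is only an illustration under a few assumptions: CountingSender and pendingCount are hypothetical names, a Consumer<T> stands in for myPublisher.send, and a single counter tracks all events instead of one list per ID as in the question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: one shared counter replaces the map of futures.
final class CountingSender<T> {
    private final Consumer<T> publisher;        // stand-in for myPublisher::send
    private final ExecutorService threadPool;   // owned by the caller, never shut down here
    private final AtomicInteger pending = new AtomicInteger();

    CountingSender(Consumer<T> publisher, ExecutorService threadPool) {
        this.publisher = publisher;
        this.threadPool = threadPool;
    }

    void send(T event) {
        pending.incrementAndGet();              // count the message before it is queued
        try {
            threadPool.execute(() -> {
                try {
                    publisher.accept(event);
                } catch (Exception e) {
                    // log error
                } finally {
                    pending.decrementAndGet();  // always runs, even if the send failed
                }
            });
        } catch (RuntimeException rejected) {
            pending.decrementAndGet();          // the task never ran, so undo the count
            throw rejected;
        }
    }

    // Polls the counter and returns once no message is queued or in flight.
    void notifySendComplete() throws InterruptedException {
        while (pending.get() > 0) {
            Thread.sleep(10);
        }
    }

    int pendingCount() {
        return pending.get();
    }
}
```

Polling with a short sleep keeps the sketch simple and matches the description above. Note that the executor's queue still holds every submitted task, so this removes the futures but not the queued work itself.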
Happy coding ...
By the way, in my view the memory problem is still there. Why? Because you are loading 1M messages before sending them. A better approach is to read and send messages only as long as there are free threads or workers; as soon as no worker is free, stop reading more messages and wait.
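One way to read "stop reading when no worker is free" is to cap the number of in-flight messages with a Semaphore. The sketch below is a hedged illustration, not the only way to do it: BoundedSender, sendAll, and maxInFlight are hypothetical names, the source is assumed to be available as an Iterator, and a Consumer<T> stands in for myPublisher.send.

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical sketch: the reader blocks whenever maxInFlight sends are outstanding.
final class BoundedSender {
    static <T> void sendAll(Iterator<T> source, Consumer<T> publisher,
                            ExecutorService threadPool, int maxInFlight)
            throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        while (source.hasNext()) {
            permits.acquire();                  // wait for a free slot before reading more
            T event = source.next();
            try {
                threadPool.execute(() -> {
                    try {
                        publisher.accept(event);
                    } catch (Exception e) {
                        // log error
                    } finally {
                        permits.release();      // free the slot when the send ends
                    }
                });
            } catch (RuntimeException rejected) {
                permits.release();              // the task never ran, so return its permit
                throw rejected;
            }
        }
        // Taking every permit succeeds only after all in-flight sends have finished.
        permits.acquire(maxInFlight);
    }
}
```

With this shape, at most maxInFlight events exist as queued tasks at any time, so neither 1M futures nor 1M queued tasks are held, and the shared thread pool is never shut down.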
CodePudding user response:
You might want to not only multithread your solution but also use a bulkhead to limit the number of waiting requests. Even if the service does not limit you at the moment, its owners will likely impose a quota once you hit them with 1M requests. Take a look at the Hystrix bulkhead: it manages the thread pool for you, and you can adjust the maximum number of concurrent threads (see "What is Bulkhead Pattern used by Hystrix?"). Also, as someone else mentioned, you start with 1M records in memory. Where do those come from? If they come from a DB, you might want to consider using R2DBC drivers and a reactive flow that processes the messages as it loads them, instead of loading everything into memory and then processing it. See https://www.baeldung.com/java-reactive-systems
CodePudding user response:
You can use a parallel stream, which turns events into the data you need to send only as fast as workers are ready for them. (This assumes doSubmit is made accessible to the caller; it is private in the question.)
myInputList.stream().parallel().forEach(mySender::doSubmit);
If you might be using parallel streams (or the common ForkJoinPool) elsewhere in the program at the same time, you can create a thread pool dedicated to this process with the threads that you can spare.
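int extraThreads = ...;
ForkJoinPool pool = new ForkJoinPool(extraThreads);
ForkJoinTask<?> ticket =
    pool.submit(() -> myInputList.stream().parallel().forEach(mySender::doSubmit));
ticket.get();    // blocks until every element has been processed
pool.shutdown(); // shuts down only this dedicated pool, not the shared executor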
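For reference, the dedicated-pool snippet above could be wrapped into a self-contained method along these lines. This is a sketch under assumptions: DedicatedPoolSender, sendAll, and parallelism are hypothetical names, and a Consumer<T> stands in for the send call. It also relies on parallel streams running inside the ForkJoinPool that started them, which is long-standing behavior of the JDK implementation but not something the Stream API documentation guarantees.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

// Hypothetical sketch: run a parallel stream inside a pool dedicated to this job.
final class DedicatedPoolSender {
    static <T> void sendAll(List<T> inputs, Consumer<T> publisher, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // get() blocks until every element has been handed to the publisher;
            // an exception thrown by the publisher surfaces as an ExecutionException.
            pool.submit(() -> inputs.parallelStream().forEach(publisher)).get();
        } finally {
            pool.shutdown();    // only this private pool is shut down
        }
    }
}
```

The try/finally ensures the private pool is released even when a send fails, and the application's shared thread pool is left untouched.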