JAVA: Split list into smaller lists and then stream them in multiple threads


I have a database in which I have a table with links.

I have found out that I can split a list into smaller lists with the help of partitioning. According to this article, the Partition class seems to be the fastest approach: https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/

After I have split them into smaller lists, I would like to use these links and scrape data from them simultaneously. I could have used one list and then:

linkList.parallelStream().forEach(link -> {
    ScrapeLink(link);
});

and set System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5"); to cap the common pool at five threads.
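As an aside, instead of changing that global property, a common alternative is to run the parallel stream inside a dedicated ForkJoinPool, so the common pool's parallelism is left untouched. A minimal sketch, where the real scraping call is replaced by recording the link, and the names CustomPoolDemo/scrapeAll are illustrative:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

public class CustomPoolDemo {

    // Runs the parallel stream inside its own pool: tasks forked from a
    // thread that is already inside a ForkJoinPool run on that pool's
    // workers, not on the common pool.
    static Set<String> scrapeAll(List<String> links, int parallelism) throws Exception {
        Set<String> scraped = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.submit(() -> links.parallelStream().forEach(link -> {
                // ScrapeLink(link) would go here; we just record the link
                scraped.add(link);
            })).get(); // wait for the whole stream to finish
        } finally {
            pool.shutdown();
        }
        return scraped;
    }

    public static void main(String[] args) throws Exception {
        List<String> links = List.of("link1", "link2", "link3", "link4", "link5");
        System.out.println(scrapeAll(links, 5));
    }
}
```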

But in my case I would like to split the links into smaller lists and then parallel-stream each sublist to another method, where I use ScraperAPI so that each sublist of links shares one session (reusing the same proxy by setting session_number=123, for example).

So when I have a list like this: final List<String> links = Arrays.asList("link1","link2","link3","link4","link5","link6","link7");

System.out.println(Partition.ofSize(links, 3));

I will get [[link1, link2, link3], [link4, link5, link6], [link7]]. But how do I then process these small link lists in multiple threads at the same time?
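For reference, a partitioner like the one in the linked article can be sketched as a read-only view over consecutive subList chunks. This is a minimal version along those lines, not the article's exact code:

```java
import java.util.AbstractList;
import java.util.List;

// Minimal sketch of a Partition class: exposes consecutive
// subList views of size chunkSize (the last chunk may be shorter).
public class Partition<T> extends AbstractList<List<T>> {
    private final List<T> list;
    private final int chunkSize;

    private Partition(List<T> list, int chunkSize) {
        this.list = list;
        this.chunkSize = chunkSize;
    }

    public static <T> Partition<T> ofSize(List<T> list, int chunkSize) {
        return new Partition<>(list, chunkSize);
    }

    @Override
    public List<T> get(int index) {
        int from = index * chunkSize;
        int to = Math.min(from + chunkSize, list.size());
        return list.subList(from, to); // a view, not a copy
    }

    @Override
    public int size() {
        return (list.size() + chunkSize - 1) / chunkSize; // ceiling division
    }
}
```

With the seven links above, `Partition.ofSize(links, 3)` prints `[[link1, link2, link3], [link4, link5, link6], [link7]]`. Because the chunks are subList views, no elements are copied.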

My thought was to use Java 8 streams, but there might be a better way?

CodePudding user response:

You can make use of the default ForkJoinPool (of capacity 5, as you mentioned) and also a custom thread pool defined for your sublists.

First you need a Runnable class like this, which you will later submit to your "new" thread pool:

    @AllArgsConstructor
    public class LinkProcessorTask implements Runnable {
        private String link;

        @Override
        public void run() {
            // do something with your link from the sublist
        }
    }
    

    public void doWork() {

        List<List<String>> subListsOfLinks = .... // partitioning function

        subListsOfLinks.parallelStream().forEach(list -> {
            ExecutorService executorService = Executors.newFixedThreadPool(4); // concurrency per sublist
            for (String link : list) {
                executorService.submit(new LinkProcessorTask(link));
            }
            executorService.shutdown(); // accept no new tasks; submitted ones keep running
            try {
                executorService.awaitTermination(1, TimeUnit.MINUTES); // example timeout, pick your own
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

Now it is your own design decision whether you want to make this new thread pool a global one with fixed concurrency, or to create one inside each ForkJoinPool task.

If you create it within, the total number of threads spawned = ForkJoinPoolConcurrency * CustomThreadPoolConcurrency.

Otherwise it will be just ForkJoinPoolConcurrency + CustomThreadPoolConcurrency.

Which is better depends on your machine, among other factors.

You can avoid the hefty awaitTermination call by using a CountDownLatch, if you want to wait for all the links of one sublist to complete before proceeding.
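A minimal sketch of that CountDownLatch approach, assuming a shared fixed pool and a placeholder in place of the real ScrapeLink call (LatchDemo/processSublist are illustrative names):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo {

    // Submits every link of one sublist to a shared pool and blocks
    // until all of them have finished, without shutting the pool down.
    static void processSublist(ExecutorService pool, List<String> sublist) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(sublist.size());
        for (String link : sublist) {
            pool.submit(() -> {
                try {
                    // ScrapeLink(link) would go here
                    System.out.println(Thread.currentThread().getName() + " scraped " + link);
                } finally {
                    latch.countDown(); // count down even if scraping throws
                }
            });
        }
        latch.await(); // proceed only once the whole sublist is done
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        processSublist(pool, List.of("link1", "link2", "link3"));
        pool.shutdown();
    }
}
```

The pool can then be reused for the next sublist instead of being created and torn down per sublist.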

CodePudding user response:

Do not use streams to schedule work; it is easy to connect an input stream (via an iterator) to your workers directly. Chunking is not required (unless each task is really fast), but if for some reason you need to take the data in chunks, you can do that as well.

The reason is that you are not processing streamed data, and this way you have better control over how the tasks are executed.

For example:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import lombok.SneakyThrows;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
@SneakyThrows
public static void main(String... args) {

    int WORKERS = 5;
    int CHUNK = 5;

    Iterator<Integer> jobs = range(0, 47).iterator();

    List<Thread> workers = range(0, WORKERS).mapToObj(j -> new Thread(() -> {
        while(true) {
            int [] chunk = new int [CHUNK];
            int size = 0;
            synchronized (jobs) {
                while(size < CHUNK && jobs.hasNext())
                    chunk[size++] = jobs.next();
            }
            if(size == 0)
                break;
            slowJobProcessor(j, chunk);
        }
    })).collect(toList());

    for (Thread worker : workers)
        worker.start();

    for (Thread worker : workers)
        worker.join();

}

@SneakyThrows
private static void slowJobProcessor(int j, int[] n) {
    Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 1_500));
    System.out.printf("Thread #%d done job: %s%n", j, Arrays.stream(n).mapToObj(String::valueOf).collect(joining(", ")));
}

with output

Thread #1 done job: 5, 6, 7, 8, 9
Thread #2 done job: 10, 11, 12, 13, 14
Thread #4 done job: 20, 21, 22, 23, 24
Thread #3 done job: 15, 16, 17, 18, 19
Thread #0 done job: 0, 1, 2, 3, 4
Thread #2 done job: 30, 31, 32, 33, 34
Thread #1 done job: 25, 26, 27, 28, 29
Thread #4 done job: 35, 36, 37, 38, 39
Thread #3 done job: 40, 41, 42, 43, 44
Thread #0 done job: 45, 46, 0, 0, 0
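If you want the same chunk-at-a-time behaviour with less hand-rolled synchronization, ExecutorService.invokeAll blocks until a whole batch of Callables has completed. A sketch, with a placeholder in place of real scraping and illustrative names (InvokeAllDemo/processChunk):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Wraps each link of one chunk in a Callable and runs the chunk on
    // the pool; invokeAll returns only when every task has finished,
    // with futures in the same order as the submitted tasks.
    static List<String> processChunk(ExecutorService pool, List<String> chunk) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String link : chunk) {
            tasks.add(() -> "scraped:" + link); // stand-in for real scraping
        }
        List<String> results = new ArrayList<>();
        for (Future<String> future : pool.invokeAll(tasks)) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<List<String>> chunks = List.of(
                List.of("link1", "link2", "link3"),
                List.of("link4", "link5"));
        for (List<String> chunk : chunks) {
            System.out.println(processChunk(pool, chunk)); // one chunk at a time
        }
        pool.shutdown();
    }
}
```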