Home > Software design >  Where to do sync
Where to do sync

Time:02-23

With the integrated use of multithreading technologies (threads) and parallel streams of the API Streams framework, create the most parallel synchronous system to determine the average length of all words in N text files.Where do I need to synchronize calculations and in my case do I need to, or maybe I need to implement another algorithm.

public class Dispatcher {
    public static void main(String[] args) {
        File[] files = {
                new File("text"),
                new File("text2"),
                new File("text3")
        };


        MaxLengthWord sumOfDigits = new MaxLengthWord(files);

        ForkJoinPool pool = new ForkJoinPool();
        final Integer invoke = pool.invoke(sumOfDigits);
        System.out.println(invoke);

    }

}

class MaxLengthWord extends RecursiveTask<Integer> {
    File[] files;

    public MaxLengthWord(File[] files) {
        this.files = files;
    }

    @Override
    protected Integer compute() {
        if (files.length > 1) {
            return Math.toIntExact(ForkJoinTask.invokeAll(createSubtasks()).stream()
                    .parallel()
                    .collect(Collectors.summarizingInt(ForkJoinTask::join))
                    .getSum());

        } else {
            return processing(files);
        }
    }

    private Collection<MaxLengthWord> createSubtasks() {
        List<MaxLengthWord> subTasks = new ArrayList<>();
        subTasks.add(new MaxLengthWord(
                Arrays.copyOfRange(files, 0, files.length / 2)));
        subTasks.add(new MaxLengthWord(
                Arrays.copyOfRange(files, files.length / 2, files.length)));
        return subTasks;
    }

    private Integer processing(File[] files) {
        int sumOfAverage = 0;
        try (Scanner sc = new Scanner(files[0])) {
            while (sc.hasNextLine()){
                 double average = Arrays.stream(sc.nextLine().split(" "))
                        .parallel()
                         .mapToInt(String::length)
                        .average()
                         .getAsDouble();

                sumOfAverage  = average;
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        return sumOfAverage;
    }
}

CodePudding user response:

The Collectors interface offers parallel operation. It creates multiple collections and merges them in he last step.

For more detailed information look at this post and the linked javadoc: Collecting from parallel stream in Java 8

Your problem should be solvable with a custom Collector. There are many guides out there explaining how to.

CodePudding user response:

Imagine you have 3 threads, thread A, B and C. Thread A is processing data each 1ms, thread B is processing data each 4ms and thread C is processing data each 6ms.

Thread processing times

  • Thread A: 1ms (1000hz)
  • Thread B: 4ms (250hz)
  • Thread C: 6ms (166,67hz)

You search for the best moment when syncing those threads, I call this the opt_sync_t.

Thread processing time of thread A is only a quarter of thread B.

A_proc_t = 1/4 * B_proc_t

Thread processing time of thread A is only a sixth of thread C.

A_proc_t = 1/6 * C_proc_t

Resulting in:

A_proc_t = 1/4 * B_proc_t = A_proc_t = 1/6 * C_proc_t

1/4 * B_proc_t = 1/6 * C_proc_t

6 * 1/4 * B_proc_t = 6 * 1/6 * C_proc_t

6/4 * B_proc_t = 6/6 * C_proc_t

3/2 * B_proc_t = C_proc_t

3 * B_proc_t = 2 * C_proc_t => opt_sync_t

opt_sync_t = 3 * B_proc_t = 2 * C_proc_t = 12ms

Meaning Thread A, B, C should be synced every 12ms for creating the most parallel synchronous system.

  • Related