Java: How can I queue up asynchronous calls to be executed when a certain condition is met?

Time:11-01

TL;DR: I want to perform an asynchronous call to a REST API. The standard call would give me a CompletableFuture<Response>; however, because the API limits how many calls it allows in a certain amount of time, I want to be able to queue up calls so that they 1. execute in order and 2. execute only when I am not exceeding the API's limits at that moment, and otherwise wait.

Long version:

I am using Retrofit to perform REST calls to an API, and Retrofit returns a CompletableFuture<WhateverResponseClassIDeclare> when I call it. However, due to limitations of the API I am calling, I want tight control over when and in what order my calls go out to it. In detail, too many calls in a certain timeframe would get me IP banned. I also want to maintain the order of my calls, even if they are not executed immediately. The goal is to call a wrapper of the API that returns a CompletableFuture just like the original API but performs those in-between steps asynchronously.

I have been playing around with BlockingQueues, Functions, Callables, Suppliers and everything in between, but I haven't gotten it to work yet.

Below is my currently NON-FUNCTIONAL code, which I created as a mockup to test the concept.

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.function.Function;

    public class Sandbox2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {


        MockApi mockApi = new MockApi();

        CompletableFuture<Integer> result1 = mockApi.requestAThing("Req1");
        CompletableFuture<Integer> result2 = mockApi.requestAThing("Req2");
        CompletableFuture<Integer> result3 = mockApi.requestAThing("Req3");

        System.out.println("Result1: " + result1.get());
        System.out.println("Result2: " + result2.get());
        System.out.println("Result3: " + result3.get());

    }

    public static class MockApi {
        ActualApi actualApi = new ActualApi();

        BlockingDeque<Function<String, CompletableFuture<Integer>>> queueBlockingDeque = new LinkedBlockingDeque();

        public CompletableFuture<Integer> requestAThing(String req1) {

            Function<String, CompletableFuture<Integer>> function = new Function<String, CompletableFuture<Integer>>() {
                @Override
                public CompletableFuture<Integer> apply(String s) {
                    return actualApi.requestHandler(s);
                }
            };


            return CompletableFuture
                    .runAsync(() -> queueBlockingDeque.addLast(function))
                    .thenRun(() -> waitForTheRightMoment(1000))
                    .thenCombine(function)
        }

        private void waitForTheRightMoment(int time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ActualApi {

        public CompletableFuture<Integer> requestHandler(String request) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return Integer.parseInt(request.substring(3));
            });
        }
    }
}

CodePudding user response:

Pre JDK 9 (JDK 1.8)

You can use a ScheduledExecutorService, which runs submitted tasks asynchronously on a pre-configured thread pool, either after a given delay or at a fixed rate.

You can obtain such a service as follows:

private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Once an instance of ScheduledExecutorService is created, you can start submitting items (requests) to be executed as follows:

executorService.schedule(
    () -> actualApi.requestHandler(req),
    delay,
    unit
);

However, calling it directly like this does not yield a CompletableFuture<Integer>; it yields a ScheduledFuture<CompletableFuture<Integer>>, on which you would have to block to get the wrapped result.

Instead, you can block on the request's result inside the ScheduledExecutorService task and use that result to complete a CompletableFuture that you hand back to the caller:

public <T> CompletableFuture<T> scheduleCompletableFuture(
        final CompletableFuture<T> command,
        final long delay,
        final TimeUnit unit) {
    final CompletableFuture<T> completableFuture = new CompletableFuture<>();
    this.executorService.schedule(
            (() -> {
                try {
                    return completableFuture.complete(command.get());
                } catch (Throwable t) {
                    return completableFuture.completeExceptionally(t);
                }
            }),
            delay,
            unit
    );
    return completableFuture;
}

Below is a revised version of your implementation:

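import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class Sandbox2 {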

    public static void main(String[] args) throws ExecutionException, InterruptedException {


        MockApi mockApi = new MockApi();

        CompletableFuture<Integer> result1 = mockApi.requestAThing("Req1");
        CompletableFuture<Integer> result2 = mockApi.requestAThing("Req2");
        CompletableFuture<Integer> result3 = mockApi.requestAThing("Req3");

        System.out.println("Result1: " + result1.get());
        System.out.println("Result2: " + result2.get());
        System.out.println("Result3: " + result3.get());

    }

    public static class MockApi {

        private final AtomicLong delay = new AtomicLong(0);

        private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

        public CompletableFuture<Integer> requestAThing(String req1) {
            return this.scheduleCompletableFuture(new ActualApi().requestHandler(req1), delay.incrementAndGet(), TimeUnit.SECONDS);
        }

        public <T> CompletableFuture<T> scheduleCompletableFuture(
                final CompletableFuture<T> command,
                final long delay,
                final TimeUnit unit) {
            final CompletableFuture<T> completableFuture = new CompletableFuture<>();
            this.executorService.schedule(
                    (() -> {
                        try {
                            return completableFuture.complete(command.get());
                        } catch (Throwable t) {
                            return completableFuture.completeExceptionally(t);
                        }
                    }),
                    delay,
                    unit
            );
            return completableFuture;
        }
    }

    public static class ActualApi {

        public CompletableFuture<Integer> requestHandler(String request) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return Integer.parseInt(request.substring(3));
            });
        }
    }

}
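
One thing to watch in the version above: `new ActualApi().requestHandler(req1)` is evaluated before `schedule` is called, so the request itself appears to start right away and only the delivery of its result is delayed. A minimal sketch of one way to defer the call itself is to pass a `Supplier` and invoke it inside the scheduled task. The names `DeferredScheduler`, `scheduleDeferred` and `shutdown` are made up for illustration and are not part of the answer's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (assumption): defers the start of the call, not just its result.
class DeferredScheduler {

    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();

    // The supplier is invoked only once the delay has elapsed,
    // so the underlying request is not sent before then.
    public <T> CompletableFuture<T> scheduleDeferred(
            Supplier<CompletableFuture<T>> call, long delay, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        executorService.schedule(() -> {
            try {
                // Forward the outcome without blocking the scheduler thread.
                call.get().whenComplete((value, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
            } catch (Throwable t) {
                // The supplier itself failed before producing a future.
                result.completeExceptionally(t);
            }
        }, delay, unit);
        return result;
    }

    // Already scheduled tasks still run by default; afterwards the JVM can exit.
    public void shutdown() {
        executorService.shutdown();
    }
}
```

With this shape, `requestAThing` could pass `() -> actualApi.requestHandler(req1)` instead of the already-started future.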

JDK 9 and onward

If you are using JDK 9 or later, you can use the built-in delayed executor:

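CompletableFuture<String> future = new CompletableFuture<>();
future.completeAsync(() -> {
    try {
        // do something
        return "result"; // placeholder value; the supplier must return a String
    } catch (Throwable e) {
        // do something on error, then let the future fail
        throw new CompletionException(e);
    }
}, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));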

Your MockApi#requestAThing then becomes cleaner and shorter, and you no longer need a custom ScheduledExecutorService:

public static class MockApi {

    private final AtomicLong delay = new AtomicLong(0);

    public CompletableFuture<Integer> requestAThing(String req1) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        return future.completeAsync(() -> null, CompletableFuture.delayedExecutor(delay.incrementAndGet(), TimeUnit.SECONDS))
                // thenCompose invokes requestHandler only after the delay has elapsed
                .thenCompose(nil -> new ActualApi().requestHandler(req1));
    }
// ...
}

CodePudding user response:

You might consider using bucket4j
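
bucket4j is a rate-limiting library built around the token-bucket algorithm: a bucket holds a limited number of tokens, each call consumes one, and tokens are refilled over time. The sketch below is not bucket4j's API. It is a hand-rolled, plain-Java illustration of the idea, and the class `SimpleTokenBucket` with its methods `tryConsume` and `consume` is hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hand-rolled illustration of a token bucket; NOT bucket4j's API.
class SimpleTokenBucket {

    private final long capacity;
    private final long refillIntervalNanos; // time needed to regain one token
    private long tokens;
    private long lastRefillNanos;

    // Allows at most `capacity` calls per `period`, refilled one token at a time.
    SimpleTokenBucket(long capacity, long period, TimeUnit unit) {
        this.capacity = capacity;
        this.refillIntervalNanos = Math.max(1, unit.toNanos(period) / capacity);
        this.tokens = capacity;
        this.lastRefillNanos = System.nanoTime();
    }

    // Takes one token if available; returns false when the limit is reached.
    synchronized boolean tryConsume() {
        refill();
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    // Blocks the calling thread until a token could be taken.
    void consume() throws InterruptedException {
        while (!tryConsume()) {
            TimeUnit.NANOSECONDS.sleep(nanosUntilNextToken());
        }
    }

    private synchronized long nanosUntilNextToken() {
        return Math.max(1, refillIntervalNanos - (System.nanoTime() - lastRefillNanos));
    }

    // Adds the tokens earned since the last refill, capped at capacity.
    private void refill() {
        long elapsed = System.nanoTime() - lastRefillNanos;
        long newTokens = elapsed / refillIntervalNanos;
        if (newTokens > 0) {
            tokens = Math.min(capacity, tokens + newTokens);
            lastRefillNanos += newTokens * refillIntervalNanos;
        }
    }
}
```

A wrapper around the API could call `consume()` on a single worker thread before each request, which keeps the calls in order and within the limit.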

CodePudding user response:

I have found a way to produce my desired behaviour. By limiting my Executor to a single Thread I can queue up calls and they will follow the order I queued them up in.

I will supply the code of my mock classes below for anyone interested:

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sandbox2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {


        MockApi mockApi = new MockApi();


        CompletableFuture<Integer> result1 = mockApi.requestAThing("Req1");
        System.out.println("Request1 queued up");
        CompletableFuture<Integer> result2 = mockApi.requestAThing("Req2");
        System.out.println("Request2 queued up");
        CompletableFuture<Integer> result3 = mockApi.requestAThing("Req3");
        System.out.println("Request3 queued up");

        //Some other logic happens here
        Thread.sleep(10000);

        System.out.println("Result1: " + result1.get());
        System.out.println("Result2: " + result2.get());
        System.out.println("Result3: " + result3.get());

        System.exit(0);
    }

    public static class MockApi {
        ActualApi actualApi = new ActualApi();
        private ExecutorService executorService = Executors.newSingleThreadExecutor();


        public CompletableFuture<Integer> requestAThing(String req1) {

            CompletableFuture<Integer> completableFutureCompletableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    System.out.println("Waiting with " + req1);
                    waitForTheRightMoment(new Random().nextInt(1000) + 1000);
                    System.out.println("Done Waiting with " + req1);
                    return actualApi.requestHandler(req1).get();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            }, executorService);


            return completableFutureCompletableFuture;
        }

        private void waitForTheRightMoment(int time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ActualApi {

        public CompletableFuture<Integer> requestHandler(String request) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(new Random().nextInt(1000) + 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Request Handled " + request);
                return Integer.parseInt(request.substring(3));
            });
        }
    }
}
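
Note that the code above prints failures and then completes the future with null, so a caller cannot tell a failed request from an empty result. A possible variation, sketched under the assumption that a fixed pause before each call is enough to stay within the API's limit, keeps the single-threaded executor for ordering but lets failures reach the returned future. `OrderedCaller`, `submit` and `gapMillis` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical variation of MockApi: one worker thread, so calls run in submission order.
class OrderedCaller {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final long gapMillis;

    OrderedCaller(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> call) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(gapMillis); // stand-in for "wait for the right moment"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
            // join() rethrows a failed call as an unchecked CompletionException,
            // so the returned future completes exceptionally instead of with null.
            return call.get().join();
        }, executor);
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Because the worker still blocks on `join()`, the next call does not start until the previous one has finished, which matches the behaviour of the code above.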