Java CompletableFuture for sequential code

Time:12-09

My new team is writing a Java gRPC service, and to ensure that we never block the request thread we ended up wrapping more or less ALL methods inside a CompletableFuture, even when those endpoints are conceptually a sequential list of operations (no parallelism).

So the code looks something like this (a Java example is available at the end if needed):

  methodA()
    methodB()
      methodD() (let's say this one is a 15ms RPC call)
      methodE()
    methodC()
      methodF() (let's say this one is 5ms of CPU-intensive work)
      methodG()
 

Context:

  • In practice the application is much bigger and there are many more layers of functions
  • Each application host needs to handle 1000 QPS, so you can imagine that methodA is called at that rate
  • A few functions make an RPC call that can take 5-30ms (IO)
  • Very few functions run CPU-intensive work (< 5ms)

Edit 1: After more reading online yesterday, I understand that if, and only if, we are using truly non-blocking HTTP and DB clients (and it doesn't seem like JDBC is non-blocking), this pattern can reduce the total number of threads required. My understanding is that if we have enough memory to keep one thread per request, synchronous code would probably still be the most efficient implementation (it avoids the overhead of switching threads and reloading data). If we don't have enough memory to keep that many threads alive, then making the whole code non-blocking can reduce the number of threads and thus allow the application to scale to more requests.
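
To make the distinction in Edit 1 concrete, here is a minimal sketch (not our real code). The class and method names are made up for illustration, Thread.sleep stands in for a blocking client, and a single-threaded timer stands in for the event loop of a truly non-blocking client. Both are assumptions, not how any particular HTTP or DB client works internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

final class BlockingVsNonBlocking {

    // Blocking style: the pool thread sleeps (a stand-in for a blocking client)
    // and cannot serve any other request for the whole 20 ms.
    static CompletableFuture<Integer> blockingCall(int input, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return input + 1;
        }, pool);
    }

    // Non-blocking style: no thread waits. The timer (a stand-in for an I/O
    // event loop) completes the future once the simulated response arrives.
    static CompletableFuture<Integer> nonBlockingCall(int input, ScheduledExecutorService timer) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        timer.schedule(() -> { result.complete(input + 1); }, 20, TimeUnit.MILLISECONDS);
        return result;
    }

    // Fires `requests` calls at once, waits for all of them, returns elapsed milliseconds.
    static long elapsedMillis(int requests, IntFunction<CompletableFuture<Integer>> call) {
        long start = System.nanoTime();
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            futures.add(call.apply(i));
        }
        futures.forEach(CompletableFuture::join);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            long blocking = elapsedMillis(50, i -> blockingCall(i, pool));
            long nonBlocking = elapsedMillis(50, i -> nonBlockingCall(i, timer));
            System.out.println("blocking, 1 thread:     " + blocking + " ms");
            System.out.println("non-blocking, 1 thread: " + nonBlocking + " ms");
        } finally {
            pool.shutdown();
            timer.shutdown();
        }
    }
}
```

With a single thread on each side, I would expect the blocking variant to take roughly 50 × 20 ms because the calls queue up behind each other, while the timer-based variant should finish in roughly 20 ms since nothing is parked while waiting. Wrapping a blocking call in supplyAsync only moves the waiting to another thread.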

Question 1: I understand this unblocks the "request thread", but in practice what's the advantage? Are we truly saving CPU time? In the example below, it feels like "some" thread will be alive the whole time anyway (mostly the thread from CompletableFuture.supplyAsync in methodD); it just happens that it's not the same thread as the one that received the initial request.

Question 2: Is that pattern truly a "best practice", and should all services follow a similar pattern? Besides making the code a bit harder to read, I feel that it adds overhead: each request invokes 50 methods, and 50 times we call a mix of CompletableFuture .thenCompose() or .supplyAsync(). Was CompletableFuture designed to be used that way across the whole code base, in every method?

Annex (java example):

  public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {
    methodA(10)
        // Mapping the Integer result to a MyResponse is omitted for brevity.
        .thenAccept((response) -> responseObserver.onNext(response));
  }

  public CompletableFuture<Integer> methodA(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodB)
        .thenCompose(this::methodC)
        .thenApply((i) -> {
          System.out.println("MethodA executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodB(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodD)
        .thenCompose(this::methodE)
        .thenApply((i) -> {
          System.out.println("MethodB executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodC(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodF)
        .thenCompose(this::methodG)
        .thenApply((i) -> {
          System.out.println("MethodC executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodD(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Assume it's a RPC call that takes 5-30ms
        Thread.sleep(20);
        System.out.println("MethodD executed by ".concat(Thread.currentThread().getName() + ": " + input));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodE(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("MethodE executed by ".concat(Thread.currentThread().getName() + ": " + input));
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodF(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Let's assume it's a CPU intensive work that takes 2-5ms
        Thread.sleep(5);
        System.out.println("MethodF executed by ".concat(Thread.currentThread().getName() + ": " + input));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodG(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("MethodG executed by ".concat(Thread.currentThread().getName() + ": " + input));
      return input + 1;
    });
  }

CodePudding user response:

On the first question: there is no need to wrap all intermediate calls in CompletableFutures if they are all sequential. You can just as well wrap the whole chain of sequential calls in one single CompletableFuture:

public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {
    CompletableFuture.supplyAsync(() -> methodA(10))
        .thenAccept((response) -> responseObserver.onNext(response));
}

public int methodA(Integer input) {
    var i = methodC((methodB(input)));
    System.out.println("MethodA executed by ".concat(Thread.currentThread().getName() + ": " + i));
    return i;
}

public int methodB(Integer input) {
    var i = methodE(methodD(input));
    System.out.println("MethodB executed by ".concat(Thread.currentThread().getName() + ": " + i));
    return i;
}

public int methodC(Integer input) {
    var i = methodG(methodF(input));
    System.out.println("MethodC executed by ".concat(Thread.currentThread().getName() + ": " + i));
    return i;
}

public Integer methodD(Integer input) {
    try {
        // Assume it's a RPC call that takes 5-30ms
        Thread.sleep(20);
        System.out.println("MethodD executed by ".concat(Thread.currentThread().getName() + ": " + input));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return input + 1;
}

public int methodE(Integer input) {
    System.out.println("MethodE executed by ".concat(Thread.currentThread().getName() + ": " + input));
    return input + 1;
}

public int methodF(Integer input) {
    try {
        // Let's assume it's a CPU intensive work that takes 2-5ms
        Thread.sleep(5);
        System.out.println("MethodF executed by ".concat(Thread.currentThread().getName() + ": " + input));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return input + 1;
}

public int methodG(Integer input) {
    System.out.println("MethodG executed by ".concat(Thread.currentThread().getName() + ": " + input));
    return input + 1;
}

The result is the same and the main thread is not blocked. Since there are far fewer CompletableFuture instances, there is less overhead from handing calls over from one thread to another.

Thus for question 2: no, the way your example code is structured is not a best practice. Use CompletableFuture if you must, avoid it otherwise. For example, you need CompletableFuture#thenCompose when you don't control the API you are calling (i.e. you can't change its return type from CompletableFuture<T> to plain T). Another case is when you want to take advantage of parallelism, but that does not apply here.
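
As a rough illustration of the thenCompose case, here is a small sketch. remoteLookup is a hypothetical stand-in for an API you can't change that already returns a CompletableFuture, and addOne is an ordinary synchronous step you own. Neither name comes from the question.

```java
import java.util.concurrent.CompletableFuture;

final class ComposeVsApply {

    // Hypothetical third-party API we cannot change: it already returns a future.
    static CompletableFuture<Integer> remoteLookup(int input) {
        return CompletableFuture.supplyAsync(() -> input + 1);
    }

    // Plain synchronous step that we own: no future needed.
    static int addOne(int input) {
        return input + 1;
    }

    static CompletableFuture<Integer> pipeline(int input) {
        return remoteLookup(input)
                // Synchronous step: thenApply maps the value directly.
                .thenApply(ComposeVsApply::addOne)
                // Step that returns a future: thenCompose flattens it, so the
                // result is CompletableFuture<Integer>, not a future of a future.
                .thenCompose(ComposeVsApply::remoteLookup);
    }

    public static void main(String[] args) {
        System.out.println(pipeline(10).join()); // prints 13
    }
}
```

Only the steps that already hand back a future need thenCompose; everything else can stay a plain method called through thenApply, or simply be called inline.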

CodePudding user response:

Your question is a little unclear, and I think your comment phrases the real question better. Copied and pasted below.

In this example nothing is running in parallel, all methods are running sequentially as described in the Java code. My understanding is that CompletableFuture is used, in our code base, mostly to ensure that we don’t block the main thread on IO, but I’m trying to understand what’s the advantage of not blocking the request thread. At the end of the day some thread (MethodD’s CompletableFuture) is waiting for the IO, it just happens that it’s not the initial request thread. Do you think CompletableFuture should only be used to achieve parallelism?

Great question.

When you write plain, sequential code with no CompletableFuture<T>, your code runs synchronously on a single thread. No other threads are involved in running your code. However, when you hand a task to a CompletableFuture<T> (for example with supplyAsync), a couple of things occur.

  • The task is submitted to an executor. With no executor argument that is normally ForkJoinPool.commonPool(), so the task usually lands on a pooled worker thread rather than a brand-new one
  • The task runs on that worker thread instead of on the calling thread
  • The thread scheduler (for ordinary platform threads, the operating system's) then jumps back and forth between the calling thread and the worker thread when doing work.
    • Now, if your computer has multiple cores, and the number of cores is larger than the number of runnable threads, then the above may not happen. But typically, the number of threads your application uses is way more than 2/4/8, so the point I am making above is almost always true
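
A quick way to see which thread ends up doing the work is to print thread names. The sketch below assumes the default executor (no executor argument is passed to supplyAsync); the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class WhichThread {

    // Name of the thread that runs a supplyAsync task when no executor is given.
    static String asyncThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    // Name of the thread that runs a stage chained onto an already-completed
    // future: the stage runs immediately, on the thread that calls thenApply.
    static String completedStageThreadName() {
        return CompletableFuture.completedFuture(1)
                .thenApply(i -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:          " + Thread.currentThread().getName());
        // Typically a ForkJoinPool.commonPool worker, not the caller.
        System.out.println("supplyAsync:     " + asyncThreadName());
        // Same thread as the caller: no hand-off happens here.
        System.out.println("completedFuture: " + completedStageThreadName());
    }
}
```

So in the question's code, the completedFuture(input).thenCompose(...) wrappers do not move work to another thread by themselves; the hand-offs come from the supplyAsync calls in the leaf methods.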

As you can see, the 3rd bullet is the most important because this is where the biggest benefit of multithreading occurs. The scheduler can pause and resume threads on the fly, so that every thread can make some progress over time.

This is powerful because some threads may be waiting on IO work to be completed. A thread that is waiting on IO work is essentially doing nothing and wasting its turn on the CPU core.

Thanks to the scheduler, you can minimize (if not eliminate) the time wasted on a core, because the core quickly switches to a thread that is not waiting on IO work.

And this is probably the big benefit that your teammates are striving for. They want to ensure that all work that is being done wastes as little time as possible on the core. That is the big point.

That said, whether or not they actually succeeded depends on a couple of different questions that I need you to answer.

  1. You mentioned methodB and methodC. Can any of these methods be run in parallel? Does methodB have to fully complete before methodC can be executed? Or can methodC run in parallel to methodB? The same question applies to methodD and methodE, or methodF and methodG. I understand that currently, they run sequentially and wait for each other to finish. That's not my question. I am asking if it is possible for them to run in parallel.

  2. Are you using rate limiting tools like Semaphore anywhere in your code? Ideally, I would limit the scope of your answer to explicit code that your team writes, but if you know for sure that one of your dependencies does it, then feel free to mention it too.

  • If your answer to question 1 is no, then 99% of the time, doing what your team is doing is a terrible idea. The only method that should be in its own separate thread is methodA, but it sounds like you are already doing that.
  • If your answer to question 1 is at least partly yes but your answer to question 2 is no, then your teammates are pretty much correct. Over time, try to get an idea of where and when it makes the most sense. But as a first-pass solution? This isn't a horrible idea.
    • If you said that B and C can be parallel, but D and E cannot, then wrapping B and C in CompletableFuture<T> makes sense, but not D and E. They should just be basic sequential Java code. Unless, of course, this is a modular method/class that can be used in other code and might run in parallel there. Nuance is required here, but starting with all of them being CompletableFuture<T> isn't a terrible first solution.
  • If your answer to question 1 is at least partly yes and your answer to question 2 is also yes, then you'll have to take a deep dive into your profiler to find the answer. Things like Semaphore are a different type of IO since they are a "context-dependent" tax that you pay depending on the state of your program around you. But since they are a construct that exists inside of your code, they become a dependable and measurable sort of IO that you can build deductions and assumptions on. To keep my answer short, rate-limiting tools will allow you to make dependable assumptions about your code, and thus any results from your profiler will be way more useful than they would be otherwise. methodA should definitely still be in its own separate thread.
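
To sketch the "B and C can be parallel, but D and E cannot" case: the example below assumes, purely for illustration, that methodB and methodC both take the original input and are independent of each other (in the question's code C consumes B's result, so this is a hypothetical variant). The leaf methods are reduced to input + 1.

```java
import java.util.concurrent.CompletableFuture;

final class ParallelBranches {

    static int methodD(int input) { return input + 1; }
    static int methodE(int input) { return input + 1; }
    static int methodF(int input) { return input + 1; }
    static int methodG(int input) { return input + 1; }

    // D must finish before E starts, so this stays plain sequential code.
    static int methodB(int input) {
        return methodE(methodD(input));
    }

    // Same for F and G.
    static int methodC(int input) {
        return methodG(methodF(input));
    }

    // Assumption: B and C are independent, so each branch gets its own
    // future and the two results are merged once both have completed.
    static CompletableFuture<Integer> methodA(int input) {
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> methodB(input));
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> methodC(input));
        return b.thenCombine(c, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(methodA(10).join()); // prints 24 (12 from B + 12 from C)
    }
}
```

Only the two branches that can genuinely overlap are wrapped; D, E, F and G stay ordinary methods.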

So in short.

  • If 1 and 2 are yes, the answer is going to require nuance. Go into your profiler and find out.
  • If 1 is yes but 2 is no, your teammates are right. Change as needed, but go ahead with this solution.
  • If 1 is no, then your teammates are wrong. Make them change.

And for all of these, methodA should be in its own separate thread no matter what.
