Home > database >  Terminate a Stream when there is no incoming Data after curtain Timeout
Terminate a Stream when there is no incoming Data after curtain Timeout

Time:06-06

I have an InputStream and OutputStream (there is no socket).

I have a stream-based code that does some mapping/filtering/grouping/processing.

My main goal to terminate the stream if the maxDuration was exceeded:

void fillStreamMap(BufferedReader reader) {
    final Instant end = Instant.now().plusNanos(TimeUnit.NANOSECONDS.convert(maxDuration));

    this.map = reader.lines()
        .takeWhile(e -> checkTimeout(end))
        .map(this::jsonToBuyerEventInput)
        .filter(Objects::nonNull)
        .filter(getFilter()::apply)
        .limit(super.maxEvent)
        .collect(Collectors.groupingBy(BuyerEventInput::getBuyer));
}

boolean checkTimeout(Instant end){
    return Instant.now().getEpochSecond() <= end.getEpochSecond();
}

I'm using takeWhile which is a very useful function, but it checks the termination condition if there is an upcoming event.

So if there is no data sent, it doesn't check the condition because this function is built to take a Predicate as an argument.

Is there any way to accomplish this goal?

CodePudding user response:

You can execute your method inside an executor. Assuming a singleThread executor is enough for your case, here is the code:

    public void executeFillStreamMap(BufferedReader reader,long timeout){
         ExecutorService executor = Executors.newSingleThreadExecutor();
         Future<?> result = executor.submit(this::fillStreamMap(reader));
         try {
              result.get(timeout, TimeUnit.NANOSECONDS);
         }catch (Exception e) {
              // handle exceptions
         } finally{
              if(executor!=null){
                   executor.shutdown();
              }
         }
    }

And you won't need takeWhile method and Instant you defined. In case task take more than the defined timeout, get method will interrupt task, thus you can get an InterruptedException which should be handled.

CodePudding user response:

It can be done by utilizing CompletableFuture.completeOnTimeout() without blocking the main tread.

Here is a simple, reproducible example.

Let's start with creating an asynchronous task using supplyAssync(). As an argument of supplyAssync() let's provide a supplier which will call method getMap() that delivers the data from a file filled with some random strings collected into a map.

And then we need to apply completeOnTimeOut() that expects a default value (an empty map in this case), timeout and timeunit. completeOnTimeOut() will return a map (either filled data or empty default) wrapped by CompletableFuture.

With that, all previous code responsible for dealing timeout becomes redundant.

public static Map<String, List<String>> testMap;

public static void fillStreamMap(BufferedReader reader,
                                 long timeout,
                                 TimeUnit unit) {
    
    CompletableFuture.supplyAsync(() -> getMap(reader))
        .completeOnTimeout(Collections.emptyMap(), 1, TimeUnit.SECONDS)
        .thenAccept(map -> testMap = map)
        .thenRun(() -> testMap.forEach((k, v) -> System.out.println(k   " : "   v))); // visualizing the result
}

public static Map<String, List<String>> getMap(BufferedReader reader) {
    
    return reader.lines()
        .map(str -> str.split(";"))
        .collect(Collectors.groupingBy(
            str -> str[0],
            Collectors.mapping(str -> str[1],
                Collectors.toList())
        ));
}

main()

public static void main(String[] args) {
    try (var reader = new BufferedReader(new FileReader("filePathHere"))) {
        fillStreamMap(reader, 1, TimeUnit.SECONDS);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Output:

a : [Atlanta, Anchorage]
b : [Baltimore, Boston, Baltimore]
c : [Cleveland]
d : [Denver, Detroit]
  • Related