Simplified Example
I have the following code, which generates the running sums of a series, i.e. 1, 1+2, 1+2+3, 1+2+3+4:
public static void main(String[] args) {
    Stream<Integer> inputStream = Stream.of(1, 2, 3, 4);
    Iterator<Integer> iterator = inputStream.iterator();
    AtomicBoolean check = new AtomicBoolean(true);
    Stream<Integer> outputStream = Stream.iterate(
        iterator.next(),
        i -> check.get(),
        next -> {
            check.set(iterator.hasNext());
            return iterator.hasNext() ? iterator.next() + next : next;
        }
    );
    List<Integer> outputList = outputStream.collect(Collectors.toList());
    System.out.println(outputList);
}
The code above seems to produce the result I want, but is there a better solution? It looks awful.
Generic Problem description
Here's generic code illustrating the problem.
public static <O, I> Stream<O> iterate(O seed, Stream<I> stream, BiFunction<I,O,O> function) {
    return iterate(seed, stream.iterator(), function);
}

public static <O, I> Stream<O> iterate(O seed, Iterator<I> iterator, BiFunction<I,O,O> function) {
    AtomicBoolean hasNext = new AtomicBoolean(true);
    return Stream.iterate(
        seed,
        i -> hasNext.get(),
        next -> {
            hasNext.set(iterator.hasNext());
            return iterator.hasNext() ? function.apply(iterator.next(), next) : next;
        }
    );
}

public static void main(String[] args) {
    Stream<Integer> inputStream = Stream.of(2, 3, 4);
    BiFunction<Integer, Integer, Integer> f = Integer::sum;
    Stream<Integer> outputStream = iterate(1, inputStream, f);
    List<Integer> outputList = outputStream.collect(Collectors.toList());
    System.out.println(outputList);
}
Problem Context
Basically, I want to do this because I am writing a function that produces a forecast of the balance of an interest-bearing account.
I want to be able to take a stream of dates and then produce a stream of balances. That way you don't need to know how many elements there will be, or even the distribution of dates, which makes it a more flexible approach.
CodePudding user response:
... but is there a better solution?
Yes, an elegant solution is to use Arrays#parallelPrefix.
import java.util.Arrays;

public class Main {
    public static void main(String[] args) {
        int[] arr = { 1, 2, 3, 4 };
        // Replaces each element with the cumulative sum up to that index
        Arrays.parallelPrefix(arr, Integer::sum);
        System.out.println(Arrays.toString(arr));
    }
}
Output:
[1, 3, 6, 10]
You can always convert back and forth between Stream<Integer> and int[] as per your requirement.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        int[] arr = Stream.of(1, 2, 3, 4).mapToInt(Integer::valueOf).toArray();
        Arrays.parallelPrefix(arr, Integer::sum);
        System.out.println(Arrays.toString(arr));

        // In case you need a Stream<Integer> again
        Stream<Integer> resultStream = Arrays.stream(arr).boxed();

        // Or want the result as a List<Integer> (Stream.toList() requires Java 16+)
        List<Integer> resultList = resultStream.toList();
        System.out.println(resultList);
    }
}
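Since the question is ultimately about balances rather than int values, here is a minimal sketch of the same idea using the generic overload Arrays.parallelPrefix(T[], BinaryOperator<T>) with BigDecimal. The class name RunningBalanceExample, the helper runningTotals and the sample amounts are made up for illustration, and the sketch assumes that the whole input fits in memory.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class RunningBalanceExample {

    // Hypothetical helper: returns the running totals of the given amounts.
    static List<BigDecimal> runningTotals(Stream<BigDecimal> amounts) {
        // The stream has to be fully materialized before the prefix is computed.
        BigDecimal[] arr = amounts.toArray(BigDecimal[]::new);
        // BigDecimal::add is associative and side-effect free, as parallelPrefix requires.
        Arrays.parallelPrefix(arr, BigDecimal::add);
        return Arrays.asList(arr);
    }

    public static void main(String[] args) {
        Stream<BigDecimal> deposits =
                Stream.of("100.00", "25.50", "10.25").map(BigDecimal::new);
        System.out.println(runningTotals(deposits)); // [100.00, 125.50, 135.75]
    }
}
```

Note that this approach is eager: the array is built before any result is available, so it does not suit an unbounded input.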
CodePudding user response:
Here's a generic code illustrating the problem.
public static <O, I> Stream<O> iterate(O seed, Stream<I> stream, BiFunction<I,O,O> function) { return iterate(seed, stream.iterator(), function); }
I want to be able to take a stream of dates and then produce a stream of balances. That way you don't need to know how many elements there will be, or even the distribution of dates, which makes it a more flexible approach.
I've written the following solution based on the assumption that it's possible to merge the balances produced in different threads, that the function BiFunction<I,O,O> is associative, and that there is a merging function BinaryOperator<O> responsible for combining the resulting values, i.e. balances.
Also, the value of seed should meet the same requirements that are imposed on the identity of the Stream.reduce() operation (otherwise a parallel stream would yield an incorrect result).
Note that if at least one of these requirements cannot be fulfilled, streams are not the right tool for this problem.
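To illustrate the identity requirement, here is a small sketch using plain Stream.reduce(). The class IdentityRequirementDemo and its helper sum are hypothetical names chosen for this example. With a seed that is not an identity of the accumulator, the parallel result depends on how the stream happens to be split, so no specific value is claimed for that case.

```java
import java.util.stream.Stream;

class IdentityRequirementDemo {

    // Hypothetical helper: sums 1..4 starting from the given seed,
    // optionally on a parallel stream.
    static int sum(int seed, boolean parallel) {
        Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.reduce(seed, Integer::sum);
    }

    public static void main(String[] args) {
        // 0 is an identity for Integer::sum, so both modes agree.
        System.out.println(sum(0, false)); // 10
        System.out.println(sum(0, true));  // 10

        // 10 is not an identity: sequentially it is added exactly once,
        // but in parallel it may be added once per chunk.
        System.out.println(sum(10, false)); // 20
        System.out.println(sum(10, true));  // may differ from 20
    }
}
```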
Here's how it might be implemented using a custom Collector:
public static <R, T> Stream<R> iterate(R seed, Stream<T> stream,
                                       BiFunction<T, R, R> mapper,
                                       BinaryOperator<R> merger) {
    return stream
        .collect(sequenceCollector(seed, mapper, merger));
}

public static <R, T> Collector<T, ?, Stream<R>> sequenceCollector(R seed,
                                                                  BiFunction<T, R, R> mapper,
                                                                  BinaryOperator<R> merger) {
    return Collector.of(
        ArrayDeque::new,
        // accumulator: fold the next element into the last result (or the seed)
        (Deque<R> deque, T next) ->
            deque.add(
                mapper.apply(next, Objects.requireNonNullElse(deque.peekLast(), seed))
            ),
        // combiner: shift every result of the right chunk by the last result of the left one
        (left, right) -> {
            if (left.isEmpty()) return right; // an empty left chunk has nothing to shift by
            R last = left.getLast();
            right.forEach(next -> left.add(merger.apply(next, last)));
            return left;
        },
        Collection::stream
    );
}
main()
public static void main(String[] args) {
    Stream<Integer> input1 = Stream.of(1, 2, 3, 4);
    Stream<Integer> output1 = iterate(0, input1, Integer::sum, Integer::sum);
    List<Integer> outputList1 = output1.toList();
    System.out.println("Sequential: " + outputList1);

    Stream<Integer> input2 = Stream.of(1, 2, 3, 4).parallel();
    Stream<Integer> output2 = iterate(0, input2, Integer::sum, Integer::sum);
    List<Integer> outputList2 = output2.toList();
    System.out.println("Parallel: " + outputList2);
}
Output:
Sequential: [1, 3, 6, 10]
Parallel: [1, 3, 6, 10]
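One caveat: a collector gathers every result before the output stream is returned, so it cannot handle an unbounded input. If laziness matters more than parallelism, a sequential-only alternative is to wrap the source iterator and build the stream with StreamSupport. The following is only a sketch under that assumption. The names LazyScan and scan are hypothetical, and the resulting stream is deliberately not parallel.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyScan {

    // Hypothetical helper: lazily emits one folded result per input element.
    static <T, R> Stream<R> scan(R seed, Stream<T> input, BiFunction<T, R, R> mapper) {
        Iterator<T> source = input.iterator();
        Iterator<R> results = new Iterator<>() {
            private R current = seed; // running state, updated on every next()

            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public R next() {
                current = mapper.apply(source.next(), current);
                return current;
            }
        };
        // ORDERED keeps encounter order; 'false' makes the resulting stream sequential.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(results, Spliterator.ORDERED), false)
            .onClose(input::close);
    }

    public static void main(String[] args) {
        // Works with an infinite input because nothing is materialized up front.
        List<Integer> firstFour = scan(0, Stream.iterate(1, i -> i + 1), Integer::sum)
                .limit(4)
                .toList();
        System.out.println(firstFour); // [1, 3, 6, 10]
    }
}
```

Unlike the collector, this version needs no merging function and places no identity requirement on the seed, because the elements are always processed in order on a single thread.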