Home > Enterprise >  How to merge two groupingBy in java streams?
How to merge two groupingBy in java streams?

Time:12-04

I have a input object

  @Getter
  class Txn {

    private String hash;
    private String withdrawId;
    private String depositId;
    private Integer amount;
    private String date;

  }

and the output object is

  @Builder
  @Getter
  class UserTxn {

    private String hash;
    private String walletId;
    private String txnType;
    private Integer amount;
  }

In the Txn object transfers the amount from the withdrawId -> depositId.

what I am doing is I am adding all the transactions (Txn objects) in a single amount grouped by hash.

but for that I have to make two streams for groupingby withdrawId and second or for depositId and then the third stream for merging them

grouping by withdrawId

var withdrawStream = txnList.stream().collect(Collectors.groupingBy(Txn::getHash, LinkedHashMap::new,
        Collectors.groupingBy(Txn::getWithdrawId, LinkedHashMap::new, Collectors.toList())))
    .entrySet().stream().flatMap(hashEntrySet -> hashEntrySet.getValue().entrySet().stream()
        .map(withdrawEntrySet ->
            UserTxn.builder()
                .hash(hashEntrySet.getKey())
                .walletId(withdrawEntrySet.getKey())
                .txnType("WITHDRAW")
                .amount(withdrawEntrySet.getValue().stream().map(Txn::getAmount).reduce(0, Integer::sum))
                .build()
        ));

grouping by depositId

var depositStream = txnList.stream().collect(Collectors.groupingBy(Txn::getHash, LinkedHashMap::new,
        Collectors.groupingBy(Txn::getDepositId, LinkedHashMap::new, Collectors.toList())))
    .entrySet().stream().flatMap(hashEntrySet -> hashEntrySet.getValue().entrySet().stream()
        .map(withdrawEntrySet ->
            UserTxn.builder()
                .hash(hashEntrySet.getKey())
                .walletId(withdrawEntrySet.getKey())
                .txnType("DEPOSIT")
                .amount(withdrawEntrySet.getValue().stream().map(Txn::getAmount).reduce(0, Integer::sum))
                .build()
        ));

then merging them again, using deposites - withdraws

var res = Stream.concat(withdrawStream, depositStream).collect(Collectors.groupingBy(UserTxn::getHash, LinkedHashMap::new,
    Collectors.groupingBy(UserTxn::getWalletId, LinkedHashMap::new, Collectors.toList())))
    .entrySet().stream().flatMap(hashEntrySet -> hashEntrySet.getValue().entrySet().stream()
        .map(withdrawEntrySet -> {
              var depositAmount = withdrawEntrySet.getValue().stream().filter(userTxn -> userTxn.txnType.equals("DEPOSIT")).map(UserTxn::getAmount).reduce(0, Integer::sum);
              var withdrawAmount = withdrawEntrySet.getValue().stream().filter(userTxn -> userTxn.txnType.equals("WITHDRAW")).map(UserTxn::getAmount).reduce(0, Integer::sum);
              var totalAmount = depositAmount-withdrawAmount;
              return UserTxn.builder()
                  .hash(hashEntrySet.getKey())
                  .walletId(withdrawEntrySet.getKey())
                  .txnType(totalAmount > 0 ? "DEPOSIT": "WITHDRAW")
                  .amount(totalAmount)
                  .build();
            }
        ));

My question is, How can I do this in one stream. Like by somehow groupingBy withdrawId and depositId is one grouping.

something like

res = txnList.stream()
        .collect(Collectors.groupingBy(Txn::getHash,
            LinkedHashMap::new,
            Collectors.groupingBy(Txn::getWithdrawId && Txn::getDepositId,
                LinkedHashMap::new, Collectors.toList())))
        .entrySet().stream().flatMap(hashEntrySet -> hashEntrySet.getValue().entrySet().stream()
            .map(walletEntrySet ->
                {
                  var totalAmount = walletEntrySet.getValue().stream().map(
                      txn -> Objects.equals(txn.getDepositId(), walletEntrySet.getKey())
                          ? txn.getAmount() : (-txn.getAmount())).reduce(0, Integer::sum);
                  return UserTxn.builder()
                      .hash(hashEntrySet.getKey())
                      .walletId(walletEntrySet.getKey())
                      .txnType("WITHDRAW")
                      .amount(totalAmount)
                      .build();
                }
            ));

CodePudding user response:

I wouldn’t use this in my code because I think it’s not readable and will be very hard to change and manage in the future(SOLID).
But in case you still want this-
If I got your design right hash is unique per user and transaction will only have deposit or withdrawal, if so, this will work-
You could triple groupBy via collectors chaining like you did in your example. You can create the Txn type via simple map function just check which id is null.

Map<String, Map<String, Map<String, List<Txn>>>> groupBy = txnList.stream().collect( Collectors.groupingBy(Txn::getHash, LinkedHashMap::new, Collectors.groupingBy(Txn::getDepositId, LinkedHashMap::new, Collectors.groupingBy(Txn::getWithdrawId, LinkedHashMap::new,Collectors.toList()))));

then use the logic from your example on this stream.

CodePudding user response:

TL;DR

For those who didn't understand the question, OP wants to generate from each Txn instance (Txn probably stands for transaction) two peaces of data: hash and withdrawId aggregated amount, and hash and depositId aggregated amount.

And then they want the two parts together (for that reason they were creating the two streams, and then concatenating them).

Note: it seems like there's a logical flow in the original code: the same amount gets associated with withdrawId and depositId. Which doesn't reflect the amount has been taken from one account and transferred to another. Hence, it would make sense if for depositId amount would be used as is and for withdrawId - negated (i.e. -1 * amount).

Collectors.teeing()

You can make use of the Java 12 Collector teeing() and internally group stream elements into two distinct Maps:

  • the first one by grouping the stream data by withdrawId and hash.

  • and another one by grouping the data depositId and hash.

Teeing expects three arguments: 2 downstream Collectors and a Function combining the results produced by collectors.

As the downstream of teeing() we can use a combination of Collectors groupingBy() and summingInt(), the second one is needed to accumulate integer amount of the transaction.

Note that there's no need in using nested Collector groupingBy() instead we can create a custom type that would hold id and hash (and its equals/hashCode should be implemented based on the wrapped id and hash). Java 16 record fits into this role perfectly well:

public record HashWalletId(String hash, String walletId) {}

Instances of HashWalletId would be used as Keys in both intermediate Maps.

The finisher function of teeing() would merge the results of the two Maps together.

The only thing left is to generate instances of UserTxn out of map entries.

List<Txn> txnList = // initializing the list
        
List<UserTxn> result = txnList.stream()
    .collect(Collectors.teeing(
        Collectors.groupingBy(
            txn -> new HashWalletId(txn.getHash(), txn.getWithdrawId()),
            Collectors.summingInt(txn -> -1 * txn.getAmount())), // because amount has been withdrawn
        Collectors.groupingBy(
            txn -> new HashWalletId(txn.getHash(), txn.getDepositId()),
            Collectors.summingInt(Txn::getAmount)),
        (map1, map2) -> {
            map2.forEach((k, v) -> map1.merge(k, v, Integer::sum));
            return map1;
        }
    ))
    .entrySet().stream()
    .map(entry -> UserTxn.builder()
        .hash(entry.getKey().hash())
        .walletId(entry.getKey().walletId())
        .txnType(entry.getValue() > 0 ? "DEPOSIT" : "WITHDRAW")
        .amount(entry.getValue())
        .build()
    )
    .toList(); // remove the terminal operation if your goal is to produce a Stream
  • Related