Grouping and Aggregating data based on List of columns


I have a JSONArray that contains multiple JSONObjects, and each JSONObject represents a row of data (like a SQL row).

Example:

[{
    "col1": "c1",
    "col2": "r1",
    "col3": 12121
}, {
    "col1": "c1",
    "col2": "r1",
    "col3": 1321
}, {
    "col1": "c1",
    "col2": "r2",
    "col3": 4342
}, {
    "col1": "c1",
    "col2": "r2",
    "col3": 4532
}]

A list containing the columns on which group by has to happen:

Example:

["col1","col2"]

Finally, the aggregate that has to be applied (MIN, MAX, SUM, or AVG), along with the column on which the aggregate has to be applied:

Expected output, with the aggregate being SUM applied to col3:

[{
    "col1": "c1",
    "col2": "r1",
    "col3": 13442
},{
    "col1": "c1",
    "col2": "r2",
    "col3": 8874
}]

What I have tried so far:

I thought of comparing the current row with the previous one on the list of columns I have, and whenever I see a change in the values, applying the aggregate. But this method looks way too inefficient. I was thinking of using Java Streams, but I'm very bad at them. Any help would be appreciated.

 if (agg.equalsIgnoreCase("MIN")) {
        Number min = data.getJSONObject(0).getNumber(column);
        for (int i = 0; i < data.length(); i++) {
            JSONObject jsonObject = data.getJSONObject(i);
            if (i > 1) {
            }
        }
    }

CodePudding user response:

Depending on how much data you're going to be handling, a simple method that doesn't rely on streams is to use a Map: build the map keys from the grouping column values, and update the map values from the value of the aggregation column.

Here I've created an Operation interface that can be implemented for each of the operations (sum, max, min, etc.).

E.g.

interface Operation {
    // 'currentAggregate' is null the first time a group key is seen.
    Long update(Long currentAggregate, int nextValue);
}

class Sum implements Operation {
    @Override
    public Long update(Long currentAggregate, int nextValue) {
        // No aggregate yet for this group: start from the first value.
        if (currentAggregate == null) {
            return (long) nextValue;
        }
        return currentAggregate + nextValue;
    }
}

JSONArray aggregate(JSONArray array, String[] columns, String aggregateColumn, Operation op) {
    Map<String, Long> aggregates = new HashMap<>();
    for (int i = 0; i < array.length(); ++i) {
        JSONObject obj = array.getJSONObject(i);
        String key = getKey(obj, columns);
        Long current = aggregates.get(key);
        aggregates.put(key, op.update(current, obj.getInt(aggregateColumn)));
    }
    // Then split the map key back out into column values (or use a more sophisticated
    // object in place of 'aggregates' that also stores the column values explicitly) and
    // return a JSONArray with values for the 'aggregateColumn' taken from 'aggregates'.
    // ...
}

String getKey(JSONObject obj, String[] columns) {
    // Assumes no column values include "_".
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < columns.length; ++i)
        builder.append(obj.getString(columns[i])).append("_");
    return builder.toString();
}
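
For completeness, here is a minimal sketch of the conversion step elided in aggregate() above, assuming the "_"-joined keys produced by getKey (and therefore that no column value contains "_"); the helper name toJsonArray is just for illustration:

JSONArray toJsonArray(Map<String, Long> aggregates, String[] columns, String aggregateColumn) {
    JSONArray result = new JSONArray();
    for (Map.Entry<String, Long> entry : aggregates.entrySet()) {
        JSONObject row = new JSONObject();
        // Keys were built by getKey(): grouping values joined with "_" in column order.
        String[] values = entry.getKey().split("_");
        for (int i = 0; i < columns.length; ++i)
            row.put(columns[i], values[i]);
        row.put(aggregateColumn, entry.getValue());
        result.put(row);
    }
    return result;
}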

CodePudding user response:

Break the problem down and you'll realize it is not that hard, especially when there is tooling that already matches your needs, so you don't even need to implement much yourself. Streams are a perfectly fine option here, since the Java 8 Streams API allows grouping streamed elements by key and processing each group with a downstream collector for, say, aggregating operations.

Suppose you have a JSON source producing a huge dataset: even then, your example can be represented as a Stream<JSONObject>. I used your input to read it in a streaming manner, producing a ready-to-be-analyzed data stream (my spliterator implementation may not be perfect, but it seems to work):

public static <T> Stream<T> asStream(final JSONTokener jsonTokener) {
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED) {
        private Status status = Status.BEFORE_ARRAY;

        @Override
        public boolean tryAdvance(final Consumer<? super T> action) {
            for ( ; ; ) {
                switch ( status ) {
                case BEFORE_ARRAY:
                    jsonTokener.next('[');
                    status = Status.IN_ARRAY;
                    continue;
                case IN_ARRAY:
                    switch ( jsonTokener.nextClean() ) {
                    case ',':
                        // Skip the element separator and keep reading.
                        continue;
                    case ']':
                        status = Status.AFTER_ARRAY;
                        return false;
                    default:
                        jsonTokener.back();
                        @SuppressWarnings("unchecked")
                        final T value = (T) jsonTokener.nextValue();
                        action.accept(value);
                        // Exactly one element is consumed per tryAdvance() call.
                        return true;
                    }
                case AFTER_ARRAY:
                    throw new IllegalStateException();
                default:
                    throw new AssertionError(status);
                }
            }
        }
    }, false);
}

private enum Status {

    BEFORE_ARRAY,
    IN_ARRAY,
    AFTER_ARRAY

}

All it does is transform a JSON token stream into a stream of something (since the org.json object model does not provide a common base class). If you already have a JSONArray buffered, it can be streamed using one of the approaches from here: Convert Iterable to Stream using Java 8 JDK
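
For example, a minimal sketch for that buffered case, assuming org.json, java.util.stream.IntStream, and that every element of the array is a JSONObject:

static Stream<JSONObject> asStream(final JSONArray array) {
    // Streams the elements of an already-buffered JSONArray by index.
    return IntStream.range(0, array.length())
            .mapToObj(array::getJSONObject);
}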

Next, merely use grouping collectors from the stream parsed above:

final Collector<JSONObject, ?, Map<List<String>, Double>> collector = Collectors.groupingBy(
        // your grouping key (col1, col2)
        row -> List.of(row.getString("col1"), row.getString("col2")),
        // your aggregating SUM for col3
        Collectors.summingDouble(row -> row.getDouble("col3"))
);
Assertions.assertEquals(
        Map.of(List.of("c1", "r2"), 8874.0, List.of("c1", "r1"), 13442.0),
        JsonStreams.<JSONObject>asStream(new JSONTokener(reader))
                .collect(collector)
);

That's it for SUM. An AVG result can be obtained by using Collectors.averagingDouble instead.
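
For completeness, here are hedged sketches of the other aggregates over the same grouping key, assuming col3 is numeric and the same imports as above plus java.util.DoubleSummaryStatistics; MIN and MAX can be read from per-group summary statistics:

final Collector<JSONObject, ?, Map<List<String>, Double>> avg = Collectors.groupingBy(
        row -> List.of(row.getString("col1"), row.getString("col2")),
        Collectors.averagingDouble(row -> row.getDouble("col3"))
);

final Collector<JSONObject, ?, Map<List<String>, DoubleSummaryStatistics>> stats = Collectors.groupingBy(
        row -> List.of(row.getString("col1"), row.getString("col2")),
        Collectors.summarizingDouble(row -> row.getDouble("col3"))
);
// For each group key: call getMin(), getMax(), getSum(), or getAverage() on the statistics value.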
