groupBy with collect_list of struct failure

Time:09-30

This really looks like a bug, but I cannot find the reason nor any info on the internet.

What happens: I have some Java code that restructures an input dataset using collect_list(struct(...)) in the agg method following a groupBy. At first I got strange errors stating that valueArraySize should be positive (when the value read was negative), or that it should be less than Integer.MAX_VALUE when it was way too big. What I understood is that Spark read a long from a byte array, probably at the wrong place, and got an invalid number that made the code fail when it was used.
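The symptom is consistent with a serializer and a deserializer disagreeing on the field order inside a fixed-width row: one side's bit pattern gets reinterpreted as the other side's offset/size word. Here is a minimal, Spark-free sketch of the effect (the layout and the packed offset/size word are illustrative assumptions, not Spark's actual UnsafeRow format):

```java
import java.nio.ByteBuffer;

public class FieldOrderMismatch {
    // Writer lays the row out as [amount: double at offset 0][item word: long at offset 8].
    static ByteBuffer writeRow(double amount, long itemWord) {
        ByteBuffer row = ByteBuffer.allocate(16);
        row.putDouble(0, amount);
        row.putLong(8, itemWord);
        return row;
    }

    // A reader that assumes the opposite order reads amount's bit pattern
    // as if it were the variable-length field's offset/size word...
    static long readItemWordWrongSlot(ByteBuffer row) {
        return row.getLong(0);
    }

    // ...and reads the offset/size word as if it were the amount.
    static double readAmountWrongSlot(ByteBuffer row) {
        return row.getDouble(8);
    }

    public static void main(String[] args) {
        ByteBuffer row = writeRow(13.0, (16L << 32) | 2L);
        System.out.println("garbage offset/size word: " + readItemWordWrongSlot(row));
        System.out.println("garbage amount: " + readAmountWrongSlot(row));
    }
}
```

Depending on what the misread word is used for (an array size, a string offset), this either fails a sanity check loudly or silently produces garbage values.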

To debug it, I tried to reproduce the bug with simple code. Here is a simple example that actually crashes the JVM!

// Some definitions

@Data // use lombok to generate boilerplate code - no case class in java :\
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
    String user;
    String item;
    Double amount;
}

@Data
public static class Top {
    String user;
    List<Command> commands;
}

@Data
public static class Command {
    String item;
    Double amount;
}

And the code that fails:

    // Requires: import static org.apache.spark.sql.functions.collect_list;
    // Requires: import static org.apache.spark.sql.functions.struct;
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    List<Order> orders = Arrays.asList(
            new Order("U1", "C1", 13.0),
            new Order("U1", "C2", 10.1),
            new Order("U2", "C3", 42.0));

    Dataset<Row> grouped = spark.createDataset(orders, Encoders.bean(Order.class))
            .groupBy("user")
            .agg(collect_list(struct( "item", "amount" )).as("commands"));

    List<Top> output = grouped.as(Encoders.bean(Top.class)).collectAsList();

Depending on the use case, I got different errors. For example, if all amounts are whole numbers (no decimals), or if I change the type of amount to Integer, then the code runs, but the values of item and amount in the output are completely arbitrary.

Does anyone know about that bug/strange behavior?

I use this Spark version:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0.2.6.5.108-1</version>
</dependency>

CodePudding user response:

I actually found a way to make the code work while writing the question:

=> The list of column names in the struct needs to be in alphabetical order.

(So in the example above, struct("amount", "item"))

But I still don't know why, and I can't find anything on the net. The documentation of struct clearly mentions that it outputs a schema with the column names, and this can be verified by printing the schema, so I would expect the names to be used...
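For what it's worth, Spark's bean encoder appears to serialize bean properties sorted by name, which would explain why the struct columns must line up alphabetically. A defensive workaround is to sort the column names before building the struct; a hypothetical helper (not part of Spark's API) could look like this:

```java
import java.util.Arrays;

public class StructColumns {
    // Returns a copy of the column names sorted alphabetically, matching
    // the field order that Encoders.bean seems to use for bean properties.
    static String[] alphabetical(String... names) {
        String[] sorted = names.clone();
        Arrays.sort(sorted);
        return sorted;
    }
}
```

With `String[] cols = StructColumns.alphabetical("item", "amount")`, the call becomes `struct(cols[0], Arrays.copyOfRange(cols, 1, cols.length))`, which stays correct even if more fields are added to the bean later.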

CodePudding user response:

It seems to be a bug. I think what is causing the issue is described here, in the JIRA comments.

It should be fixed in version 2.3.3.
