This really looks like a bug, but I cannot find the reason for it, nor any info on the internet.
What happens: I have some Java code that restructures an input dataset using collect_list(struct(...))
in the agg
method following a groupBy
. At first I had strange errors stating that valueArraySize,
which had some odd value, should be positive (when the value was negative), or that it should be less than Integer.MAX_VALUE
when it was way too big. What I understood is that Spark reads a long from a byte array, probably at the wrong offset, and gets a strange invalid number that makes the code fail when it is used.
To debug it, I tried to reproduce the bug with simple code. Here is a simple example that actually crashes the JVM!
// Some definitions
@Data // use lombok to generate boilerplate code - no case class in java :\
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
    String user;
    String item;
    Double amount;
}

@Data
public static class Top {
    String user;
    List<Command> commands;
}

@Data
public static class Command {
    String item;
    Double amount;
}
And the code that fails:
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

List<Order> orders = Arrays.asList(
        new Order("U1", "C1", 13.0),
        new Order("U1", "C2", 10.1),
        new Order("U2", "C3", 42.0));

Dataset<Row> grouped = spark.createDataset(orders, Encoders.bean(Order.class))
        .groupBy("user")
        .agg(collect_list(struct("item", "amount")).as("commands"));

List<Top> output = grouped.as(Encoders.bean(Top.class)).collectAsList();
Depending on the use case, I got different errors. For example, if all amounts are whole numbers (no decimals), or if I change the type of amount to Integer, then the code runs but the values of item and amount in the output are completely arbitrary.
Does anyone know about that bug/strange behavior?
I use this Spark version:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0.2.6.5.108-1</version>
</dependency>
CodePudding user response:
I actually found a way to make the code work while writing the question:
=> The list of column names in the struct
needs to be in alphabetical order.
(So in the example above, struct("amount", "item")
.)
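For clarity, this is the workaround applied to the snippet from the question (only the argument order of struct changes; everything else stays the same):

// Same aggregation as above, but with the struct columns listed alphabetically
// ("amount" before "item"); this is the variant that works for me.
Dataset<Row> grouped = spark.createDataset(orders, Encoders.bean(Order.class))
        .groupBy("user")
        .agg(collect_list(struct("amount", "item")).as("commands"));

List<Top> output = grouped.as(Encoders.bean(Top.class)).collectAsList();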
But I still don't know why. I can't find anything on the net. The doc of struct
clearly mentions that it outputs a schema with the column names, and they can be seen when printing the schema, so I would expect them to be used...
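One way to see the mismatch (a diagnostic sketch, assuming the problem really is a difference between the struct's field order and the order the bean encoder expects) is to print both schemas and compare them:

// Schema of the aggregated DataFrame: the nested struct's fields appear
// in the order given to struct(...), i.e. item, then amount.
grouped.printSchema();

// Schema the bean encoder expects for Top (including the nested Command
// elements); if its field order differs, the binary rows are presumably
// read with a misaligned layout, which would explain the garbage values.
Encoders.bean(Top.class).schema().printTreeString();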
CodePudding user response:
It seems to be a bug. I think what is causing the issue is described here, inside the JIRA comments.
It should be fixed in version 2.3.3.
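Until an upgrade is possible, a defensive workaround (just a sketch, reusing the classes and variables from the question, and assuming the cause is the field order expected by the bean encoder) is to derive the struct's column order from the encoder's own schema instead of hard-coding it:

// Take the field names in the order the bean encoder declares them for Command,
// so the aggregation layout and the deserialization layout cannot diverge.
String[] fields = Encoders.bean(Command.class).schema().fieldNames();

// struct(String, String...) overload: first column name, then the rest.
Column commandStruct = struct(fields[0], Arrays.copyOfRange(fields, 1, fields.length));

Dataset<Row> grouped = spark.createDataset(orders, Encoders.bean(Order.class))
        .groupBy("user")
        .agg(collect_list(commandStruct).as("commands"));

List<Top> output = grouped.as(Encoders.bean(Top.class)).collectAsList();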