With Java and Spark, I have a Dataset&lt;Row&gt;
to which I want to add a column filled with the values of the sequence 1 2 2 3 3 3...
I wrote a small Iterator&lt;Integer&gt;
implementation, but how can I tell Spark to create the column and call my Iterator.next()
method for each row? I was hoping I could pass a function to withColumn,
but there is no such overload.
public class ExampleIterator implements Iterator&lt;Integer&gt; {
    private int n = 0;

    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        // Inverse triangular-number formula: value k appears k times.
        return (1 + (int) Math.sqrt(1 + (8 * n++))) / 2;
    }
}
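For what it's worth, driving the iterator by hand confirms the formula produces the intended sequence (a minimal sketch, assuming the expression above with n post-incremented; the class name ExampleIteratorDemo is just for the demo):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ExampleIteratorDemo {
    // Same formula as ExampleIterator: inverse triangular numbers.
    static class SeqIterator implements Iterator<Integer> {
        private int n = 0;

        @Override
        public boolean hasNext() {
            return true; // the sequence is infinite
        }

        @Override
        public Integer next() {
            // Value k appears k times: 1, 2, 2, 3, 3, 3, 4, ...
            return (1 + (int) Math.sqrt(1 + (8 * n++))) / 2;
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> it = new SeqIterator();
        List<Integer> first = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            first.add(it.next());
        }
        System.out.println(first); // [1, 2, 2, 3, 3, 3, 4]
    }
}
```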
CodePudding user response:
Here's a solution in Scala using a udf; I can translate it to Java if you want, but the idea is the same. I used your formula to generate the new column:
val columns = Seq("c1", "c2")
val data = Seq.fill(16)((4, 7))
val df = spark.sparkContext.parallelize(data).toDF(columns: _*)

var n = -1
val createSeq = () => {
  n = n + 1
  (1 + Math.sqrt(1 + (8 * n)).toInt) / 2
}
val createSeqUDF = udf(createSeq)

df.withColumn("c3", createSeqUDF()).show(false)
Make sure to define the variable n outside of your udf function. Here are the results:
+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|4  |7  |1  |
|4  |7  |2  |
|4  |7  |2  |
|4  |7  |3  |
|4  |7  |3  |
|4  |7  |3  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |6  |
+---+---+---+
Here's the Java code:
long n = 0;

void test(SparkSession session) {
    session.sqlContext().udf().register("seqUdf",
            o -> (1 + (long) Math.sqrt(1 + (8 * n++))) / 2, DataTypes.LongType);

    List<String> data = Arrays.asList("abc", "klm", "xyz", "abc", "klm", "xyz",
            "abc", "klm", "xyz", "abc", "klm", "xyz");
    Dataset<String> dataDs = session.createDataset(data, Encoders.STRING());
    Dataset<Row> results = dataDs.withColumn("newColumn",
            functions.callUDF("seqUdf", dataDs.col("value")));
    results.show(false);
}
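One caveat: a UDF that mutates a counter only yields the ordered sequence if Spark evaluates the rows sequentially on a single partition. A stateless alternative (a sketch, not from the original answer; it uses Spark's public row_number/Window API and assumes the same formula, with the order imposed by an arbitrary constant sort key) derives n from the row number instead:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.*;

public class SeqColumn {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .master("local[*]").appName("seq-column").getOrCreate();

        List<String> data = Arrays.asList("abc", "klm", "xyz", "abc", "klm", "xyz");
        Dataset<String> dataDs = session.createDataset(data, Encoders.STRING());

        // row_number() needs a window; ordering by a constant gives an
        // arbitrary but stable order (Spark will pull everything into one
        // partition for this window, which is fine for small data).
        WindowSpec w = Window.orderBy(lit(1));

        // n = row_number - 1, then the same closed form as the iterator:
        // c = floor((1 + floor(sqrt(1 + 8n))) / 2)
        Dataset<Row> results = dataDs
                .withColumn("n", row_number().over(w).minus(1))
                .withColumn("newColumn",
                        floor(lit(1)
                                .plus(floor(sqrt(lit(1).plus(col("n").multiply(8)))))
                                .divide(2)))
                .drop("n");

        results.show(false); // newColumn: 1, 2, 2, 3, 3, 3
        session.stop();
    }
}
```

Because the value is a pure function of the row number, this stays correct regardless of how many times Spark re-evaluates the expression.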