Add column to Spark Dataset and pass an Iterator to fill rows

Time:12-01

Using Java and Spark, I have a Dataset<Row> to which I want to add a column filled with values that follow the sequence 1 2 2 3 3 3...

I wrote a small Iterator<Integer> implementation, but how can I tell Spark to create the column and call my Iterator.next() method for each row? I was hoping I could pass a function to withColumn, but there is no such hook.

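import java.util.Iterator;

// Infinite iterator over 1, 2, 2, 3, 3, 3, ... (the value k appears k times).
public class ExampleIterator implements Iterator<Integer> {

    private int n = 0;

    @Override
    public boolean hasNext() {
        return true; // the sequence never ends
    }

    @Override
    public Integer next() {
        // Value at 0-based position n: (1 + floor(sqrt(1 + 8n))) / 2 with integer division.
        return (1 + (int) Math.sqrt(1 + (8 * n++))) / 2;
    }
}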
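Because the iterator's only state is the position n, the value for any row can be computed from its 0-based index alone: value k fills positions k(k-1)/2 through k(k+1)/2 - 1, so the value at position n is the largest k with k(k-1)/2 <= n. A minimal sketch of that closed form, assuming 0-based positions; `SeqIndex`, `valueAt`, and `firstValues` are hypothetical names, not part of Spark:

```java
// Closed-form counterpart of ExampleIterator: the value depends only on the 0-based position.
final class SeqIndex {
    // Value k fills positions k(k-1)/2 .. k(k+1)/2 - 1, so the value at `index` is the largest k
    // with k(k-1)/2 <= index, i.e. floor((1 + sqrt(1 + 8 * index)) / 2). Truncating the square
    // root first and then using integer division gives the same result.
    static long valueAt(long index) {
        return (1 + (long) Math.sqrt(1 + 8 * index)) / 2;
    }

    // The first `count` values, the same as calling ExampleIterator.next() `count` times.
    static long[] firstValues(int count) {
        long[] values = new long[count];
        for (int i = 0; i < count; i++) {
            values[i] = valueAt(i);
        }
        return values;
    }
}
```

With a per-row index available, no shared mutable counter is needed to fill the column.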

Answer:

Here's a solution in Scala using a UDF. I can translate it to Java if you want, but the idea is the same. I used your function to generate the new column:

  import org.apache.spark.sql.functions.udf
  import spark.implicits._ // needed for toDF

  val columns = Seq("c1", "c2")
  val data = Seq((4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7), (4, 7))
  val df = spark.sparkContext.parallelize(data).toDF(columns: _*)
  var n = -1 // counter captured by the UDF closure
  val createSeq = () => {
    n = n + 1
    (1 + Math.sqrt(1 + (8 * n)).toInt) / 2
  }
  val createSeqUDF = udf(createSeq)
  df.withColumn("c3", createSeqUDF()).show(false)

Make sure to define the variable n outside of your UDF function. Here are the results:

+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|4  |7  |1  |
|4  |7  |2  |
|4  |7  |2  |
|4  |7  |3  |
|4  |7  |3  |
|4  |7  |3  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |4  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |5  |
|4  |7  |6  |
+---+---+---+
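A caveat on the counter defined outside the UDF: Spark ships the UDF's closure to the executors by serializing it, so each task works on its own deserialized copy of n. If the rows are spread over several partitions, each copy starts counting from the beginning and the sequence can restart per partition instead of continuing. The sketch below imitates that with plain Java serialization; `CounterUdf`, `ClosureCopies`, and the "one copy per partition" loop are hypothetical stand-ins, not Spark classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the UDF closure: a serializable object that owns the counter n.
final class CounterUdf implements Serializable {
    private long n = 0;

    long next() {
        return (1 + (long) Math.sqrt(1 + 8 * n++)) / 2;
    }
}

final class ClosureCopies {
    // Serialize and deserialize obj, producing an independent copy (as each task receives one).
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Process `partitions` groups of `rowsPerPartition` rows, each group using its own copy of the closure.
    static List<Long> simulate(int partitions, int rowsPerPartition) {
        CounterUdf driverCopy = new CounterUdf();
        List<Long> values = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            CounterUdf taskCopy = roundTrip(driverCopy);
            for (int i = 0; i < rowsPerPartition; i++) {
                values.add(taskCopy.next());
            }
        }
        return values;
    }
}
```

With one group of six rows the values are 1, 2, 2, 3, 3, 3; with two groups of three rows each copy restarts, giving 1, 2, 2, 1, 2, 2.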

Here's the Java code:

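    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.DataTypes;

    // The lambda below uses the field n, so it captures `this`; the class must be Serializable.
    public class SeqExample implements Serializable {
        long n = 0;

        void test(SparkSession session) {
            session.sqlContext().udf().register("seqUdf", o -> (1 + (long) Math.sqrt(1 + (8 * n++))) / 2, DataTypes.LongType);
            List<String> data = Arrays.asList("abc", "klm", "xyz", "abc", "klm", "xyz", "abc", "klm", "xyz", "abc", "klm", "xyz");
            Dataset<String> dataDs = session.createDataset(data, Encoders.STRING());
            Dataset<Row> results = dataDs.withColumn("newColumn",
                    functions.callUDF("seqUdf", dataDs.col("value")));
            results.show(false);
        }
    }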
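An option that does not depend on a shared counter is to give every row a consecutive global index, for example with `JavaRDD.zipWithIndex()`, and compute the value from the question's formula. `zipWithIndex` orders rows by partition index and then by position within the partition; when there is more than one partition it first runs a job to count the rows in each partition, then offsets each partition's local positions by the total of the partitions before it. The plain-Java sketch below reproduces that two-pass idea on in-memory lists; `GlobalIndexSketch` and the lists-as-partitions model are hypothetical, and it assumes the row order within each partition is stable:

```java
import java.util.ArrayList;
import java.util.List;

final class GlobalIndexSketch {
    // Value at 0-based position `index` of 1, 2, 2, 3, 3, 3, ... (same formula as the question).
    static long valueAt(long index) {
        return (1 + (long) Math.sqrt(1 + 8 * index)) / 2;
    }

    // Each inner list holds one partition's rows in order. Returns the sequence value for every row,
    // numbering rows consecutively across partitions.
    static List<List<Long>> assign(List<List<String>> partitions) {
        // Pass 1: start offset of each partition = number of rows in all earlier partitions.
        long[] start = new long[partitions.size()];
        long total = 0;
        for (int p = 0; p < partitions.size(); p++) {
            start[p] = total;
            total += partitions.get(p).size();
        }
        // Pass 2: every partition can now be processed independently, with no shared state.
        List<List<Long>> result = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<Long> values = new ArrayList<>();
            for (int i = 0; i < partitions.get(p).size(); i++) {
                values.add(valueAt(start[p] + i));
            }
            result.add(values);
        }
        return result;
    }
}
```

For two partitions of three rows each, the second partition starts at global index 3 and gets 3, 3, 3 rather than restarting at 1.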