How to split a spark scala dataframe if number of rows are greater than threshold


I am trying to load a huge dataframe in parts if the number of rows is greater than a threshold. Our threshold is 3 million rows: if the dataframe has, say, 4 million rows, we want to load 3 million rows first and then load the remaining 1 million in the next loop. I tried the following approach (this is pseudo code), but it does not work in Scala. I am looking for a Scala equivalent, or maybe a better way of doing this:

if(deltaExtractCount > 3000000)
{
  length = len(df)
  count = 0
  while (count < length)
  { 
    new_df = df[count : count + 3000000]
    insert(new_df)
    count = count + 3M
  }
}

This is what I was trying, but it was not successful. I haven't found an equivalent Scala function; this pseudo code is more suited to Python. I am using Spark 3.1.2 and Scala 2.12. Let me know how I can achieve this split, or whether there is another way.

CodePudding user response:

I wrote this piece of code; I hope you can follow it. I also added some comments to make it easier. Assume csv is your dataset:

    // Assign an increasing ID to the column, so we know which rows to get
    csv = csv.withColumn("ID", expr("row_number() over (order by name)"));

    // Total dataset count
    long count = csv.count();
    // Threshold, modify this to the number you want
    long loadThreshold = 300000;
    // This will tell you how many 'loops' you will get
    double loopingTimes = Math.ceil(count * 1.0 / loadThreshold);

    // emptyDataset is an empty dataset, I did this just to fetch the schema
    Dataset<Row> emptyDataset = csv.limit(0);
    for (int i = 0; i < loopingTimes; i++) {
        // This is the new fetched dataset
        Dataset<Row> withThreshold = csv
                .where(col("ID").gt(i * loadThreshold).and(col("ID").leq((i + 1) * loadThreshold)));

        // Once 300000 rows are fetched, union to the main file
        emptyDataset = emptyDataset.union(withThreshold);
    }

I did a test case with loadThreshold equal to 25 and a dataset count of 60 rows, and got loopingTimes = 3. The partitions were then fetched this way:

1st loop: 25 rows

2nd loop: 25 rows

3rd loop: 10 rows

This is written in Java, but it is almost identical in Scala as well, good luck!
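
For reference, a rough Scala translation of the same idea might look like this (a sketch, not tested; it assumes the dataset csv has a name column to order by, and insert stands in for whatever load step you use):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Assign an increasing ID so each pass knows which rows to fetch
    val withId: DataFrame = csv.withColumn("ID", row_number().over(Window.orderBy("name")))

    val total = withId.count()
    val loadThreshold = 3000000L
    val loopingTimes = math.ceil(total.toDouble / loadThreshold).toInt

    for (i <- 0 until loopingTimes) {
      // Rows in the range (i * loadThreshold, (i + 1) * loadThreshold]
      val slice = withId.where(col("ID") > i * loadThreshold && col("ID") <= (i + 1) * loadThreshold)
      insert(slice) // placeholder for your own insert/load logic
    }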

CodePudding user response:

Here is how I did it. It is similar; I took hints from Spark Scala Split dataframe into equal number of rows:

if (deltaExtractCount > 25) {
  // Chunk size per load
  val k = 25
  val totalCount = deltaExtractCount
  var lowLimit = 0
  var highLimit = lowLimit + k

  while (lowLimit < totalCount) {
    // Fetch only the rows whose row_num falls inside the current window
    val split_df = masterLoadDf.where(s"row_num <= ${highLimit} and row_num > ${lowLimit}")
    lowLimit = lowLimit + k
    highLimit = highLimit + k

    InsertIntoDB(split_df)
  }
} else {
  InsertIntoDB(loadDeltaBatchDF)
}
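
The code above assumes masterLoadDf already carries a row_num column. In case it doesn't, here is one hedged sketch of how it could be added (the id column here is a placeholder for whatever defines a stable ordering in your data):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    // "id" is a placeholder ordering column; replace it with one that exists in your data
    val masterLoadDf = loadDeltaBatchDF.withColumn("row_num", row_number().over(Window.orderBy("id")))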