I am trying to load a huge dataframe in parts when the number of rows is greater than a threshold. Our threshold is 3 million rows: if the dataframe has, say, 4 million rows, we want to load 3 million rows first and then load the remaining 1 million in the next loop. This is the approach I am trying; it is pseudocode, and these constructs do not work in Scala, so I am looking for a Scala substitute or maybe a better way of doing this:
if (deltaExtractCount > 3000000)
{
    length = len(df)
    count = 0
    while (count < length)
    {
        new_df = df[count : count + 3000000]
        insert(new_df)
        count = count + 3000000
    }
}
This is what I was trying, but without success. I haven't found an equivalent Scala function; this pseudocode is more suited to Python. I am using Spark 3.1.2 and Scala 2.12. Let me know how I can achieve this split, or whether there is another way.
CodePudding user response:
I wrote this piece of code; I hope you can understand it. I also wrote some comments to make it easier. Assume csv is your dataset:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;

// Assign an increasing ID to the column, so we know which rows to get
csv = csv.withColumn("ID", expr("row_number() over (order by name)"));
// Total dataset count
long count = csv.count();
// Threshold, modify this to the number you want
long loadThreshold = 300000;
// This will tell you how many 'loops' you will get
double loopingTimes = Math.ceil(count * 1.0 / loadThreshold);
// emptyDataset is an empty dataset, I did this just to fetch the schema
Dataset<Row> emptyDataset = csv.limit(0);
for (int i = 0; i < loopingTimes; i++) {
    // This is the new fetched dataset
    Dataset<Row> withThreshold = csv
        .where(col("ID").gt(i * loadThreshold).and(col("ID").leq((i + 1) * loadThreshold)));
    // Once 300000 rows are fetched, union to the main file
    emptyDataset = emptyDataset.union(withThreshold);
}
I did a test case with loadThreshold equal to 25 and a dataset count of 60 rows, which gave loopingTimes = 3. The partitions were then fetched this way:
1st loop: 25 rows
2nd loop: 25 rows
3rd loop: 10 rows
This is written in Java, but it is almost identical in Scala as well, good luck!
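For reference, a rough Scala sketch of the same idea could look like the following. Treat it as an outline rather than a drop-in answer: csv, the name column used for ordering, and the insert function are placeholders for your own dataset, ordering column, and load routine.
import org.apache.spark.sql.functions.{col, expr}

// Assign an increasing ID so we know which rows to fetch in each loop
val withId = csv.withColumn("ID", expr("row_number() over (order by name)"))

val totalCount = withId.count()
val loadThreshold = 3000000L
val loopingTimes = math.ceil(totalCount.toDouble / loadThreshold).toInt

for (i <- 0 until loopingTimes) {
  // Rows whose ID falls in (i * loadThreshold, (i + 1) * loadThreshold]
  val chunk = withId.where(
    col("ID").gt(i * loadThreshold).and(col("ID").leq((i + 1) * loadThreshold)))
  // Load each chunk directly instead of unioning it back together
  insert(chunk)
}
As in the Java version, row_number() with a global order by pulls all rows through a single partition for that step, so it can be slow on very large inputs, but it keeps the chunk boundaries exact.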
CodePudding user response:
Here is how I did it; it's similar, and I took hints from Spark Scala Split dataframe into equal number of rows:
if (deltaExtractCount > 25) {
  val k = 25
  val totalCount = deltaExtractCount
  var lowLimit = 0
  var highLimit = lowLimit + k
  while (lowLimit < totalCount) {
    val split_df = masterLoadDf.where(s"row_num <= ${highLimit} and row_num > ${lowLimit}")
    lowLimit = lowLimit + k
    highLimit = highLimit + k
    InsertIntoDB(split_df)
  }
} else {
  InsertIntoDB(loadDeltaBatchDF)
}
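For completeness, this assumes masterLoadDf already has a row_num column. A minimal sketch of adding one with a window function is below; ordering by an id column and deriving masterLoadDf from loadDeltaBatchDF are both assumptions here, so substitute your own column and source dataframe.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// row_number() is 1-based, which matches the "row_num > lowLimit" filter above
val windowSpec = Window.orderBy("id") // "id" is a placeholder ordering column
val masterLoadDf = loadDeltaBatchDF.withColumn("row_num", row_number().over(windowSpec))
Note that an un-partitioned Window.orderBy moves all rows into a single partition to compute row_number, so on very large batches rdd.zipWithIndex (which assigns consecutive indexes from partition order rather than from a sort) is a common alternative.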