My goal is to create a DataFrame dynamically, using columns and values from an external DataFrame. This is how the DataFrame is created with manual schema and data definitions:
val columnSufix: String = "isNull"
val data = Seq(Row(
details.filter(col("DAY").isNull).count(),
details.filter(col("CHANNEL_CATEGORY").isNull).count(),
details.filter(col("SOURCE").isNull).count(),
details.filter(col("PLATFORM").isNull).count()
)
)
val schema: StructType = new StructType()
.add(s"DAY_$columnSufix", LongType)
.add(s"CHANNEL_CATEGORY_$columnSufix", LongType)
.add(s"SOURCE_$columnSufix", LongType)
.add(s"PLATFORM_$columnSufix", LongType)
val testDf: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
testDf.show(false)
The output when I execute the above script is the following:
columnSufix: String = isNull
data: Seq[org.apache.spark.sql.Row] = List([0,0,0,83845])
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DAY_isNull,LongType,true),StructField(CHANNEL_CATEGORY_isNull,LongType,true),StructField(SOURCE_isNull,LongType,true),StructField(PLATFORM_isNull,LongType,true))
testDf: org.apache.spark.sql.DataFrame = [DAY_isNull: bigint, CHANNEL_CATEGORY_isNull: bigint ... 2 more fields]
So far so good. The problem is that when I use the script below, which makes the creation of the DataFrame more dynamic, I get an ArrayIndexOutOfBoundsException:
val cols = details.columns.toSeq.take(4)
val columnSuffix: String = "ISNULL"
val data = cols.map(column => Row(details.filter(col(column).isNull).count())).toList
val schema = StructType(cols.map(column => StructField(column + s"_$columnSuffix", LongType)))
val testDf: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
testDf.show(false)
The output when I compile the above script is similar to the manual DataFrame creation, except that when I run testDf.show(false), it gives me this error:
Caused by: RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, DAY_ISNULL), LongType, false) AS DAY_ISNULL#243076L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, CHANNEL_CATEGORY_ISNULL), LongType, false) AS CHANNEL_CATEGORY_ISNULL#243077L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, SOURCE_ISNULL), LongType, false) AS SOURCE_ISNULL#243078L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, PLATFORM_ISNULL), LongType, false) AS PLATFORM_ISNULL#243079L
Caused by: ArrayIndexOutOfBoundsException:
Why is this happening? My feeling is that somehow Spark is not sending the values of details.filter(col(column).isNull).count() straight to the invocation of createDataFrame.
CodePudding user response:
My solution is in PySpark; you can do the same in Scala Spark as well. Note that reduce here is Python-specific (from functools); in Scala, fold/foldLeft should do the same (a rough Scala sketch follows after the output below). The code goes here:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField,LongType,Row
from functools import reduce
data=[(32609,878,199),
(32609,878,199),
(32609,832,199),
(45470,231,199),
(45470,231,199),
(45470,231,199),
(42482,1001,299),
(42482,16,291)]
df1=spark.createDataFrame(data,["DAY","CHANNEL_CATEGORY","SOURCE"])
details=df1.withColumn("PLATFORM",F.lit(None))
dyn_columns_list_org=["DAY","CHANNEL_CATEGORY","SOURCE","PLATFORM"]
dynamic_data = Row(*list(map(lambda x:details.filter(F.col(x).isNull()).count(),dyn_columns_list_org)))
print(dynamic_data)
schema = StructType()
dynamic_schema = reduce(lambda x,y:x.add(y+"_isNull",data_type=LongType()),dyn_columns_list_org,schema)
print(dynamic_schema)
dyn_df = spark.createDataFrame([dynamic_data],schema=dynamic_schema)
dyn_df.show(10,0)
output:
+----------+-----------------------+-------------+---------------+
|DAY_isNull|CHANNEL_CATEGORY_isNull|SOURCE_isNull|PLATFORM_isNull|
+----------+-----------------------+-------------+---------------+
|0         |0                      |0            |8              |
+----------+-----------------------+-------------+---------------+
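For reference, a rough Scala equivalent of the same idea, as a minimal sketch: it assumes the details DataFrame from the question is in scope, the names dynColumns, counts, dynSchema and dynDf are just placeholders, and foldLeft stands in for Python's reduce.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StructType}

val dynColumns = Seq("DAY", "CHANNEL_CATEGORY", "SOURCE", "PLATFORM")

// One Row holding all of the counts, mirroring Row(*map(...)) above
val counts = Row.fromSeq(dynColumns.map(c => details.filter(col(c).isNull).count()))

// foldLeft plays the role of functools.reduce when building the schema
val dynSchema = dynColumns.foldLeft(new StructType()) { (acc, c) =>
  acc.add(s"${c}_isNull", LongType)
}

val dynDf: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(counts)), dynSchema)
dynDf.show(false)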
CodePudding user response:
The problem is, for each column, you're instantiating a row object:
val data = cols.map(column => Row(details.filter(col(column).isNull).count())).toList
So at the end, you'll have:
Seq(
Row(0),
Row(0),
Row(0),
Row(83845)
)
// so calling .toDF will expect 1 column name
But you're supplying 4 column names; what you intended to do is this:
Seq(Row(0, 0, 0, 83845)) // notice the single row, with 4 columns instead
What you need to do is:
val data: Seq[Long] = cols.map(column => details.filter(col(column).isNull).count())
// Now you have your values here (Seq(0, 0, 0, 83845))
The tricky part here is converting the Seq to a Product (a tuple). How to do that depends on how you want to manage it: you can use shapeless or reflection, or, if you're sure about the number of columns, I recommend this:
val tupled = data.toList match {
case a :: b :: c :: d :: Nil => (a, b, c, d)
}
At the end:
Seq(tupled).toDF(cols: _*) // needs import spark.implicits._; use cols.map(c => s"${c}_$columnSuffix") if you want the suffixed names
You can also address your issue using the Spark APIs; I suggested the above because you seemed to prefer a Scala-ish approach. If you need the Spark-API route, let me know and I'll update the answer, but a quick sketch follows below.
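For instance (a minimal sketch, assuming the cols, columnSuffix and details values from your question are in scope), you can compute every null count in a single aggregation instead of running one count() job per column:
import org.apache.spark.sql.functions.{col, count, when}

// count(when(col(c).isNull, 1)) counts only the rows where the column is null
val nullCounts = details.select(
  cols.map(c => count(when(col(c).isNull, 1)).alias(s"${c}_$columnSuffix")): _*
)
nullCounts.show(false)
The result is a single-row DataFrame with one suffixed column per input column, the same shape you built by hand in your first snippet.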