Create summary of Spark Dataframe


I have a Spark Dataframe which I am trying to summarise in order to find overly long columns:

// Set up test data
// Look for long columns (>= 3), i.e. row 1 is ok, row 2 is bad on column 3, row 3 is bad on column 2
val df = Seq(
    ( 1, "a", "bb", "cc", "file1" ),
    ( 2, "d", "ee", "fff", "file2" ),
    ( 3, "g", "hhhh", "ii", "file3" )
    ).
    toDF("rowId", "col1", "col2", "col3", "filename")

I can summarise the lengths of the columns and find overly long ones like this:

// Look for long columns (>= 3), i.e. row 1 is ok, row 2 is bad on column 3, row 3 is bad on column 2
val df2 = df.columns
    .map(c => (c, df.agg(max(length(df(s"$c")))).as[String].first()))
    .toSeq.toDF("columnName", "maxLength")
    .filter($"maxLength" > 2)

If I try to add the existing filename column to the map, I get an error:

val df2 = df.columns
    .map(c => ($"filename", c, df.agg(max(length(df(s"$c")))).as[String].first()))
    .toSeq.toDF("fn", "columnName", "maxLength")
    .filter($"maxLength" > 2)

I have tried a few variations of the $"filename" syntax. How can I incorporate the filename column into the summary, so the result looks like this?

columnName  maxLength  filename
col2        4          file3
col3        3          file2

The real dataframes have 300 columns and millions of rows, so I cannot hard-code the column names.

CodePudding user response:

@wBob does the following achieve your goal?

  1. Group by filename and get the maximum length per column:
    val cols = df.columns.dropRight(1) // drop the trailing filename column
    val maxLength = cols.map(c => s"max(length(${c})) as ${c}").mkString(",")
    print(maxLength)
    df.createOrReplaceTempView("temp")
    val df1 = spark
      .sql(s"select filename, ${maxLength} from temp group by filename")
    df1.show()

With the output:

+--------+-----+----+----+----+
|filename|rowId|col1|col2|col3|
+--------+-----+----+----+----+
|   file1|    1|   1|   2|   2|
|   file2|    1|   1|   2|   3|
|   file3|    1|   1|   4|   2|
+--------+-----+----+----+----+
  2. Use subqueries to get the maximum per column and concatenate the results using union:
    df1.createOrReplaceTempView("temp2")
    val res = cols.map(col => {
      spark.sql(s"select '${col}' as columnName, $col as maxLength, filename from temp2 " +
        s"where $col = (select max(${col}) from temp2)")
    }).reduce(_ union _)
    res.show()

With the result:

+----------+---------+--------+
|columnName|maxLength|filename|
+----------+---------+--------+
|     rowId|        1|   file1|
|     rowId|        1|   file2|
|     rowId|        1|   file3|
|      col1|        1|   file1|
|      col1|        1|   file2|
|      col1|        1|   file3|
|      col2|        4|   file3|
|      col3|        3|   file2|
+----------+---------+--------+

Note that there are multiple entries for rowId and col1 since the maximum is not unique.
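
If one representative row per column is enough, a small follow-up (my addition, not part of the answer above) is to deduplicate on columnName; which filename survives is then arbitrary:

// Sketch: keep one arbitrary (columnName, maxLength, filename) row per column.
val oneRowPerColumn = res.dropDuplicates("columnName")
oneRowPerColumn.show()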

There is probably a more elegant way to write it, but I am struggling to find one at the moment.
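
One possible tighter variant (a sketch, not tested against the real 300-column data; it assumes Spark's stack() generator and window functions, and reuses the column names from the question): unpivot the data columns into (columnName, len) rows in a single pass, then keep the rows that reach each column's maximum.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Unpivot every data column into (columnName, len) rows in one pass.
val dataCols = df.columns.filter(_ != "filename")
val stackExpr = s"stack(${dataCols.length}, " +
  dataCols.map(c => s"'$c', length($c)").mkString(", ") +
  ") as (columnName, len)"

val byColumn = Window.partitionBy("columnName")
df.select(col("filename"), expr(stackExpr))
  .withColumn("maxLength", max(col("len")).over(byColumn))
  .where(col("len") === col("maxLength") && col("maxLength") > 2)
  .select("columnName", "maxLength", "filename")
  .show() // ties still yield multiple filenames per column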

CodePudding user response:

It might be enough to sort your table by total text length. This can be achieved quickly and concisely.

df.select( 
  col("*"), 
  length( // take the length
    concat(   //slap all the columns together
      (for( col_name <- df.columns ) yield col(col_name)).toSeq:_*  
    )
  )
  .as("length") 
)
.sort( //order by total length
  col("length").desc
).show()
+-----+----+----+----+--------+------+
|rowId|col1|col2|col3|filename|length|
+-----+----+----+----+--------+------+
|    3|   g|hhhh|  ii|   file3|    13|
|    2|   d|  ee| fff|   file2|    12|
|    1|   a|  bb|  cc|   file1|    11|
+-----+----+----+----+--------+------+
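
One caveat worth noting (an added remark, not from the answer above): concat returns null as soon as any input column is null, which would make length null as well. concat_ws skips nulls, so a null-tolerant sketch of the same idea is:

import org.apache.spark.sql.functions._

// Sketch: concat_ws ignores null columns, so length() stays defined
// even when some cells are null.
df.select(
  col("*"),
  length(concat_ws("", df.columns.map(col): _*)).as("length")
).sort(col("length").desc).show()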

CodePudding user response:

Sorting an array[struct] sorts on the first field first and the second field next. This works because we put the length of the string up front; if you re-order the struct fields you'll get different results. You can easily accept more than one result if you so desire, but I think discovering that a row is problematic is likely enough.

df.select(
  col("*"),
  reverse( // flip to descending order so element 0 is the max
    sort_array( // sorts ascending by default
      array( // collect every column's (length, name, value) into an array
        (for( col_name <- df.columns ) yield struct(length(col(col_name)),lit(col_name),col(col_name).cast("String")) ).toSeq:_* )
    )
  )(0) // grab the row max
  .alias("rowMax") )
  .sort("rowMax").show
+-----+----+----+----+--------+--------------------+
|rowId|col1|col2|col3|filename|              rowMax|
+-----+----+----+----+--------+--------------------+
|    1|   a|  bb|  cc|   file1|[5, filename, file1]|
|    2|   d|  ee| fff|   file2|[5, filename, file2]|
|    3|   g|hhhh|  ii|   file3|[5, filename, file3]|
+-----+----+----+----+--------+--------------------+
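
As an aside (an added variant, assuming Spark 2.4+ where array_max accepts any orderable element type, which I believe includes structs): array_max can stand in for the reverse(sort_array(...))(0) pattern:

import org.apache.spark.sql.functions._

// Sketch: structs compare field by field, so with the length up front
// array_max should return the same "row max" struct directly.
df.select(
  col("*"),
  array_max(
    array(df.columns.map(c =>
      struct(length(col(c)), lit(c), col(c).cast("String"))): _*)
  ).alias("rowMax")
).sort("rowMax").show()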

CodePudding user response:

Pushed a little further for a better result.

df.select(  
 col("*"), 
 array( // make array of columns name/value/length
  (for{ col_name <- df.columns  } yield 
   struct(
    length(col(col_name)).as("length"),
    lit(col_name).as("col"),
    col(col_name).cast("String").as("col_value")
   )  
  ).toSeq:_* ).alias("rowInfo") 
 )
 .select(
  col("rowId"),
  explode( // explode array into rows
   expr("filter(rowInfo, x -> x.length >= 3)") //filter the array for the length your interested in
  ).as("rowInfo") 
 )
 .select(
  col("rowId"),
  col("rowInfo.*") // turn struct fields into columns
 )
 .sort("length").show

+-----+------+--------+---------+
|rowId|length|     col|col_value|
+-----+------+--------+---------+
|    2|     3|    col3|      fff|
|    3|     4|    col2|     hhhh|
|    3|     5|filename|    file3|
|    1|     5|filename|    file1|
|    2|     5|filename|    file2|
+-----+------+--------+---------+
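
For completeness, the same pipeline can be written without the expr() string (a sketch assuming Spark 3.0+, where org.apache.spark.sql.functions.filter accepts a Scala lambda):

import org.apache.spark.sql.functions._

// Sketch: build the (length, col, col_value) array as before, then filter
// it with the Column-based higher-order filter instead of a SQL string.
val rowInfo = array(df.columns.map(c =>
  struct(
    length(col(c)).as("length"),
    lit(c).as("col"),
    col(c).cast("String").as("col_value"))): _*)

df.select(
    col("rowId"),
    explode(filter(rowInfo, x => x("length") >= 3)).as("rowInfo"))
  .select(col("rowId"), col("rowInfo.*"))
  .sort("length")
  .show()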