Is it possible to explode a column without including the values from other columns?


In my scenario, I'm exploding an array column so that I have one record per array entry, performing a join on those rows, and then recombining the exploded rows back together:

+--------------+-------+------------------------+
|     body     |  ID   |      array_column      |
+--------------+-------+------------------------+
| (large data) | guid1 |    (entry1,entry2)     |
+--------------+-------+------------------------+
| (large data) | guid2 | (entry3,entry4,entry5) |
+--------------+-------+------------------------+

->

+--------------+-------+-----------------+
|     body     |  ID   |  array_column   |
+--------------+-------+-----------------+
| (large data) | guid1 |      entry1     |
+--------------+-------+-----------------+
|     null     | guid1 |      entry2     |
+--------------+-------+-----------------+
| (large data) | guid2 |      entry3     |
+--------------+-------+-----------------+
|     null     | guid2 |      entry4     |
+--------------+-------+-----------------+
|     null     | guid2 |      entry5     |
+--------------+-------+-----------------+

->

+--------------+-------+---------------------------------------------------+
|     body     |  ID   |                   array_column                    |
+--------------+-------+---------------------------------------------------+
| (large data) | guid1 |         (entry1_enriched,entry2_enriched)         |
+--------------+-------+---------------------------------------------------+
| (large data) | guid2 | (entry3_enriched,entry4_enriched,entry5_enriched) |
+--------------+-------+---------------------------------------------------+

Notice how after the explode, the body only exists in one of the exploded rows for each ID, and the remaining rows are filled with null values. This is what I would like to happen. Right now the large body is repeated in every exploded row, which is causing memory issues for us. I've considered splitting the table, dropping columns, and rejoining it with itself later, but because I'm operating on streaming data, this isn't really an option.
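
For reference, the step that currently causes the duplication is roughly the plain explode below, which copies every other column, including the large body, onto each output row (input_df and the column names are placeholders taken from the tables above):

from pyspark.sql import functions as F

# A plain explode repeats all other columns, including the large body,
# once per array entry.
exploded_df = input_df.withColumn("array_column", F.explode("array_column"))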

P.S. I'm not concerned with the re-joining (that part is working), only with whether there is an easy way to fill certain exploded rows' columns with dummy values to minimize space consumption.

CodePudding user response:

It is possible to explode without including the values of the other columns, but there is no built-in function to do so; you have to do it by hand.

To do so, you transform your array_column from an array of string into an array of struct<string, string>: the first field of each struct contains your body column for the first entry in the array and None for the other entries, and the second field contains an entry of your original array_column. This new array is built using a user-defined function. Then you drop the body column and perform your explode:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, StructType, StructField


# Simple holder for one array entry plus (optionally) the body value.
class Element:

    def __init__(self, body, entry):
        self.body = body
        self.entry = entry


# Build an array<struct<body, entry>> in which only the first element
# carries the body; every other element gets None instead.
@F.udf(returnType=ArrayType(StructType([StructField("body", StringType()), StructField("entry", StringType())])))
def array_with_body(body, array_column):
    result = []
    for i in range(0, len(array_column)):
        if i == 0:
            result.append(Element(body, array_column[i]))
        else:
            result.append(Element(None, array_column[i]))
    return result


before_join_df = input_df.withColumn(
    "array_column",
    array_with_body(F.col('body'), F.col('array_column'))
  ) \
  .drop('body') \
  .withColumn('array_column', F.explode('array_column')) \
  .select(
    F.col('id'),
    F.col('array_column.body').alias('body'),
    F.col('array_column.entry').alias('array_column')
  )

You then get your expected before_join_df dataframe:

+-----+------------+------------+
|id   |body        |array_column|
+-----+------------+------------+
|guid1|(large data)|entry1      |
|guid1|null        |entry2      |
|guid2|(large data)|entry3      |
|guid2|null        |entry4      |
|guid2|null        |entry5      |
+-----+------------+------------+

You then perform your join and transformations. To recover your final_df dataframe from the after_join_df dataframe, you can apply the following aggregation (max ignores nulls, so it keeps the single non-null body per id):

from pyspark.sql import functions as F

final_df = after_join_df.groupBy('id') \
  .agg(
    F.max(F.col('body')).alias('body'), 
    F.collect_list(F.col('array_column')).alias('array_column')
  )

And final_df is as follows:

+-----+------------+---------------------------------------------------+
|id   |body        |array_column                                       |
+-----+------------+---------------------------------------------------+
|guid1|(large data)|[entry1_enriched, entry2_enriched]                 |
|guid2|(large data)|[entry3_enriched, entry4_enriched, entry5_enriched]|
+-----+------------+---------------------------------------------------+
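
As a side note, on Spark 3.1+ the same array-of-struct trick can be written without a Python UDF by using the built-in higher-order function transform, which keeps the work inside the JVM. The sketch below assumes the same input_df and column names as above:

from pyspark.sql import functions as F

# Build array<struct<body, entry>> natively: body is kept only at index 0,
# all other elements get null, so the large value is not duplicated.
before_join_df = input_df.withColumn(
    "array_column",
    F.transform(
        "array_column",
        lambda entry, i: F.struct(
            F.when(i == 0, F.col("body")).alias("body"),  # null when i > 0
            entry.alias("entry")
        )
    )
  ) \
  .drop("body") \
  .withColumn("array_column", F.explode("array_column")) \
  .select(
    F.col("id"),
    F.col("array_column.body").alias("body"),
    F.col("array_column.entry").alias("array_column")
  )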

CodePudding user response:

RDD.flatMap can be used. A function (called flatten in the code below) takes a single row from the input data and returns an iterator over a list of result rows. In this list the first row contains the large data while for all other rows this column is set to None:

df = ...

# Emit one output row per array entry; only the first row keeps the body,
# all other rows get None in that column.
def flatten(row):
    body = row[0]
    id = row[1]
    entries = row[2]
    return iter([(body, id, entries[0])] + [(None, id, e) for e in entries[1:]])

flattened_rdd = df.rdd.flatMap(flatten)

spark.createDataFrame(flattened_rdd, df.columns).show()

Output:

+--------------+-----+------------+
|          body|   ID|array_column|
+--------------+-----+------------+
|(large data 1)|guid1|      entry1|
|          null|guid1|      entry2|
|(large data 2)|guid2|      entry3|
|          null|guid2|      entry4|
|          null|guid2|      entry5|
+--------------+-----+------------+