In my scenario, I'm exploding an array column so that I have one record per array entry, performing a join on those rows, and then recombining the exploded rows back together:
+--------------+-------+------------------------+
| body         | ID    | array_column           |
+--------------+-------+------------------------+
| (large data) | guid1 | (entry1,entry2)        |
+--------------+-------+------------------------+
| (large data) | guid2 | (entry3,entry4,entry5) |
+--------------+-------+------------------------+
->
+--------------+-------+-----------------+
| body         | ID    | array_column    |
+--------------+-------+-----------------+
| (large data) | guid1 | entry1          |
+--------------+-------+-----------------+
| null         | guid1 | entry2          |
+--------------+-------+-----------------+
| (large data) | guid2 | entry3          |
+--------------+-------+-----------------+
| null         | guid2 | entry4          |
+--------------+-------+-----------------+
| null         | guid2 | entry5          |
+--------------+-------+-----------------+
->
+--------------+-------+---------------------------------------------------+
| body         | ID    | array_column                                      |
+--------------+-------+---------------------------------------------------+
| (large data) | guid1 | (entry1_enriched,entry2_enriched)                 |
+--------------+-------+---------------------------------------------------+
| (large data) | guid2 | (entry3_enriched,entry4_enriched,entry5_enriched) |
+--------------+-------+---------------------------------------------------+
Notice how, after the explode, the body only exists in one of the exploded rows per ID and is null everywhere else. That is what I would like to happen. Right now the large body is copied into every row, which is causing memory issues for us. I've considered splitting the table, dropping columns, and rejoining it with itself later, but because I'm operating on streaming data, this isn't really an option.
P.S. I'm not concerned with the re-joining (that part is working), only with whether there is an easy way to fill certain exploded rows' columns with dummy values to minimize space consumption.
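For context, the explode step itself is just the standard one, roughly like this (simplified, with df standing in for my streaming dataframe):
from pyspark.sql import functions as F

# A plain explode repeats every other column in each output row,
# so the large body is copied once per array entry
exploded_df = df.withColumn("array_column", F.explode("array_column"))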
CodePudding user response:
It is possible to explode without duplicating the values of the other columns, but there is no built-in function to do so; you have to do it by hand.
To do so, you transform your array_column from an array of string into an array of struct<string, string>: the first field of each struct contains your body column for the first entry in the array and None for the other entries, and the second field contains an entry of your array_column array. This new array is built with a user-defined function. Then you drop the body column and perform your explode:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, StructType, StructField


class Element:
    def __init__(self, body, entry):
        self.body = body
        self.entry = entry


@F.udf(returnType=ArrayType(StructType([
    StructField("body", StringType()),
    StructField("entry", StringType())
])))
def array_with_body(body, array_column):
    # Pair the body with the first entry only; every other entry gets None
    result = []
    for i in range(0, len(array_column)):
        if i == 0:
            result.append(Element(body, array_column[i]))
        else:
            result.append(Element(None, array_column[i]))
    return result


# Replace the string array with an array of (body, entry) structs,
# drop the big body column, then explode and flatten the structs
before_join_df = input_df.withColumn(
    "array_column",
    array_with_body(F.col('body'), F.col('array_column'))
) \
    .drop('body') \
    .withColumn('array_column', F.explode('array_column')) \
    .select(
        F.col('id'),
        F.col('array_column.body').alias('body'),
        F.col('array_column.entry').alias('array_column')
    )
You then get your expected before_join_df dataframe:
+-----+------------+------------+
|id   |body        |array_column|
+-----+------------+------------+
|guid1|(large data)|entry1      |
|guid1|null        |entry2      |
|guid2|(large data)|entry3      |
|guid2|null        |entry4      |
|guid2|null        |entry5      |
+-----+------------+------------+
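For reference, a small input_df matching the question's example can be built like this to try the snippet above (the values are just the placeholders from the question):
# Hypothetical sample data; spark is an existing SparkSession
input_df = spark.createDataFrame(
    [("(large data)", "guid1", ["entry1", "entry2"]),
     ("(large data)", "guid2", ["entry3", "entry4", "entry5"])],
    ["body", "id", "array_column"]
)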
You then perform your join and enrichment, and to get back the final_df dataframe from the after_join_df dataframe you can apply the following transformation:
from pyspark.sql import functions as F

final_df = after_join_df.groupBy('id') \
    .agg(
        # max ignores nulls, so it recovers the single non-null body per id
        F.max(F.col('body')).alias('body'),
        # collect_list gathers the enriched entries back into an array
        F.collect_list(F.col('array_column')).alias('array_column')
    )
And final_df is as follows:
+-----+------------+---------------------------------------------------+
|id   |body        |array_column                                       |
+-----+------------+---------------------------------------------------+
|guid1|(large data)|[entry1_enriched, entry2_enriched]                 |
|guid2|(large data)|[entry3_enriched, entry4_enriched, entry5_enriched]|
+-----+------------+---------------------------------------------------+
CodePudding user response:
RDD.flatMap can be used. A function (called flatten in the code below) takes a single row from the input data and returns an iterator over a list of result rows. In this list the first row contains the large data, while for all other rows that column is set to None:
df = ...

def flatten(row):
    body = row[0]
    id = row[1]
    entries = row[2]
    # The first entry keeps the body; all remaining entries get None instead
    return iter([(body, id, entries[0])] + [(None, id, e) for e in entries[1:]])

flattened_rdd = df.rdd.flatMap(flatten)
spark.createDataFrame(flattened_rdd, df.columns).show()
Output:
+--------------+-----+------------+
|          body|   ID|array_column|
+--------------+-----+------------+
|(large data 1)|guid1|      entry1|
|          null|guid1|      entry2|
|(large data 2)|guid2|      entry3|
|          null|guid2|      entry4|
|          null|guid2|      entry5|
+--------------+-----+------------+
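After the join and enrichment, the rows can be recombined in the same way as in the first answer; for example (the after_join_df name and the ID column are assumed from the output above):
from pyspark.sql import functions as F

# max ignores nulls, so it recovers the single non-null body per ID;
# collect_list gathers the enriched entries back into an array
final_df = after_join_df.groupBy('ID') \
    .agg(
        F.max(F.col('body')).alias('body'),
        F.collect_list(F.col('array_column')).alias('array_column')
    )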