I have a schema like this:
root
|-- DataColumn1: struct (nullable = true)
| |-- colA: double (nullable = true)
| |-- colB: struct (nullable = true)
| | |-- fieldA: double (nullable = true)
| | |-- fieldB: double (nullable = true)
| | |-- fieldC: double (nullable = true)
| |-- colC: long (nullable = true)
| |-- colD: string (nullable = true)
|-- DataColumn2: string (nullable = true)
|-- DataColumn3: string (nullable = true)
My goal is to create a new column, say 'DataColumn4', which is the sum of all the fields 'fieldA', 'fieldB' and 'fieldC' (fieldA + fieldB + fieldC) inside the struct 'colB', which is itself inside 'DataColumn1'.
There could be N fields inside 'colB', so how do I sum them all without accessing the fields one by one through DataColumn1.colB.fieldA, DataColumn1.colB.fieldB and so on?
Example data:
DataColumn1           DataColumn2  DataColumn3
(1, (1, 2, 3), 4, 5)  XXX          YYY
(1, (2, 3, 3), 8, 9)  XYZ          XYX
My expected result must have a new column that is the sum of the nested fields:
DataColumn1           DataColumn2  DataColumn3  DataColumn4
(1, (1, 2, 3), 4, 5)  XXX          YYY          6  (since 1 + 2 + 3 = 6)
(1, (2, 3, 3), 8, 9)  XYZ          XYX          8  (since 2 + 3 + 3 = 8)
How do I write code for this in PySpark, preferably without a Pandas UDF?
Answer:
You can access a field inside a struct with struct_column.struct_field, e.g. DataColumn1.colB.fieldA. You can also select all fields of a struct with DataColumn1.colB.*.
Here's an example of how you could do the summation, given the following data:
+--------------------+
|        c1_c2c3c4_c5|
+--------------------+
|{1, {1, 2, 3}, 4, 5}|
|{1, {2, 3, 3}, 8, 9}|
+--------------------+
root
|-- c1_c2c3c4_c5: struct (nullable = false)
| |-- c1: long (nullable = true)
| |-- c2c3c4: struct (nullable = false)
| | |-- c2: long (nullable = true)
| | |-- c3: long (nullable = true)
| | |-- c4: long (nullable = true)
| |-- c5: long (nullable = true)
| |-- c6: long (nullable = true)
To sum those fields, we first need their names, which can be extracted with a select:
data_sdf.select('c1_c2c3c4_c5.c2c3c4.*').columns
# ['c2', 'c3', 'c4']
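reduce combines the list pairwise from the left, so three fields turn into one nested addition. The plain-Python sketch below (no Spark needed) folds the field names the same way, rendering each step as a string instead of a Column; fold_sum_expr is a hypothetical helper used only for illustration.

```python
from functools import reduce


def fold_sum_expr(names):
    """Hypothetical helper: mimic how reduce(lambda x, y: x + y, cols)
    nests the additions, building a string instead of a Column."""
    return reduce(lambda x, y: f"({x} + {y})", names)


fields = ['c2', 'c3', 'c4']  # the names returned by the select above
print(fold_sum_expr(fields))  # ((c2 + c3) + c4)

# The same left fold on one row's plain numbers gives that row's sum
print(reduce(lambda x, y: x + y, [1, 2, 3]))  # 6
```

Note that reduce on an empty list raises a TypeError, so a struct with no fields would need special handling.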
The actual summation code:
from functools import reduce

import pyspark.sql.functions as func

# use reduce to add all struct fields
struct_field_sum = reduce(lambda x, y: x + y,
                          [func.col('c1_c2c3c4_c5.c2c3c4.' + k)
                           for k in data_sdf.select('c1_c2c3c4_c5.c2c3c4.*').columns]
                          )
# Column<'((c1_c2c3c4_c5.c2c3c4.c2 + c1_c2c3c4_c5.c2c3c4.c3) + c1_c2c3c4_c5.c2c3c4.c4)'>

data_sdf. \
    withColumn('reqd_sum', struct_field_sum). \
    show()
# +--------------------+--------+
# |        c1_c2c3c4_c5|reqd_sum|
# +--------------------+--------+
# |{1, {1, 2, 3}, 4, 5}|       6|
# |{1, {2, 3, 3}, 8, 9}|       8|
# +--------------------+--------+
Answer:
To achieve this, read the names of the child fields from the StructType in the schema, then either reduce the corresponding columns with +, or project them as a list and pass it to Python's native sum.
Example:
from functools import reduce

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

structureData = [
    ("James", "Smith", "36636", "M", (3100, 200)),
    ("Michael", "Rose", "40288", "M", (4300, 200)),
    ("Robert", "Williams", "42114", "M", (1400, 300)),
    ("Maria", "Jones", "39192", "F", (5500, 300)),
    ("Jen", "Brown", "39156", "F", (5000, 600))
]

structureSchema = StructType([
    StructField('firstname', StringType(), True),
    StructField('lastname', StringType(), True),
    StructField('id', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', StructType([
        StructField('component1', IntegerType(), True),
        StructField('component2', IntegerType(), True)
    ]))
])

sparkDF = spark.createDataFrame(data=structureData, schema=structureSchema)
sparkDF.printSchema()
sparkDF.show(truncate=False)

# names of the child fields of the 'salary' struct, read from the schema
salary_fields = sparkDF.schema['salary'].dataType.names

sparkDF = sparkDF.withColumn(
    'total_salary',
    # fold the salary columns pairwise with +
    reduce(lambda a, b: a + b, [F.col(f'salary.{c}') for c in salary_fields])
).withColumn(
    'total_salary_2',
    # Python's built-in sum (shadowed if pyspark.sql.functions is star-imported)
    sum(sparkDF[f'salary.{c}'] for c in salary_fields)
)

sparkDF.show()
+---------+--------+-----+------+-----------+------------+--------------+
|firstname|lastname|   id|gender|     salary|total_salary|total_salary_2|
+---------+--------+-----+------+-----------+------------+--------------+
|    James|   Smith|36636|     M|{3100, 200}|        3300|          3300|
|  Michael|    Rose|40288|     M|{4300, 200}|        4500|          4500|
|   Robert|Williams|42114|     M|{1400, 300}|        1700|          1700|
|    Maria|   Jones|39192|     F|{5500, 300}|        5800|          5800|
|      Jen|   Brown|39156|     F|{5000, 600}|        5600|          5600|
+---------+--------+-----+------+-----------+------------+--------------+
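The total_salary_2 column works because Python's built-in sum starts from the integer 0, so its first step is 0 + first_column; int cannot add a Column, so Python falls back to the Column's reflected __radd__ method. The plain-Python sketch below reproduces that mechanism with a hypothetical Expr class standing in for Column, so it runs without Spark; the strings it prints illustrate the mechanism and are not Spark's own output.

```python
class Expr:
    """Hypothetical stand-in for pyspark.sql.Column: it records an
    expression string instead of computing anything."""

    def __init__(self, text):
        self.text = text

    def __add__(self, other):
        # Expr + Expr (or Expr + number) builds a new nested expression
        return Expr(f"({self.text} + {_render(other)})")

    def __radd__(self, other):
        # Called for number + Expr, e.g. sum()'s start value 0 + Expr.
        # Addition is commutative, so the number is simply put on the right.
        return Expr(f"({self.text} + {_render(other)})")

    def __repr__(self):
        return f"Expr<{self.text}>"


def _render(value):
    """Render an Expr by its text and anything else with repr()."""
    return value.text if isinstance(value, Expr) else repr(value)


cols = [Expr("salary.component1"), Expr("salary.component2")]

# Built-in sum: 0 + cols[0] goes through __radd__, then + cols[1] through __add__
print(sum(cols))  # Expr<((salary.component1 + 0) + salary.component2)>
```

The extra "+ 0" term is harmless for numeric columns, which is why the two total columns agree.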