PySpark row to struct with specified structure

Time:08-05

This is my initial dataframe:

columns = ["CounterpartID","Year","Month","Day","churnprobability", "deadprobability"]
data = [(1234, 2021,5,12, 0.85,0.6),(1224, 2022,6,12, 0.75,0.6),(1345, 2022,5,13, 0.8,0.2),(234, 2021,7,12, 0.9,0.8)]

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

schema = StructType([
    StructField("client_id", IntegerType(), False),
    StructField("year", IntegerType(), False),
    StructField("month", IntegerType(), False),
    StructField("day", IntegerType(), False),
    StructField("churn_probability", DoubleType(), False),
    StructField("dead_probability", DoubleType(), False)
  ])
 
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show(truncate=False)

First dataframe

Then I transform the columns (basically, splitting each float column into a whole-number part and a fractional part scaled by 10^9, i.e. nanos) to get the intermediate dataframe.

abc = df.rdd.map(lambda x: (
    x[0], x[1], x[2], x[3],
    int(x[4]), int(x[4] % 1 * pow(10, 9)),  # churn probability: units, nanos
    int(x[5]), int(x[5] % 1 * pow(10, 9)),  # dead probability: units, nanos
)).toDF(['client_id', 'year', 'month', 'day',
         'churn_probability_unit', 'churn_probability_nano',
         'dead_probability_unit', 'dead_probability_nano'])

display(abc)

Intermediate dataframe
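
The unit/nano split can also be looked at in isolation, outside Spark. This is a minimal plain-Python sketch, assuming non-negative values. The helper name `split_rate` is hypothetical, and rounding the fractional part (rather than truncating it with `int`) is a choice made for this sketch, since a binary float can sit slightly below the decimal value it was written as.

```python
NANOS_PER_UNIT = 10**9


def split_rate(value):
    """Split a non-negative float into (units, nanos).

    units is the whole-number part, nanos the fractional part scaled by 10^9.
    """
    units = int(value)
    # Round instead of truncating so a value such as 0.29 cannot lose a nano
    # to floating-point representation error.
    nanos = round(value % 1 * NANOS_PER_UNIT)
    # Rounding can push nanos up to a full unit; carry it over in that case.
    if nanos == NANOS_PER_UNIT:
        units += 1
        nanos = 0
    return units, nanos


print(split_rate(0.85))
print(split_rate(1.25))
```

The carry step keeps `nanos` strictly below 10^9, which is what a units/nanos pair normally expects.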

Below is the final desired dataframe (this is just an example of one row, but of course I'll need all the rows from the intermediate dataframe).

import json

sjson = {"clientId": {"id": 1234 },"eventDate": {"year": 2022,"month": 8,"day": 5},"churnProbability": {"rate": {"units": "500","nanos": 780000000}},"deadProbability": {"rate": {"units": "500","nanos": 780000000}}}
# json.dumps gives spark.read.json a real JSON string; the result gets its own name so the original df is not overwritten
desired_df = spark.read.json(sc.parallelize([json.dumps(sjson)])).select("clientId", "eventDate", "churnProbability", "deadProbability")

display(desired_df)

Final Desired Dataframe

How do I reach this end state from the intermediary state efficiently for all rows?

End goal is to use this final dataframe to write to Kafka where the schema of the topic is a form of the final desired dataframe.
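
To make the target shape concrete, here is a plain-Python sketch (not Spark code) of how one intermediate row maps onto the nested structure and what the resulting JSON payload for a single message could look like. The function name `to_event` is hypothetical, and emitting `units` as a string is an assumption taken from the example row above, where `"units": "500"` is quoted.

```python
import json


def to_event(row):
    """Map one intermediate row (a dict keyed by column name) to the nested target shape."""
    return {
        "clientId": {"id": row["client_id"]},
        "eventDate": {"year": row["year"], "month": row["month"], "day": row["day"]},
        "churnProbability": {"rate": {
            # Assumption: units is a string, as in the desired example row.
            "units": str(row["churn_probability_unit"]),
            "nanos": row["churn_probability_nano"],
        }},
        "deadProbability": {"rate": {
            "units": str(row["dead_probability_unit"]),
            "nanos": row["dead_probability_nano"],
        }},
    }


# First row of the intermediate dataframe, written out by hand.
row = {
    "client_id": 1234, "year": 2021, "month": 5, "day": 12,
    "churn_probability_unit": 0, "churn_probability_nano": 850000000,
    "dead_probability_unit": 0, "dead_probability_nano": 600000000,
}

payload = json.dumps(to_event(row))
print(payload)
```

In Spark the same mapping would be expressed column-wise with nested structs rather than row by row; this sketch only pins down which source column ends up in which nested field.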

CodePudding user response:

So, I was able to solve this using structs, without using to_json:

import pyspark.sql.functions as f

defg = abc.withColumn(
    "clientId",
    f.struct(f.col("client_id").alias("id")),
).withColumn(
    "eventDate",
    f.struct(f.col("year"), f.col("month"), f.col("day")),
).withColumn(
    "churnProbability",
    f.struct(
        f.struct(
            # "units" (not "unit") matches the field name in the desired structure
            f.col("churn_probability_unit").alias("units"),
            f.col("churn_probability_nano").alias("nanos"),
        ).alias("rate")
    ),
).withColumn(
    "deadProbability",
    f.struct(
        f.struct(
            f.col("dead_probability_unit").alias("units"),
            f.col("dead_probability_nano").alias("nanos"),
        ).alias("rate")
    ),
).select("clientId", "eventDate", "churnProbability", "deadProbability")

end dataframe
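
Since the result has to match the topic schema, it can help to compare nested field names against the desired structure before writing anything. Below is a small plain-Python sketch of such a check. The helper `field_paths` and the `produced` record (with a deliberately misnamed `unit` field) are hypothetical; for a real Spark `Row`, `asDict(recursive=True)` should give a comparable nested dict.

```python
def field_paths(record, prefix=""):
    """Return the set of dotted paths to every leaf field in a nested dict."""
    paths = set()
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            paths |= field_paths(value, path)
        else:
            paths.add(path)
    return paths


# The desired example row from the question.
desired = {
    "clientId": {"id": 1234},
    "eventDate": {"year": 2022, "month": 8, "day": 5},
    "churnProbability": {"rate": {"units": "500", "nanos": 780000000}},
    "deadProbability": {"rate": {"units": "500", "nanos": 780000000}},
}

# Hypothetical produced row where the rate field is named "unit" instead of "units".
produced = {
    "clientId": {"id": 1234},
    "eventDate": {"year": 2021, "month": 5, "day": 12},
    "churnProbability": {"rate": {"unit": 0, "nanos": 850000000}},
    "deadProbability": {"rate": {"unit": 0, "nanos": 600000000}},
}

missing = field_paths(desired) - field_paths(produced)
unexpected = field_paths(produced) - field_paths(desired)
print("missing:", sorted(missing))
print("unexpected:", sorted(unexpected))
```

This only compares field names, not types, so a `units` field that is a number where the topic expects a string would still need a separate check.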

CodePudding user response:

I would probably drop the rdd logic (and the extra toDF round trip) and use just one select on your original df:

from pyspark.sql import functions as F

defg = df.select(
    F.struct(F.col('client_id').alias('id')).alias('clientId'),
    F.struct('year', 'month', 'day').alias('eventDate'),
    F.struct(
        F.struct(
            # 'units' (not 'unit') matches the field name in the desired structure
            F.floor('churn_probability').alias('units'),
            (F.col('churn_probability') % 1 * 10**9).cast('long').alias('nanos')
        ).alias('rate')
    ).alias('churnProbability'),
    F.struct(
        F.struct(
            F.floor('dead_probability').alias('units'),
            (F.col('dead_probability') % 1 * 10**9).cast('long').alias('nanos')
        ).alias('rate')
    ).alias('deadProbability'),
)
defg.show()
# +--------+-------------+----------------+----------------+
# |clientId|    eventDate|churnProbability| deadProbability|
# +--------+-------------+----------------+----------------+
# |  {1234}|{2021, 5, 12}|{{0, 850000000}}|{{0, 600000000}}|
# |  {1224}|{2022, 6, 12}|{{0, 750000000}}|{{0, 600000000}}|
# |  {1345}|{2022, 5, 13}|{{0, 800000000}}|{{0, 200000000}}|
# |   {234}|{2021, 7, 12}|{{0, 900000000}}|{{0, 800000000}}|
# +--------+-------------+----------------+----------------+