Why am I losing some records in a DataFrame after a transformation in PySpark compared to the Pythonic (pandas) version?

Time:09-23

I'm new to PySpark, and I want to add a new column that reports the normalized frequency of each record's feature combination in a PySpark DataFrame built from synthetic data. (The columns Type and Encoding_type are categorical.) I start with a Spark DataFrame called sdf with 5 columns:

Below is the example:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Reuse the active session (e.g. in a notebook) or start a new one
spark = SparkSession.builder.getOrCreate()

# Type is a StringType column, so the placeholder must be the string "0", not the int 0
data2 = [("Sentence", 92, 6, "False", 49),
         ("Sentence", 17, 3, "False", 15),
         ("Sentence", 17, 3, "False", 15),
         ("0",         0, 0, "False",  0),
         ("0",         0, 0, "False",  0),
         ("0",         0, 0, "False",  0)
  ]

schema = StructType([
    StructField("Type",              StringType(),  True),
    StructField("Length",            IntegerType(), True),
    StructField("Token_number",      IntegerType(), True),
    StructField("Encoding_type",     StringType(),  True),
    StructField("Character_feature", IntegerType(), True)
  ])

sdf = spark.createDataFrame(data=data2, schema=schema)
sdf.printSchema()
sdf.show(truncate=False)

#root
# |-- Type: string (nullable = true)
# |-- Length: integer (nullable = true)
# |-- Token_number: integer (nullable = true)
# |-- Encoding_type: string (nullable = true)
# |-- Character_feature: integer (nullable = true)

#+--------+------+------------+-------------+-----------------+
#|Type    |Length|Token_number|Encoding_type|Character_feature|
#+--------+------+------------+-------------+-----------------+
#|Sentence|92    |6           |False        |49               |
#|Sentence|17    |3           |False        |15               |
#|Sentence|17    |3           |False        |15               |
#|0       |0     |0           |False        |0                |
#|0       |0     |0           |False        |0                |
#|0       |0     |0           |False        |0                |
#+--------+------+------------+-------------+-----------------+

Now I want to compute the frequency of each feature combination and map it back onto the main DataFrame sdf as follows:

from pyspark.sql import functions as F

# Statistical preprocessing
def add_freq_to_features_(df):
  # Use the function argument rather than the global sdf
  sdf_pltc = df.select('Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
  #sdf_pltc.show(truncate=0)

  sdf2 = (
      sdf_pltc
      .groupBy(sdf_pltc.columns)
      .agg(F.count('*').alias('Freq'))
    # .withColumn('Freq' , (col('Freq')  / col('Freq').sum()))    # Normalizing between 0 & 1
      .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
  )

  sdf2.show()

  # Return the aggregated frame (new_df was never defined here)
  return sdf2

# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features_(sdf)
features_df

#+--------+------+------------+-------------+-----------------+----+
#|    Type|Length|Token_number|Encoding_type|Character_feature|Freq|
#+--------+------+------------+-------------+-----------------+----+
#|Sentence|    92|           6|        False|               49|   1|
#|       0|     0|           0|        False|                0|   3|
#|Sentence|    17|           3|        False|               15|   2|
#+--------+------+------------+-------------+-----------------+----+

As can be seen, groupBy() collapses the six rows into three, one per distinct combination. The Pythonic (pandas) version, by contrast, keeps all six rows:

import pandas as pd

data = {'Type':              ["Sentence" , "Sentence" ,"Sentence" , "-", "-", "-"], 
        'Length':            [92, 17,17,0,0,0],
        'Token_number':      [6, 3,3,0,0,0],
        'Encoding_type':     ["False" , "False" ,"False" , "False", "False", "False"], 
        'Character_feature': [49, 15,15,0,0,0],
       }

# Build the DataFrame from the dict (the keys become the column names)
df = pd.DataFrame(data)
#df

# Statistical preprocessing
def add_freq_to_features(df):
  # Count identical rows, then merge the counts back so every original row is kept
  frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
 # frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalizing between 0 & 1
  new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
  
  return new_df

features_df = add_freq_to_features(df)
features_df

#+----------+------+------------+-------------+-----------------+-----+
#|Type      |Length|Token_number|Encoding_type|Character_feature|Freq |
#+----------+------+------------+-------------+-----------------+-----+
#|Sentence  |92    |6           |False        |49               |1    |
#|Sentence  |17    |3           |False        |15               |2    |
#|Sentence  |17    |3           |False        |15               |2    |
#|-         |0     |0           |False        |0                |3    |
#|-         |0     |0           |False        |0                |3    |
#|-         |0     |0           |False        |0                |3    |
#+----------+------+------------+-------------+-----------------+-----+

I couldn't figure out how to get the same result as the pandas version in PySpark. I may have made a mistake in my function.

I have provided a Colab notebook for quick debugging and commenting.

CodePudding user response:

Instead of using a groupBy with count, which collapses each group into a single row, I would recommend using count as a window function. That keeps the same number of records and gives a result similar to the pandas approach.


from pyspark.sql import functions as F
from pyspark.sql import Window


def add_freq_to_features_(df):
  # Use the function argument rather than the global sdf
  sdf_pltc = df.select('Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
  sdf_pltc.show(truncate=0)

  sdf2 = (
      sdf_pltc.withColumn(
          'Freq',
          # Count the rows that share the same values in every column, without collapsing them
          F.count("*").over(Window.partitionBy(sdf_pltc.columns))
      )
    # .withColumn('Freq' , (col('Freq')  / col('Freq').sum()))    # Normalizing between 0 & 1
      .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
  )

  sdf2.show()

  return sdf2

# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features_(sdf)
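
To see why the window version keeps all six rows while groupBy returns only three, here is a minimal plain-Python sketch of the two behaviours. It does not use Spark at all; the names rows, counts, grouped and windowed are only illustrative, and the data mirrors the example above.

```python
from collections import Counter

# Sample rows mirroring the example data:
# (Type, Length, Token_number, Encoding_type, Character_feature)
rows = [
    ("Sentence", 92, 6, "False", 49),
    ("Sentence", 17, 3, "False", 15),
    ("Sentence", 17, 3, "False", 15),
    ("0", 0, 0, "False", 0),
    ("0", 0, 0, "False", 0),
    ("0", 0, 0, "False", 0),
]

# Count how often each complete row occurs
counts = Counter(rows)

# groupBy-style result: one output row per distinct combination
grouped = [row + (freq,) for row, freq in counts.items()]

# Window-style result: every input row is kept and annotated with its group's count
windowed = [row + (counts[row],) for row in rows]

print(len(grouped))               # 3
print(len(windowed))              # 6
print([r[-1] for r in windowed])  # [1, 2, 2, 3, 3, 3]
```

The window-style list corresponds to what the pandas merge produces: the per-group count is looked up for each original row instead of replacing the rows.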

Edit 1

The order of rows cannot be assumed in Spark because of the distributed nature of shuffles and partitioning during operations. As such, you must explicitly specify the ordering whenever it matters, ideally as the last operation. If you share what your dataset is ordered by, I can update the answer. Here I have demonstrated ordering by Type and then Length, both in descending order.

If there are null values, you may replace them with a default value of 0 using df.na.fill(0). Note that filling with a number only affects numeric columns; nulls in string columns need a string default, for example df.na.fill('0').


#ggordon
from pyspark.sql import functions as F
from pyspark.sql import Window


def add_freq_to_features_(df):
  # Use the function argument rather than the global sdf
  sdf_pltc = df.select('Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
  print("Before Any Modification") # only included for debugging purposes
  sdf_pltc.show(truncate=0)

  # fill missing values with 0 using `na.fill(0)` before applying count as window function
  sdf2 = (
      sdf_pltc.na.fill(0).withColumn(
          'Freq',
          F.count("*").over(Window.partitionBy(sdf_pltc.columns))
      ).withColumn(
          'MaxFreq',
          F.max('Freq').over(Window.partitionBy())  # empty partitionBy() spans all rows
      ).withColumn(
          'MinFreq',
          F.min('Freq').over(Window.partitionBy())
      )
  )
  print("After replacing null with 0 and counting by partitions") # only included for debugging purposes
  # use orderby as your last operation, only included here for debugging purposes
  sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
  sdf2.show(truncate=False) # only included for debugging purposes
  sdf2 = (
      sdf2.withColumn('Freq' , F.when(
           # every frequency is identical: return 0 instead of dividing by zero
           F.col('MaxFreq') == F.col('MinFreq'), 0.0
           ).otherwise(
              (F.col('Freq')-F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq'))
           )
      )    # Min-max normalizing between 0 & 1
  )
  sdf2 = sdf2.drop('MinFreq').drop('MaxFreq')
  sdf2 = sdf2.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
  sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
  print("After normalization, encoding transformation and order by ") # only included for debugging purposes
  sdf2.show(truncate=False)

  return sdf2

Outputs

Before Any Modification
+--------+------+------------+-------------+-----------------+
|Type    |Length|Token_number|Encoding_type|Character_feature|
+--------+------+------------+-------------+-----------------+
|Sentence|92    |6           |False        |49               |
|Sentence|17    |3           |False        |15               |
|Sentence|17    |3           |False        |15               |
|0       |0     |0           |False        |0                |
|0       |0     |0           |False        |0                |
|0       |0     |0           |False        |0                |
+--------+------+------------+-------------+-----------------+

After replacing null with 0 and counting by partitions
+--------+------+------------+-------------+-----------------+----+-------+-------+
|Type    |Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+--------+------+------------+-------------+-----------------+----+-------+-------+
|Sentence|92    |6           |False        |49               |1   |3      |1      |
|Sentence|17    |3           |False        |15               |2   |3      |1      |
|Sentence|17    |3           |False        |15               |2   |3      |1      |
|0       |0     |0           |False        |0                |3   |3      |1      |
|0       |0     |0           |False        |0                |3   |3      |1      |
|0       |0     |0           |False        |0                |3   |3      |1      |
+--------+------+------------+-------------+-----------------+----+-------+-------+

After normalization, encoding transformation and order by 
+--------+------+------------+-------------+-----------------+----+
|Type    |Length|Token_number|Encoding_type|Character_feature|Freq|
+--------+------+------------+-------------+-----------------+----+
|Sentence|92    |6           |False        |49               |0.0 |
|Sentence|17    |3           |False        |15               |0.5 |
|Sentence|17    |3           |False        |15               |0.5 |
|0       |0     |0           |False        |0                |1.0 |
|0       |0     |0           |False        |0                |1.0 |
|0       |0     |0           |False        |0                |1.0 |
+--------+------+------------+-------------+-----------------+----+
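
The normalized Freq values in the last table can be checked by hand. The following is a small plain-Python sketch of the same min-max formula, (Freq - MinFreq) / (MaxFreq - MinFreq), applied to the six frequencies from the example. The helper name min_max_normalize is hypothetical, and returning zeros when all values are equal is an assumption about how that edge case should be handled.

```python
def min_max_normalize(freqs):
    """Rescale values to the range [0, 1] using (x - min) / (max - min)."""
    lo, hi = min(freqs), max(freqs)
    if hi == lo:
        # Assumption: when every value is identical, report 0.0 rather than divide by zero
        return [0.0 for _ in freqs]
    return [(f - lo) / (hi - lo) for f in freqs]


# Row frequencies from the example: 1 for the first row, 2 for the pair, 3 for the triple
freqs = [1, 2, 2, 3, 3, 3]
normalized = min_max_normalize(freqs)
print(normalized)  # [0.0, 0.5, 0.5, 1.0, 1.0, 1.0]
```

With MinFreq = 1 and MaxFreq = 3, the unique row maps to 0.0, the duplicated row to 0.5, and the row that appears three times to 1.0, which matches the Freq column shown above.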

Let me know if this works for you.
