Home > Back-end >  PySpark: Operations with columns given different levels of aggregation and conditions
PySpark: Operations with columns given different levels of aggregation and conditions

Time:04-03

I want to get a ratio of sentiment, and for that I need to calculate how many positives and how many negatives there are per topic, and then divide it by the total of records of each topic.

Let's say I have this dataset:

 ----- --------- 
|topic|sentiment|
 ----- --------- 
|Chair| positive|
|Table| negative|
|Chair| negative|
|Chair| negative|
|Table| positive|
|Table| positive|
|Table| positive|
 ----- --------- 

In this case, I could give a value of -1 to 'negative' and 1 to 'positive', then this ratio would be 0.5 in the case of Table (negative positive positive positive) / total_count), and -0.33 in the case of Chair: (positive negative negative) / total_count).

I have come up with this solution but seems way too complicated:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, when


spark = SparkSession.builder.appName('SparkExample').getOrCreate()

data_e = [("Chair","positive"),
    ("Table","negative"),
    ("Chair","negative"),
    ("Chair","negative"),
    ("Table","positive"),
    ("Table","positive")
  ]

schema_e = StructType([ \
    StructField("topic",StringType(),True), \
    StructField("sentiment",StringType(),True), \
  ])
 
df_e = spark.createDataFrame(data=data_e,schema=schema_e)

df_e_int = df_e.withColumn('sentiment_int', 
                           when(col('sentiment') == 'positive', 1) \
               .otherwise(-1)) \
               .select('topic', 'sentiment_int')

agg_e = df_e_int.groupBy('topic') \
                .count() \
                .select('topic', 
                        col('count').alias('counts'))

agg_sum_e = df_e_int.groupBy('topic') \
                    .sum('sentiment_int') \
                    .select('topic', 
                            col('sum(sentiment_int)').alias('sum_value'))

agg_joined_e = agg_e.join(agg_sum_e, 
                          agg_e.topic == agg_sum_e.topic, 
                          'inner') \
                    .select(agg_e.topic, 'counts', 'sum_value')

final_agg_e = agg_joined_e.withColumn('sentiment_ratio', 
                                      (col('sum_value')/col('counts'))) \
                          .select('topic', 'sentiment_ratio')

The final output would look like this:

 ----- ------------------- 
|topic|    sentiment_ratio|
 ----- ------------------- 
|Chair|-0.3333333333333333|
|Table| 0.5               |
 ----- ------------------- 

What's the most efficient way of doing this?

CodePudding user response:

You can condense your logic into two lines by using avg:

from pyspark.sql import functions as F

df_e.groupBy("topic") \
    .agg(F.avg(F.when(F.col("sentiment").eqNullSafe("positive"), 1).otherwise(-1))) \
    .show()
  • Related