I want to compute a sentiment ratio per topic: count how many positives and how many negatives each topic has, take the difference, and divide it by the total number of records for that topic.
Let's say I have this dataset:
+-----+---------+
|topic|sentiment|
+-----+---------+
|Chair| positive|
|Table| negative|
|Chair| negative|
|Chair| negative|
|Table| positive|
|Table| positive|
|Table| positive|
+-----+---------+
In this case, I could assign a value of -1 to 'negative' and 1 to 'positive'. The ratio for Table (negative, positive, positive, positive) would then be (-1 + 1 + 1 + 1) / 4 = 0.5, and the ratio for Chair (positive, negative, negative) would be (1 - 1 - 1) / 3 ≈ -0.33.
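To double-check that arithmetic in plain Python before reaching for Spark (just an illustrative check, the variable names are mine):
rows = [("Chair", "positive"), ("Table", "negative"), ("Chair", "negative"),
        ("Chair", "negative"), ("Table", "positive"), ("Table", "positive"),
        ("Table", "positive")]
for topic in ("Chair", "Table"):
    # Map positive -> 1, negative -> -1, then average per topic
    vals = [1 if s == "positive" else -1 for t, s in rows if t == topic]
    print(topic, sum(vals) / len(vals))  # Chair -0.333..., Table 0.5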
I have come up with this solution, but it seems way too complicated:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, when
spark = SparkSession.builder.appName('SparkExample').getOrCreate()
data_e = [("Chair","positive"),
("Table","negative"),
("Chair","negative"),
("Chair","negative"),
("Table","positive"),
("Table","positive")
]
schema_e = StructType([
    StructField("topic", StringType(), True),
    StructField("sentiment", StringType(), True),
])
df_e = spark.createDataFrame(data=data_e,schema=schema_e)
# Map the sentiment labels to +1/-1
df_e_int = df_e.withColumn('sentiment_int',
                           when(col('sentiment') == 'positive', 1)
                           .otherwise(-1)) \
               .select('topic', 'sentiment_int')
# Count the records per topic
agg_e = df_e_int.groupBy('topic') \
                .count() \
                .select('topic',
                        col('count').alias('counts'))
# Sum the +1/-1 values per topic
agg_sum_e = df_e_int.groupBy('topic') \
                    .sum('sentiment_int') \
                    .select('topic',
                            col('sum(sentiment_int)').alias('sum_value'))
# Join both aggregates and divide the sum by the count
agg_joined_e = agg_e.join(agg_sum_e,
                          agg_e.topic == agg_sum_e.topic,
                          'inner') \
                    .select(agg_e.topic, 'counts', 'sum_value')
final_agg_e = agg_joined_e.withColumn('sentiment_ratio',
                                      col('sum_value') / col('counts')) \
                          .select('topic', 'sentiment_ratio')
The final output of final_agg_e.show() would look like this:
+-----+-------------------+
|topic|    sentiment_ratio|
+-----+-------------------+
|Chair|-0.3333333333333333|
|Table|                0.5|
+-----+-------------------+
What's the most efficient way of doing this?
CodePudding user response:
You can condense your logic into a single groupBy/agg by using avg: the average of the mapped +1/-1 values per topic is exactly their sum divided by the count, which is the ratio you want:
from pyspark.sql import functions as F
# avg over +1/-1 = (positives - negatives) / total, computed per topic
df_e.groupBy("topic") \
    .agg(F.avg(F.when(F.col("sentiment").eqNullSafe("positive"), 1).otherwise(-1))) \
    .show()
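If you also want the resulting column to carry a readable name, here is the same aggregation with an alias (the name sentiment_ratio is my choice, nothing requires it):
from pyspark.sql import functions as F
# Same aggregation, with the averaged column explicitly named
df_e.groupBy("topic") \
    .agg(F.avg(F.when(F.col("sentiment").eqNullSafe("positive"), 1)
               .otherwise(-1)).alias("sentiment_ratio")) \
    .show()
# Expected:
# +-----+-------------------+
# |topic|    sentiment_ratio|
# +-----+-------------------+
# |Chair|-0.3333333333333333|
# |Table|                0.5|
# +-----+-------------------+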