How to determine the frequency of each event and its event-by-event frequency using PySpark


I have a dataset like:

Data

  • a
  • a
  • a
  • a
  • a
  • b
  • b
  • b
  • a
  • a
  • b

I would like to add a column like the one below. Each value has the form a1,1: the first part (a1) is the event frequency, i.e. which run of "a" this row belongs to in the column, and the second part (,1) is the frequency within that event, i.e. how many times "a" has repeated so far before another element (b) appears. Can we do this with PySpark?

Data   Frequency
   a        a1,1
   a        a1,2
   a        a1,3
   a        a1,4
   a        a1,5
   b        b1,1
   b        b1,2
   b        b1,3
   a        a2,1
   a        a2,2
   b        b2,1
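
For reference, a minimal sketch that builds this sample column as a DataFrame (it assumes an active SparkSession bound to the name spark; the second answer below refers to this DataFrame as df1):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One-column DataFrame holding the sample events in order
df1 = spark.createDataFrame(
    [('a',), ('a',), ('a',), ('a',), ('a',),
     ('b',), ('b',), ('b',), ('a',), ('a',), ('b',)],
    ['Data'],
)
df1.show()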

CodePudding user response:

You can achieve your desired result like this:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Sample data matching the question (5 a's, 3 b's, 2 a's, 1 b)
df = spark.createDataFrame(['a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'a', 'a', 'b'], 'string').toDF("Data")

print("Original Data:")
df.show()

print("Result:")
df.withColumn("ID", F.monotonically_increasing_id()) \
    .withColumn("group",
            # The difference of these two row numbers is constant within each
            # run of consecutive identical values, so it identifies the run.
            F.row_number().over(Window.orderBy("ID"))
            - F.row_number().over(Window.partitionBy("Data").orderBy("ID"))
    ) \
    .withColumn("element_freq",
            # Position of the row within its run: the ",1" part. Partitioning
            # by both Data and group avoids clashes between runs of different values.
            F.row_number().over(Window.partitionBy("Data", "group").orderBy("ID"))) \
    .withColumn("event_freq",
            # Which run of this value it is: the "a1" part.
            F.dense_rank().over(Window.partitionBy("Data").orderBy("group"))) \
    .withColumn("Frequency", F.concat_ws(',', F.concat(F.col("Data"), F.col("event_freq")), F.col("element_freq"))) \
    .orderBy("ID") \
    .drop("ID", "group", "event_freq", "element_freq") \
    .show()
Original Data:
+----+
|Data|
+----+
|   a|
|   a|
|   a|
|   a|
|   a|
|   b|
|   b|
|   b|
|   a|
|   a|
|   b|
+----+

Result:
+----+---------+
|Data|Frequency|
+----+---------+
|   a|     a1,1|
|   a|     a1,2|
|   a|     a1,3|
|   a|     a1,4|
|   a|     a1,5|
|   b|     b1,1|
|   b|     b1,2|
|   b|     b1,3|
|   a|     a2,1|
|   a|     a2,2|
|   b|     b2,1|
+----+---------+
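
The heart of this approach is the group column: subtracting the per-value row number from the global row number gives a value that is constant within each run of consecutive identical rows. A small sketch (my addition, reusing df and the imports above) to inspect the intermediate columns before they are dropped:

# Peek at the intermediates; a constant `group` value marks one run
w_all = Window.orderBy("ID")
w_data = Window.partitionBy("Data").orderBy("ID")

(
    df.withColumn("ID", F.monotonically_increasing_id())
      .withColumn("global_rn", F.row_number().over(w_all))
      .withColumn("per_value_rn", F.row_number().over(w_data))
      # e.g. the first five 'a' rows all get group = 0, the first three 'b' rows group = 5
      .withColumn("group", F.col("global_rn") - F.col("per_value_rn"))
      .orderBy("ID")
      .show()
)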

CodePudding user response:

Use Window functions. I'll give you two options, just in case.

Option 1: separating group and Frequency

from pyspark.sql import Window
# note: this sum is PySpark's column function, shadowing Python's builtin
from pyspark.sql.functions import (array, array_join, col, concat, lag,
                                   monotonically_increasing_id, rank,
                                   sum, when)

# Window over the whole frame, ordered by the index created below
k = Window.partitionBy().orderBy('index')

(

  # Create an index of df1 to order by
  df1.withColumn('index', monotonically_increasing_id())

  # Put the previous row's Data next to the current row's Data
  .withColumn('group', lag('Data').over(k))

  # Where current and previous don't match, conditionally assign a 1, else 0
  .withColumn('group', when(col('Data') != col('group'), 1).otherwise(0))

  # Concat Data with the running sum of those flags (plus 1), per Data, ordered by index
  .withColumn('group', concat('Data', sum('group').over(Window.partitionBy('Data').orderBy('index')) + 1))

  # Rank rows within each group in the order they appeared in the initial df
  .withColumn('Frequency', rank().over(Window.partitionBy('group').orderBy('index')))

).sort('index').drop('index').show(truncate=False)


+----+-----+---------+
|Data|group|Frequency|
+----+-----+---------+
|a   |a1   |1        |
|a   |a1   |2        |
|a   |a1   |3        |
|a   |a1   |4        |
|a   |a1   |5        |
|b   |b2   |1        |
|b   |b2   |2        |
|b   |b2   |3        |
|a   |a2   |1        |
|a   |a2   |2        |
|b   |b3   |1        |
+----+-----+---------+
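
If you want Option 1's two columns collapsed into the asker's a1,1 format without recomputing anything, a concat_ws does it. A sketch, assuming result holds the DataFrame Option 1 shows (with its group and Frequency columns still present):

from pyspark.sql.functions import concat_ws

# Join the run label and the within-run rank with a comma, e.g. a1 + 1 -> "a1,1"
result.select(
    'Data',
    concat_ws(',', 'group', 'Frequency').alias('Frequency'),
).show(truncate=False)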

Option 2 gives the combined output you wanted (note its run numbering differs slightly from the question: the first b run comes out as b2, not b1).

# Imports and the window spec k are the same as in Option 1
(

  # Create an index of df1 to order by
  df1.withColumn('index', monotonically_increasing_id())

  # Put the previous row's Data next to the current row's Data
  .withColumn('Frequency', lag('Data').over(k))

  # Where current and previous don't match, conditionally assign a 1, else 0
  .withColumn('Frequency', when(col('Data') != col('Frequency'), 1).otherwise(0))

  # Concat Data with the running sum of those flags (plus 1), per Data, ordered by index
  .withColumn('Frequency', concat('Data', sum('Frequency').over(Window.partitionBy('Data').orderBy('index')) + 1))

  # Join the run label and its within-run rank with a comma, e.g. "a1,1"
  .withColumn('Frequency', array_join(array('Frequency', rank().over(Window.partitionBy('Frequency').orderBy('index'))), ','))

).sort('index').drop('index').show(truncate=False)


+----+---------+
|Data|Frequency|
+----+---------+
|a   |a1,1     |
|a   |a1,2     |
|a   |a1,3     |
|a   |a1,4     |
|a   |a1,5     |
|b   |b2,1     |
|b   |b2,2     |
|b   |b2,3     |
|a   |a2,1     |
|a   |a2,2     |
|b   |b3,1     |
+----+---------+
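
If you need the run numbers to restart per value (b1, b2, ...) so the output exactly matches the question, you can renumber the runs with a dense_rank per Data, along the lines of the first answer. A sketch (my addition, not part of either answer), assuming df1 is the sample DataFrame:

from pyspark.sql import Window
from pyspark.sql.functions import (col, concat, concat_ws, dense_rank,
                                   monotonically_increasing_id, row_number)

w_all = Window.orderBy('index')
w_data = Window.partitionBy('Data').orderBy('index')

(
  df1.withColumn('index', monotonically_increasing_id())
     # run id: constant within each run of consecutive identical values
     .withColumn('run', row_number().over(w_all) - row_number().over(w_data))
     # number the runs 1, 2, ... separately for each value
     .withColumn('event', dense_rank().over(Window.partitionBy('Data').orderBy('run')))
     # position of the row within its run
     .withColumn('pos', row_number().over(Window.partitionBy('Data', 'run').orderBy('index')))
     .withColumn('Frequency', concat_ws(',', concat(col('Data'), col('event')), col('pos')))
     .sort('index')
     .select('Data', 'Frequency')
).show(truncate=False)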