Home > Mobile >  Get unique elements for every array-based row
Get unique elements for every array-based row

Time:06-16

I have a dataset which looks somewhat like this:

idx   |     attributes
--------------------------     
101   |     ['a','b','c']
102   |     ['a','b','d']
103   |     ['b','c']
104   |     ['c','e','f']
105   |     ['a','b','c']
106   |     ['c','g','h']
107   |     ['b','d']
108   |     ['d','g','i']

I wish to transform the above dataframe into something like this:

idx   |     attributes
--------------------------
101   |     [0,1,2]
102   |     [0,1,3]
103   |     [1,2]
104   |     [2,4,5]
105   |     [0,1,2]
106   |     [2,6,7]
107   |     [1,3]
108   |     [3,6,8]

Here, 'a' is replaced by 0, 'b' is replaced by 1 and so. Essentially, I wish to find all unique elements and assign them numbers so that integer operations can be made on them. My current approach is by using RDDs to maintain a single set and loop across rows but it's highly memory and time-intensive. Is there any other method for this in PySpark?

Thanks in advance

CodePudding user response:

Annotated code

from pyspark.ml.feature import StringIndexer

# Explode the dataframe by `attributes`
df1 = df.selectExpr('idx', "explode(attributes) as attributes")

# Create a StringIndexer to encode the labels
idx = StringIndexer(inputCol='attributes', outputCol='encoded', stringOrderType='alphabetAsc') 
df1 = idx.fit(df1).transform(df1)

# group the encoded column by idx and aggregate using `collect_list`
df1 = df1.groupBy('idx').agg(F.collect_list(F.col('encoded').cast('int')).alias('attributes'))

Result

df1.show()

 --- ---------- 
|idx|attributes|
 --- ---------- 
|101| [0, 1, 2]|
|102| [0, 1, 3]|
|103|    [1, 2]|
|104| [2, 4, 5]|
|105| [0, 1, 2]|
|106| [2, 6, 7]|
|107|    [1, 3]|
|108| [3, 6, 8]|
 --- ---------- 

CodePudding user response:

This can be done in spark 2.4 as a one liner. In spark 3.0 this can be done without expr.

df = spark.createDataFrame(data=[(101,['a','b','c']),
(102,['a','b','d']),
(103,['b','c']),
(104,['c','e','f']),
(105,['a','b','c']),
(106,['c','g','h']),
(107,['b','d']),
(108,['d','g','i']),],schema = ["idx","attributes"])

df.select(df.idx, expr("transform( attributes, x -> ascii(x)-96)").alias("attributes") ).show()
 --- ---------- 
|idx|attributes|
 --- ---------- 
|101| [1, 2, 3]|
|102| [1, 2, 4]|
|103|    [2, 3]|
|104| [3, 5, 6]|
|105| [1, 2, 3]|
|106| [3, 7, 8]|
|107|    [2, 4]|
|108| [4, 7, 9]|
 --- ---------- 

The tricky bit: expr("transform( attributes, x -> ascii(x)-96)")

  • expr is used to say this is a SQL expression
  • transform takes a column [that is an array] and applies a function to each element in the array ( x is the lambda parameter for the element of the array. -> function start and ) function end.
  • ascii(x)-96) convert ascii code into integer.

If you are considering performance you may consider the explain plan for my answer vs the other one provided so far:

df1.groupBy('idx').agg(collect_list(col('encoded').cast('int')).alias('attributes')).explain()
== Physical Plan ==
ObjectHashAggregate(keys=[idx#24L], functions=[collect_list(cast(encoded#140 as int), 0, 0)])
 - Exchange hashpartitioning(idx#24L, 200)
    - ObjectHashAggregate(keys=[idx#24L], functions=[partial_collect_list(cast(encoded#140 as int), 0, 0)])
       - *(1) Project [idx#24L, UDF(attributes#132) AS encoded#140]
          - Generate explode(attributes#25), [idx#24L], false, [attributes#132]
             - Scan ExistingRDD[idx#24L,attributes#25]

my answer:

df.select(df.idx, expr("transform( attributes, x -> ascii(x)-96)").alias("attributes") ).explain()
== Physical Plan ==
Project [idx#24L, transform(attributes#25, lambdafunction((ascii(lambda x#128) - 96), lambda x#128, false)) AS attributes#127]
  • Related