I have a dataset which looks somewhat like this:
idx | attributes
--------------------------
101 | ['a','b','c']
102 | ['a','b','d']
103 | ['b','c']
104 | ['c','e','f']
105 | ['a','b','c']
106 | ['c','g','h']
107 | ['b','d']
108 | ['d','g','i']
I wish to transform the above dataframe into something like this:
idx | attributes
--------------------------
101 | [0,1,2]
102 | [0,1,3]
103 | [1,2]
104 | [2,4,5]
105 | [0,1,2]
106 | [2,6,7]
107 | [1,3]
108 | [3,6,8]
Here, 'a' is replaced by 0, 'b' is replaced by 1, and so on. Essentially, I want to find all unique elements and assign them numbers so that integer operations can be performed on them. My current approach uses RDDs to maintain a single set and loop across the rows, but it is highly memory- and time-intensive. Is there another method for this in PySpark?
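For reference, this is roughly the kind of thing I am doing now (simplified; the variable names are only illustrative):
# Build one global set of attribute values, map each value to an index,
# then broadcast the mapping and re-encode every row
vocab = set()
for row in df.select('attributes').toLocalIterator():
    vocab.update(row.attributes)
mapping = {v: i for i, v in enumerate(sorted(vocab))}

mapping_bc = spark.sparkContext.broadcast(mapping)
encoded = df.rdd.map(
    lambda r: (r.idx, [mapping_bc.value[a] for a in r.attributes])
).toDF(['idx', 'attributes'])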
Thanks in advance
CodePudding user response:
Annotated code
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer

# Explode the dataframe by `attributes` so each array element gets its own row
df1 = df.selectExpr('idx', "explode(attributes) as attributes")

# Create a StringIndexer to encode the labels; 'alphabetAsc' assigns 0 to 'a', 1 to 'b', ...
idx = StringIndexer(inputCol='attributes', outputCol='encoded', stringOrderType='alphabetAsc')
df1 = idx.fit(df1).transform(df1)

# Group the encoded column by `idx` and aggregate back into an array using `collect_list`
df1 = df1.groupBy('idx').agg(F.collect_list(F.col('encoded').cast('int')).alias('attributes'))
Result
df1.show()
+---+----------+
|idx|attributes|
+---+----------+
|101| [0, 1, 2]|
|102| [0, 1, 3]|
|103|    [1, 2]|
|104| [2, 4, 5]|
|105| [0, 1, 2]|
|106| [2, 6, 7]|
|107|    [1, 3]|
|108| [3, 6, 8]|
+---+----------+
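If you also need to map the codes back to the original letters, keep the fitted model instead of chaining fit and transform; a minimal sketch, assuming the idx indexer defined above (StringIndexerModel exposes its labels in encoded order):
model = idx.fit(df.selectExpr('idx', "explode(attributes) as attributes"))
# With stringOrderType='alphabetAsc', model.labels[i] is the string encoded as i,
# e.g. model.labels == ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'] for the sample data
print(model.labels)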
CodePudding user response:
This can be done in Spark 2.4+ as a one-liner. In Spark 3.0 this can be done without expr (a sketch is shown after the explanation below).
from pyspark.sql.functions import expr

df = spark.createDataFrame(data=[(101, ['a', 'b', 'c']),
                                 (102, ['a', 'b', 'd']),
                                 (103, ['b', 'c']),
                                 (104, ['c', 'e', 'f']),
                                 (105, ['a', 'b', 'c']),
                                 (106, ['c', 'g', 'h']),
                                 (107, ['b', 'd']),
                                 (108, ['d', 'g', 'i'])], schema=["idx", "attributes"])

# ascii('a') is 97, so ascii(x)-96 maps 'a' -> 1, 'b' -> 2, and so on
df.select(df.idx, expr("transform( attributes, x -> ascii(x)-96)").alias("attributes")).show()
+---+----------+
|idx|attributes|
+---+----------+
|101| [1, 2, 3]|
|102| [1, 2, 4]|
|103|    [2, 3]|
|104| [3, 5, 6]|
|105| [1, 2, 3]|
|106| [3, 7, 8]|
|107|    [2, 4]|
|108| [4, 7, 9]|
+---+----------+
The tricky bit: expr("transform( attributes, x -> ascii(x)-96)")
expr is used to say this is a SQL expression.
transform takes a column [that is an array] and applies a function to each element in the array.
x is the lambda parameter for the element of the array.
-> is the function start and ) is the function end.
ascii(x)-96 converts the ASCII code of each letter into an integer.
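For the Spark 3 route mentioned above, here is a minimal sketch assuming pyspark.sql.functions.transform is available (it was added to the Python API in Spark 3.1); it expresses the same lambda without expr:
from pyspark.sql import functions as F

# Apply the same per-element lambda through the DataFrame API instead of a SQL string
df.select(
    df.idx,
    F.transform('attributes', lambda x: F.ascii(x) - 96).alias('attributes')
).show()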
If performance is a concern, you may compare the explain plan for my answer against the other one provided so far:
df1.groupBy('idx').agg(collect_list(col('encoded').cast('int')).alias('attributes')).explain()
== Physical Plan ==
ObjectHashAggregate(keys=[idx#24L], functions=[collect_list(cast(encoded#140 as int), 0, 0)])
+- Exchange hashpartitioning(idx#24L, 200)
   +- ObjectHashAggregate(keys=[idx#24L], functions=[partial_collect_list(cast(encoded#140 as int), 0, 0)])
      +- *(1) Project [idx#24L, UDF(attributes#132) AS encoded#140]
         +- Generate explode(attributes#25), [idx#24L], false, [attributes#132]
            +- Scan ExistingRDD[idx#24L,attributes#25]
My answer:
df.select(df.idx, expr("transform( attributes, x -> ascii(x)-96)").alias("attributes") ).explain()
== Physical Plan ==
Project [idx#24L, transform(attributes#25, lambdafunction((ascii(lambda x#128) - 96), lambda x#128, false)) AS attributes#127]
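The takeaway from the two plans: the StringIndexer/groupBy approach needs an explode, a model fit, and a shuffle (the Exchange hashpartitioning step) to collect the rows back together, while the transform version is a single narrow Project with no shuffle at all.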