I have a dataset with a column of array type. From this column, I need to create another column containing the list of unique elements and their counts.
Example: for [a, b, e, b]
the result should be [[b, a, e], [2, 1, 1]].
The data should be sorted by count; a key-value map where the value is the count would also work. I created a UDF
(please see below) for this purpose, but it is very slow, so I need to do this with PySpark built-in functions.
| id | col_a | collected_col_a |
|----|-------|-----------------|
| 1  | a     | [a, b, e, b]    |
| 1  | b     | [a, b, e, b]    |
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType

struct_schema1 = StructType([
    StructField('elements', ArrayType(StringType()), nullable=True),
    StructField('count', ArrayType(IntegerType()), nullable=True)
])

# udf: unique elements and their counts, sorted by count (descending), capped at `top`
@udf(returnType=struct_schema1)
def func1(x, top=10):
    y, z = np.unique(x, return_counts=True)
    z_y = zip(z.tolist(), y.tolist())
    y = [i for _, i in sorted(z_y, reverse=True)]
    z = sorted(z.tolist(), reverse=True)
    if len(y) > top:
        return {'elements': y[:top], 'count': z[:top]}
    else:
        return {'elements': y, 'count': z}
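For context, the UDF is applied along these lines (elem_count is just an illustrative output column name):
# applying the UDF on the collected column (this is the slow step)
df = df.withColumn('elem_count', func1('collected_col_a'))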
CodePudding user response:
You can use a combination of the transform and filter functions along with array_distinct and size to get the desired output. Here's an example:
from pyspark.sql import functions as F
# example of input dataframe
df = spark.createDataFrame([(1, ["a", "b", "e", "b"]), (2, ["a", "a", "c", "b"])], ["id", "arrayCol"])
df1 = df.withColumn(
    "uniqueCount",
    F.transform(
        F.array_distinct("arrayCol"),
        lambda x: F.struct(
            x.alias("value"),
            F.size(F.filter("arrayCol", lambda y: x == y)).alias("count")
        )
    )
)
df1.show(truncate=False)
#+---+------------+------------------------+
#|id |arrayCol    |uniqueCount             |
#+---+------------+------------------------+
#|1  |[a, b, e, b]|[{a, 1}, {b, 2}, {e, 1}]|
#|2  |[a, a, c, b]|[{a, 2}, {c, 1}, {b, 1}]|
#+---+------------+------------------------+
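If you also need the result ordered by count and split into the [[elements], [counts]] shape from the question, here is one possible follow-up (a sketch, assuming Spark 3.x; sortedCount, elements and counts are just illustrative column names): sort the array of structs with array_sort's SQL comparator, then project the two fields out with transform.
df2 = df1.withColumn(
    # sort the structs by their count field, descending
    "sortedCount", F.expr("array_sort(uniqueCount, (l, r) -> r.count - l.count)")
).withColumn(
    # parallel array of the unique values
    "elements", F.transform("sortedCount", lambda s: s["value"])
).withColumn(
    # parallel array of the corresponding counts
    "counts", F.transform("sortedCount", lambda s: s["count"])
)
df2.select("id", "elements", "counts").show(truncate=False)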
CodePudding user response:
An approach creating a map, using aggregate and map_zip_with. The other approach seems clearer though.
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 'a', ['a', 'b', 'e', 'b']),
     (1, 'b', ['a', 'b', 'e', 'b'])],
    ['id', 'col_a', 'collected_col_a']
)

df = df.withColumn('elem_count',
    F.aggregate(
        'collected_col_a',
        F.lit(None).cast('map<string,int>'),
        lambda m, x: F.map_zip_with(
            F.coalesce(m, F.create_map(x, F.lit(0))),
            F.create_map(x, F.lit(1)),
            lambda k, v1, v2: F.coalesce(v1, F.lit(0)) + F.coalesce(v2, F.lit(0))
        )
    )
)
df.show(truncate=0)
# +---+-----+---------------+------------------------+
# |id |col_a|collected_col_a|elem_count              |
# +---+-----+---------------+------------------------+
# |1  |a    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# |1  |b    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# +---+-----+---------------+------------------------+
Sorry, I couldn't figure out how to sort based on map values.
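For what it's worth, one way that might work (a sketch, assuming Spark 3.x and the elem_count column from above; sorted_counts is just an illustrative name) is to turn the map into an array of key/value structs with map_entries and sort that array with array_sort's SQL comparator:
df = df.withColumn(
    # map_entries gives an array of {key, value} structs; sort it by value, descending
    'sorted_counts',
    F.expr('array_sort(map_entries(elem_count), (l, r) -> r.value - l.value)')
)
df.select('id', 'sorted_counts').show(truncate=0)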