Home > Net >  Unique element count in array column
Unique element count in array column

Time:06-17

I have this dataset with a column of array type. From this column, we need to create another column which will have list of unique elements and its counts.

Example [a,b,e,b] results should be [[b,a,e],[2,1,1]]. Data should be sorted by count. Even key value where value is the count will do. I created a udf (please see below) for this purpose, but it is very slow so I need to do this in PySpark built-in functions.

id col_a collected_col_a
1 a [a, b, e, b]
1 b [a, b, e, b]
struct_schema1 = StructType([
    StructField('elements', ArrayType(StringType()), nullable=True),
    StructField('count', ArrayType(IntegerType()), nullable=True)
])

# udf
@udf(returnType=struct_schema1)
def func1(x, top = 10):
    y,z=np.unique(x,return_counts=True)
    z_y = zip(z.tolist(), y.tolist())
    y = [i for _, i in sorted(z_y, reverse = True)]
    z = sorted(z.tolist(), reverse = True)
    if len(y) > top:
        return {'elements': y[:top],'count': z[:top]}
    else:
        return {'elements': y,'count': z}

CodePudding user response:

You can use combination of transform and filter functions along with array_distinct and size to get the desired output. Here's and example:

from pyspark.sql import functions as F

# example of input dataframe
df = spark.createDataFrame([(1, ["a", "b", "e", "b"]), (2, ["a", "a", "c", "b"])], ["id", "arrayCol"])


df1 = df.withColumn(
    "uniqueCount",
    F.transform(
        F.array_distinct("arrayCol"),
        lambda x: F.struct(
            x.alias("value"),
            F.size(F.filter("arrayCol", lambda y: x == y)).alias("cout")
        )
    )
)
df1.show(truncate=False)
# --- ------------ ------------------------ 
#|id |arrayCol    |uniqueCount             |
# --- ------------ ------------------------ 
#|1  |[a, b, e, b]|[{a, 1}, {b, 2}, {e, 1}]|
#|2  |[a, a, c, b]|[{a, 2}, {c, 1}, {b, 1}]|
# --- ------------ ------------------------ 

CodePudding user response:

An approach creating a map. Using aggregate and map_zip_with. The other approach seems clearer though.

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 'a', ['a', 'b', 'e', 'b']),
     (1, 'b', ['a', 'b', 'e', 'b'])],
    ['id', 'col_a', 'collected_col_a']
)
df = df.withColumn('elem_count',
    F.aggregate(
        'collected_col_a',
        F.lit(None).cast('map<string,int>'),
        lambda m, x: F.map_zip_with(
            F.coalesce(m, F.create_map(x, F.lit(0))),
            F.create_map(x, F.lit(1)),
            lambda k, v1, v2: F.coalesce(v1, F.lit(0))   F.coalesce(v2, F.lit(0))
        )
    )
)
df.show(truncate=0)
#  --- ----- --------------- ------------------------ 
# |id |col_a|collected_col_a|elem_count              |
#  --- ----- --------------- ------------------------ 
# |1  |a    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# |1  |b    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
#  --- ----- --------------- ------------------------ 

Sorry, I couldn't figure out how to sort based on map values.

  • Related