Count words from a list within array columns without invoking a shuffle


I'm coming from this post: pyspark: count number of occurrences of distinct elements in lists, where the OP asked how to get counts of the distinct items in array columns. What if I already know the vocabulary in advance and want to compute a count vector of a preset length?

So let's say my vocabulary is

vocab = ['A', 'B', 'C', 'D', 'E']

and my data looks like this (altered from the other post)

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03'],
        'flat': ['A;A;B', 'D;B;E;B;B', 'B;A']}

data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")

df = spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", ";"))

and ultimately this is what I want:

+-------------------+-----------+---------------------+
|               date| flat      | counts              |
+-------------------+-----------+---------------------+
|2014-01-01 00:00:00|A;A;B      |[2, 1, 0, 0, 0]      |
|2014-01-02 00:00:00|D;B;E;B;B  |[0, 3, 0, 1, 1]      |
|2014-01-03 00:00:00|B;A        |[1, 1, 0, 0, 0]      |
+-------------------+-----------+---------------------+

Here is a working solution that seems inefficient, adapted from the solution to the prior post:

from pyspark.sql import functions as F

df = spark.createDataFrame(data)
df.withColumn("list", F.split("flat", ";"))\
  .withColumn("distinct_items", F.array_distinct("list"))\
  .withColumn("occurrences", F.expr("""transform(distinct_items, x -> aggregate(list, 0, (acc, t) -> acc + IF(t = x, 1, 0)))"""))\
  .withColumn("count_map", F.map_from_arrays("distinct_items", "occurrences"))\
  .withColumn(
      "counts",
      F.array(
          [
              F.when(
                  F.col("count_map").getItem(v).isNull(),
                  0,
              )
              .otherwise(F.col("count_map").getItem(v))
              for v in vocab
          ]
      ),
  )\
  .drop("occurrences", "distinct_items")\
  .show()

Can I do this without having to create a map and then build arrays from the map? In practice I need to run this procedure on a large table with a large number of columns, so I would like to avoid groupBy/agg-type operations.

CodePudding user response:

Very nice question. Your intuition is correct: the shuffle can be avoided in this case.

However, I'm not sure I can explain the logic well. This is the first time I have used a nested transform. It's possible there's a smarter way...

from pyspark.sql import functions as F
vocab = ['A', 'B', 'C', 'D', 'E']
df = spark.createDataFrame([('A;A;B',), ('D;B;E;B;B',), ('B;A',),], ['flat'])

voc_arr = F.array([F.lit(x) for x in vocab])
df = df.withColumn(
    'count',
    F.transform(
        voc_arr,
        # for each vocab entry v: mark matching tokens True, drop the False marks, count what is left
        lambda v: F.size(F.array_remove(F.transform(F.split('flat', ';'), lambda f: f == v), False))
    )
)

df.show()
# +---------+---------------+
# |     flat|          count|
# +---------+---------------+
# |    A;A;B|[2, 1, 0, 0, 0]|
# |D;B;E;B;B|[0, 3, 0, 1, 1]|
# |      B;A|[1, 1, 0, 0, 0]|
# +---------+---------------+
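If the array_remove trick reads as indirect, here is an equivalent sketch of the same idea (my own rewrite, assuming Spark 3.1+ where F.filter accepts a Python lambda): keep only the tokens equal to v and count them.

# Equivalent sketch: filter the tokens down to matches of v, then take the size.
# Assumes Spark 3.1+ for F.filter with a Python callable.
df = df.withColumn(
    'count',
    F.transform(voc_arr, lambda v: F.size(F.filter(F.split('flat', ';'), lambda f: f == v)))
)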

CodePudding user response:

Very interesting question.
While it can be done without a shuffle using higher-order functions, I couldn't come up with a complexity lower than VocabularySize * flatSize.
Still better than a shuffle, I guess.

from pyspark.sql.functions import aggregate, col, transform_values, when

vocabulary = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P",
              "Q", "R", "S", "U", "V", "W", "X", "Y", "Z"]
vocabulary_df = spark.createDataFrame(
    [
        [{k: 0 for k in vocabulary}]
    ],
    ["vocab"]
)

# "flat" is already an array column here (e.g. the "list" column from the question).
df \
.crossJoin(vocabulary_df) \
.withColumn("count_distinct", aggregate(
    "flat",
    initialValue=col("vocab"),
    merge=lambda acc, flat_value: transform_values(
        acc,
        lambda vocab_key, vocab_value: when(
            flat_value == vocab_key,
            vocab_value + 1
        ).otherwise(vocab_value)
    )
)) \
.select("flat", "count_distinct") \
.show(truncate=0)

+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|flat           |count_distinct                                                                                                                                                                                          |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[A, A, B]      |{A -> 2, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[D, B, E, B, B]|{A -> 0, B -> 3, C -> 0, D -> 1, E -> 1, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[B, A]         |{A -> 1, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
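To get back to the fixed-length array from the question rather than a map, one hedged follow-up is to read the map out in vocabulary order (counted_df below is a placeholder name for the DataFrame built above, before the .show()):

from pyspark.sql.functions import array, col

# Sketch: turn the vocab -> count map into an array ordered like `vocabulary`.
# Every key exists in the map by construction, so no null handling is needed.
counted_df = counted_df.withColumn(
    "counts",
    array([col("count_distinct").getItem(k) for k in vocabulary])
)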

CodePudding user response:

I have another variation of the first method, prepending the entire vocab to each array... I'm not sure what the relative merits might be.

from pyspark.sql import functions as F

vocab_arr = F.array([F.lit(v) for v in vocab])
df = spark.createDataFrame(data)
df.withColumn("list", F.split("flat", ";"))\
  .withColumn("list_", F.concat(vocab_arr, "list")) \
  .withColumn(
     "counts",
     F.expr("""transform(list_, x -> aggregate(list_, -1, (acc, t) -> acc + IF(t = x, 1, 0)))""")) \
  .withColumn("counts", F.slice("counts", 1, len(vocab))) \
  .drop("list_").show()
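Since the question mentions needing this on a large number of columns, here is a hedged sketch (my own addition, assuming Spark 3.1+; count_vector and the illustrative column names are not from the answers) that wraps the first answer's expression in a reusable helper:

from pyspark.sql import functions as F

def count_vector(flat_col, vocab, sep=';'):
    # Build one count per vocabulary entry, all inside a single projection (no shuffle).
    tokens = F.split(flat_col, sep)
    return F.array([
        F.size(F.array_remove(F.transform(tokens, lambda t: t == F.lit(v)), False))
        for v in vocab
    ])

# Illustrative usage on two hypothetical string columns of the same shape:
# df = df.withColumn('flat_counts', count_vector(F.col('flat'), vocab)) \
#        .withColumn('other_counts', count_vector(F.col('other_flat'), vocab))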