I'm coming from this post: pyspark: count number of occurrences of distinct elements in lists where the OP asked about getting the counts for distinct items from array columns. What if I already know the vocabulary in advance and want to get a vector of a preset length computed?
So let's say my vocabulary is
vocab = ['A', 'B', 'C', 'D', 'E']
and my data looks like this (altered from the other post)
data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03'],
'flat': ['A;A;B', 'D;B;E;B;B', 'B;A']}
data['date'] = pd.to_datetime(data['date'])
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "500g") \
.appName('my-pandasToSparkDF-app') \
.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")
df=spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", "\;"))
and ultimately this is what I want:
------------------- ----------- ---------------------
| date| flat | counts |
------------------- ----------- ---------------------
|2014-01-01 00:00:00|A;A;B |[2, 1, 0, 0, 0] |
|2014-01-02 00:00:00|D;B;E;B;B |[0, 3, 0, 1, 1] |
|2014-01-03 00:00:00|B;A |[1, 1, 0, 0, 0] |
------------------- ----------- ---------------------
Here is a working solution that seems inefficient, adapted from the solution to the prior post:
from pyspark.sql import functions as F
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
.withColumn("distinct_items", F.array_distinct("list") \
.withColumn("occurrences", F.expr("""array_sort(transform(distinct_items, x-> aggregate(list, 0,(acc,t)->acc IF(t=x,1,0))))"""))\
.withColumn("count_map", F.map_from_arrays("distinct_items", "occurrences"))\
.withColumn(
"counts",
F.array(
[
F.when(
F.col("count_map")
.getItem(v)
.isNull(),
0,
)
.otherwise(
F.col("count_map").getItem(v)
)
for v in vocab
]
).drop("occurrences", "distinct_items").show()
Can I do this without having to create a map and then create arrays from the map? I need to do this procedure in practice on a large table with a large number of columns, so I would like to avoid having to do groupBy
, agg
type operations.
CodePudding user response:
Very nice question. Your intuition is entirely correct: shuffle can be avoided in this case.
However, I'm not sure I could explain the logic. This is the first time I have used nested transform
. It's possible that there's a smart other way...
from pyspark.sql import functions as F
vocab = ['A', 'B', 'C', 'D', 'E']
df = spark.createDataFrame([('A;A;B',), ('D;B;E;B;B',), ('B;A',),], ['flat'])
voc_arr = F.array([F.lit(x) for x in vocab])
df = df.withColumn('count', F.transform(voc_arr, lambda v: F.size(F.array_remove(F.transform(F.split('flat', ';'), lambda f: f == v), False))))
df.show()
# --------- ---------------
# | flat| count|
# --------- ---------------
# | A;A;B|[2, 1, 0, 0, 0]|
# |D;B;E;B;B|[0, 3, 0, 1, 1]|
# | B;A|[1, 1, 0, 0, 0]|
# --------- ---------------
CodePudding user response:
very interesting question.
While it can work without shuffles with higher order functions I couldn't figure a complexity that is lower than VocabularySize * flatSize.
Still better than shuffles I guess.
vocabulary = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P",
"Q", "R", "S", "U", "V", "W", "X", "Y", "Z"]
vocabulary_df = spark.createDataFrame(
[
[{k:0 for k in vocabulary}]
],
["vocab"]
)
df \
.crossJoin(vocabulary_df) \
.withColumn("count_distinct", aggregate(
"flat",
initValue="vocab",
merge=lambda acc, flat_value: transform_values(
acc,
lambda vocab_key, vocab_value: when(
flat_value == vocab_key,
vocab_value 1
).otherwise(vocab_value)
)
)) \
.select("flat", "count_distinct") \
.show(truncate=0)
--------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|flat |count_distinct |
--------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|[A, A, B] |{A -> 2, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[D, B, E, B, B]|{A -> 0, B -> 3, C -> 0, D -> 1, E -> 1, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[B, A] |{A -> 1, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
--------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CodePudding user response:
I have another variation of the first method, putting the entire vocab into the front of each array... not sure what the relative merits might be
from pyspark.sql import functions as F
vocab_arr = F.array([F.lit(v) for v in vocab])
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
.withColumn("list_", F.concat(vocab_arr, "list")) \
.withColumn(
"counts",
F.expr("""transform(list_, x-> aggregate(list_, -1,(acc,t)->acc IF(t=x,1,0)))""")) \
.withColumn("counts", F.slice("counts", 1, len(vocab))) \
.drop("list_").show()