Here is my simple code to write to parquet using pandas:
df.to_parquet(output_path)
This is what df is supposed to look like:
       id                                  cosine_similarity    sub_id_list
111111111  [[0.99999994, 0.5283972, 0.9180976, 0.3284426,...   [1, 2, 3...]
222222222  [[1.0000004, 0.19264436, 0.40389958, 0.1159741...   [7, 8, 9...]
333333333  [[1.0, 0.7529702, 0.6990572, 0.5842277, 0.8533...  [12, 4, 2...]
444444444  [[0.9999998, 0.19949964, 0.3271717, 0.48022747...         [1, 2]
Some more details on the columns:
- cosine_similarity: an N x N cosine similarity matrix
- sub_id_list: a list of all of the N sub_id values within an id
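For context, here is a minimal sketch of how a frame with this shape gets built and written (the ids, N = 3, and the random values are made up for illustration; output_path is the same path as above):

import numpy as np
import pandas as pd

# Toy stand-in for the real pipeline output: a 3 x 3 similarity matrix
# (stored as nested lists) plus its 3 sub_ids per id
df = pd.DataFrame({
    "id": [111111111, 222222222],
    "cosine_similarity": [np.random.rand(3, 3).tolist(),
                          np.random.rand(3, 3).tolist()],
    "sub_id_list": [[1, 2, 3], [7, 8, 9]],
})
df.to_parquet(output_path)  # by default this also writes the pandas RangeIndex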
When I read the parquet files in my directory in PySpark, my cosine similarity matrix ends up split on an auto-generated index level:
+-----------------+--------------------+--------------------+---------+
|__index_level_0__|   cosine_similarity|              sub_id|       id|
+-----------------+--------------------+--------------------+---------+
|                0|[[0.9999996423721...|[6600381071465, 6...|222222222|
|                1|[[1.0000001192092...|[5961816244391, 6...|111111111|
|                2|[[1.0000001192092...|[6908383199422, 6...|333333333|
|                0|[[0.9999997615814...|[5961899770023, 5...|111111111|
|                1|[[1.0, 0.30099773...|[6794124132542, 6...|333333333|
|                0|             [[1.0]]|     [4637467902007]|444444444|
|                1|             [[1.0]]|     [5961974317223]|111111111|
|                2|[[1.0, 0.23061707...|[6807162486974, 6...|333333333|
|                0|[[1.0, 0.56126642...|[5961866641575, 5...|111111111|
|                1|[[1.0000002384185...|[6819049832638, 6...|333333333|
|                0|             [[1.0]]|     [4480999391337]|222222222|
+-----------------+--------------------+--------------------+---------+
With the way it's split, I won't be able to leverage my cosine_similarity matrix.
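For reference, the read itself is just the plain parquet reader (a sketch, assuming an active SparkSession named spark and the hypothetical directory path the files above were written into):

# Spark picks up every parquet file under the directory; each file still
# carries the pandas index it was written with, surfaced as __index_level_0__
spark_df = spark.read.parquet("path/to/parquet_dir")
spark_df.show()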
CodePudding user response:
Using the following data as an example,
df.show()
+-----------------+-----------------+---------+---+
|__index_level_0__|cosine_similarity|   sub_id| id|
+-----------------+-----------------+---------+---+
|                0|   [[0.99, 0.98]]|[66, 615]|111|
|                1|    [[0.3, 0.18]]|[12, 513]|222|
|                2|   [[0.29, 0.38]]|[23, 122]|111|
|                0|   [[0.99, 0.31]]|[45, 935]|111|
|                1|         [[0.54]]|     [67]|111|
|                0|   [[0.53, 0.98]]|[78, 567]|222|
|                1|          [[1.0]]|     [89]|222|
|                2|   [[0.74, 0.72]]|[90, 445]|333|
|                0|   [[0.39, 0.32]]|[67, 334]|333|
|                1|   [[0.91, 0.85]]|[12, 223]|333|
|                0|          [[1.0]]|     [34]|333|
+-----------------+-----------------+---------+---+
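(The post doesn't show how this example frame was created; here is one way to reproduce it, assuming integer sub_id and id values:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Rows mirror the df.show() output above
rows = [
    (0, [[0.99, 0.98]], [66, 615], 111),
    (1, [[0.3, 0.18]], [12, 513], 222),
    (2, [[0.29, 0.38]], [23, 122], 111),
    (0, [[0.99, 0.31]], [45, 935], 111),
    (1, [[0.54]], [67], 111),
    (0, [[0.53, 0.98]], [78, 567], 222),
    (1, [[1.0]], [89], 222),
    (2, [[0.74, 0.72]], [90, 445], 333),
    (0, [[0.39, 0.32]], [67, 334], 333),
    (1, [[0.91, 0.85]], [12, 223], 333),
    (0, [[1.0]], [34], 333),
]
df = spark.createDataFrame(
    rows, ["__index_level_0__", "cosine_similarity", "sub_id", "id"])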
You can use pyspark.sql functions to get the desired result like the following. collect_list gathers all of an id's chunks into an array of matrices; the first flatten merges the chunks into a single array of rows, and the second flatten concatenates those rows into one flat array:

from pyspark.sql import functions

df = (df.groupBy("id")
        .agg(
            # array of matrices -> array of rows -> flat array
            functions.flatten(functions.flatten(
                functions.collect_list("cosine_similarity")
            )).alias("cosine_similarity"),
            # concatenate the per-chunk sub_id lists in the same order
            functions.flatten(
                functions.collect_list("sub_id")
            ).alias("sub_id")))
df.show()
+---+------------------------------------------+-------------------------------+
|id |cosine_similarity                         |sub_id                         |
+---+------------------------------------------+-------------------------------+
|333|[0.74, 0.72, 0.39, 0.32, 0.91, 0.85, 1.0] |[90, 445, 67, 334, 12, 223, 34]|
|222|[0.3, 0.18, 0.53, 0.98, 1.0]              |[12, 513, 78, 567, 89]         |
|111|[0.99, 0.98, 0.29, 0.38, 0.99, 0.31, 0.54]|[66, 615, 23, 122, 45, 935, 67]|
+---+------------------------------------------+-------------------------------+
CodePudding user response:
It turns out setting index=False in my to_parquet call does the trick: pandas writes its index into the parquet file by default (which is what shows up as __index_level_0__), and index=False leaves it out:

df.to_parquet(output_path, index=False)
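As a quick sanity check (a sketch, assuming the rewritten files are read back with an active SparkSession named spark):

# The schema should now contain only id, cosine_similarity, and sub_id_list,
# with no __index_level_0__ column
spark.read.parquet(output_path).printSchema()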