It has been three weeks since this problem started. Long story short: my goal is to create a user-item matrix for recommendations (cosine, sad, ...).
For this I've created the code below. The first function lists all the files in my HDFS so I can read all my orders, my product views and my products added to cart at the same time.
These reads work fine and I can print my dataframes without any problem. But as soon as I apply actions like .distinct() to the union of my 3 DFs, I can't do anything with the result: show, collect and toPandas all fail with a big error that I can't understand.
The error is about reading parquet, but right after the read I can display my df just fine, so there shouldn't be an issue with the reading, right?
I have already tried the write-legacy-format option and a lot of other things.
Can some of you help me? (Feel free to ask any questions, or to request more code/files.)
Thank you! Adrien
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def get_data_raw_to_df(spark, nb_months=36, id_shop="-1"):
    '''Get raw data.'''
    filename_list_paid = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/order",
        id_shop,
        nb_months,
        "paid_"
    )
    filename_list_cart = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/product",
        id_shop,
        nb_months,
        "cart_"
    )
    filename_list_view = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/product",
        id_shop,
        nb_months,
        "view_"
    )

    empty_rdd = spark.sparkContext.emptyRDD()
    schema = StructType([
        StructField('cid', StringType(), True),
        StructField('sid', StringType(), True),
        StructField('product_id', StringType(), True),
        StructField('buy_product', IntegerType(), True),
        StructField('cart_product', IntegerType(), True),
        StructField('view_product', IntegerType(), True),
    ])
    df_users_items = empty_rdd.toDF(schema)

    if len(filename_list_paid) > 0:
        df_tmp_buy_product = spark.read.parquet(*filename_list_paid)\
            .filter("is_bot == 'false'")\
            .select('cid', 'sid', F.explode('items'))\
            .select('cid', 'sid', col("col.product_id"))\
            .withColumn("buy_product", F.lit(5))
    if len(filename_list_cart) > 0:
        df_tmp_cart_product = spark.read.parquet(*filename_list_cart)\
            .filter("is_bot == 'false'")\
            .select('cid', 'sid', 'product_id')\
            .withColumn("cart_product", F.lit(2))
    if len(filename_list_view) > 0:
        df_tmp_view_product = spark.read.parquet(*filename_list_view)\
            .filter("is_bot == 'false'")\
            .select('cid', 'sid', 'product_id')\
            .withColumn("view_product", F.lit(1))

    try:
        # Set up bought products
        df_users_items = df_users_items\
            .unionByName(df_tmp_buy_product
                         .select('cid', 'product_id', 'sid', 'buy_product'),
                         allowMissingColumns=True)
        # Everything works up to this point
        df_tmp = (df_tmp_buy_product.select('cid', 'sid')
                  .unionByName(df_tmp_cart_product.select('cid', 'sid'))
                  .unionByName(df_tmp_view_product.select('cid', 'sid'))).distinct()
        # Errors start from here
        ...
Error log: https://justpaste.it/2y7jc
CodePudding user response:
spark.read.option('mergeSchema', True).parquet(*filename_list_paid)
might help. Keep in mind that Spark reads parquet lazily: the files are only actually scanned when an action runs, which is why the error surfaces at the union + .distinct() rather than at the read itself. You could also try reading one specific file, e.g.
spark.read.parquet('.../order/paid__20220324.parquet/part-00039-ff535f97*').printSchema()
to check whether the columns are Strings or other types — a type that drifted between files would explain the failure.