PySpark DF Cannot do any action


It has been three weeks since this problem started. Long story short, my goal is to create a user-item matrix for recommendations (cosine, SAD, ...).

For this I've written the code below. The first function lists all the files in my HDFS so I can read my orders, my product views, and my products added to cart at the same time.
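
get_date_list itself isn't shown here; roughly, it just returns the list of Parquet paths matching a prefix. A simplified, hypothetical sketch (the real helper also filters by nb_months):

# Hypothetical sketch of get_date_list: list the files under base_path
# whose name starts with prefix (month filtering omitted here).
def get_date_list(spark, base_path, id_shop, nb_months, prefix):
    hadoop = spark.sparkContext._jvm.org.apache.hadoop
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    fs = hadoop.fs.FileSystem.get(conf)
    statuses = fs.listStatus(hadoop.fs.Path(base_path))
    return [str(s.getPath()) for s in statuses
            if s.getPath().getName().startswith(prefix)]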

Those reads work fine and I can print my dataframes without any problem. But as soon as I start doing actions like .distinct() on the union of my 3 DFs, I can't do anything with the result: show, collect, and toPandas all fail with a big error that I can't understand.

The error is about reading Parquet, but right after the read I can use my df just fine, so there shouldn't be any issue with the reading, right?
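
To illustrate the symptom (simplified from the code below):

# Actions on each individual df work:
df_tmp_buy_product.show()     # fine
df_tmp_cart_product.show()    # fine
df_tmp_view_product.show()    # fine

# But any action on the union + distinct fails:
df_tmp.show()                 # big Parquet read error
df_tmp.collect()              # same error
df_tmp.toPandas()             # same error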

I already tried the legacy write format (spark.sql.parquet.writeLegacyFormat) and a lot of other things.

Can some of you help me? (Feel free to ask any questions or to request more code/files.)

Thank you! Adrien

from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType)


def get_data_raw_to_df(spark, nb_months=36, id_shop="-1"):
    '''Build the user-item dataframe from orders, carts and views.'''

    # Collect the Parquet paths for each event type.
    filename_list_paid = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/order",
        id_shop,
        nb_months,
        "paid_"
        )

    filename_list_cart = get_date_list(
        spark,
        "hdfs:///"   id_shop   "/master/product",
        id_shop,
        nb_months,
        "cart_"
        )

    filename_list_view = get_date_list(
        spark,
        "hdfs:///"   id_shop   "/master/product",
        id_shop,
        nb_months,
        "view_"
        )

    # Start from an empty dataframe with the target user-item schema.
    empty_rdd = spark.sparkContext.emptyRDD()

    schema = StructType([
        StructField('cid', StringType(), True),
        StructField('sid', StringType(), True),
        StructField('product_id', StringType(), True),
        StructField('buy_product', IntegerType(), True),
        StructField('cart_product', IntegerType(), True),
        StructField('view_product', IntegerType(), True),
    ])

    df_users_items = empty_rdd.toDF(schema)

    # Event weights: purchases 5, carts 2, views 1 (set via F.lit below).
    if len(filename_list_paid) > 0:
        df_tmp_buy_product = spark.read.parquet(*filename_list_paid)\
                                .filter("is_bot == 'false'")\
                                .select('cid',
                                        'sid',
                                        F.explode('items'))\
                                .select('cid', 'sid',
                                        col("col.product_id"))\
                                .withColumn("buy_product",
                                            F.lit(5))
    if len(filename_list_cart) > 0:
        df_tmp_cart_product = spark.read.parquet(*filename_list_cart)\
                                .filter("is_bot == 'false'")\
                                .select('cid', 'sid',
                                        'product_id')\
                                .withColumn("cart_product",
                                            F.lit(2))
    if len(filename_list_view) > 0:
        df_tmp_view_product = spark.read.parquet(*filename_list_view)\
                                .filter("is_bot == 'false'")\
                                .select('cid', 'sid',
                                        'product_id')\
                                .withColumn("view_product",
                                            F.lit(1))

    try:
        # Setup product buy
        df_users_items = df_users_items\
            .unionByName(df_tmp_buy_product
                         .select('cid',
                                 'product_id',
                                 'sid',
                                 'buy_product'),
                         allowMissingColumns=True)
        # Everything works fine up to this point
        df_tmp = (df_tmp_buy_product.select('cid', 'sid')
                  .unionByName(
                        df_tmp_cart_product.select('cid', 'sid'))
                  .unionByName(
                        df_tmp_view_product.select('cid', 'sid'))).distinct()
        # Errors start from here: any action on df_tmp fails
...

Error log: https://justpaste.it/2y7jc

CodePudding user response:

spark.read.option('mergeSchema', True).parquet(*filename_list_paid) might help. Parquet reads are lazy: the read itself only looks at the file footers, so a schema conflict between files often only surfaces at the first action on the combined data, which would explain why everything looks fine until .distinct(). You could also try reading the specific file from the stack trace on its own.
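
For example, a sketch assuming the drift is between the monthly order files:

# Merge the (possibly divergent) schemas of all the Parquet files
# instead of taking the schema from a single file.
df_tmp_buy_product = (
    spark.read.option('mergeSchema', True)
         .parquet(*filename_list_paid)
         .filter("is_bot == 'false'")
)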

Use spark.read.parquet('.../order/paid__20220324.parquet/part-00039-ff535f97*').printSchema() to check whether a column such as product_id is a String in one file but a different type in another.
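
To find which file disagrees, you can print every file's schema and compare (a quick diagnostic loop):

# Print each file's schema; the odd one out is the culprit.
for path in filename_list_paid:
    print(path)
    spark.read.parquet(path).printSchema()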
