Execute the same function on different columns to produce rows to append to another table


How could I perform the same operation for 15 columns on a DataFrame?
How could I parallelize the operation?

I have input data that I need to use to update a reference table. There are more columns, but I think these 3 are enough to show what I am trying to do.

Table: input

rowid col1 col2 col3
id1 col1_data1 col2_data1 col3_data1
id2 col1_data2 col2_data2 col3_data2

The reference table contains the value of each cell of the input columns, its md5, and finally the name of the column it came from.

Table: references

col_data md5 ref_name
col1_data1 md5_col1_data1 col1_name
col1_data2 md5_col1_data2 col1_name
col1_data3 md5_col1_data3 col1_name
col2_data1 md5_col2_data1 col2_name
col2_data2 md5_col2_data2 col2_name
col2_data3 md5_col2_data3 col2_name
col3_data1 md5_col3_data1 col3_name
col3_data2 md5_col3_data2 col3_name
col3_data3 md5_col3_data3 col3_name

I created a function similar to the one below. It checks the input table against the reference table; when new data is found, the new reference rows are created and returned as a DataFrame, so that at the end the references table can be updated.

def repeatedly_executed_function(input_data, references, col_name):
    """
    input_data is the full dataframe
    references is the table to check for the value and, if it is missing, create it
    col_name is the name of the column considered in this execution
    """
    # ... some code ...
    return partial_df

df_col1 = repeatedly_executed_function(input_data, references, "col1")
df_col2 = repeatedly_executed_function(input_data, references, "col2")
data_to_append = df_col1.union(df_col2)
df_col3 = repeatedly_executed_function(input_data, references, "col3")
data_to_append = data_to_append.union(df_col3)
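
Simplified, the body does something like this (a sketch only: F.md5 stands in for the real hashing, a left anti join on col_data for the real lookup, and the ref_name here is just the raw column name):

from pyspark.sql import functions as F

def repeatedly_executed_function(input_data, references, col_name):
    # distinct values of the requested column, shaped like the references table
    candidates = (
        input_data
        .select(F.col(col_name).alias("col_data"))
        .distinct()
        .withColumn("md5", F.md5("col_data"))
        .withColumn("ref_name", F.lit(col_name))  # illustrative; the real ref_name differs slightly
    )
    # keep only the values that the references table does not contain yet
    return candidates.join(references, "col_data", "left_anti")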

I only show a 3-column example, but there are 15 columns to check.

At the end, the idea is to update the references table with the newly calculated md5 values:

(
     data_to_append.write.format("delta")
     .mode("append")
     .saveAsTable(database_table)
)

CodePudding user response:

Create an empty DataFrame with the correct schema, get all the columns, and union the result for each column into it. I'm not sure it's worth parallelizing for 15 items, and you would run into issues with the Spark context (it isn't available inside an executor), meaning repeatedly_executed_function would have to contain pure Python code. You might be able to do all rows at once with a UDF, but I'm not sure that would perform as well (UDFs are known for poor performance due to the lack of vectorization).

from pyspark.sql.types import StructType, StructField, StringType

unionSchema = StructType([
    StructField('column', StringType(), True)])

# empty DataFrame to accumulate the rows returned for every column
my_union = spark.createDataFrame(data=[], schema=unionSchema)
for i in input_data.columns:
    my_union = my_union.union(repeatedly_executed_function(input_data, references, i))
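
Note that the union only works if the empty DataFrame's schema matches what repeatedly_executed_function returns. Assuming it returns the three columns of the references table from the question, the schema would look more like this:

from pyspark.sql.types import StructType, StructField, StringType

# assumed shape of the rows returned by repeatedly_executed_function
unionSchema = StructType([
    StructField('col_data', StringType(), True),
    StructField('md5', StringType(), True),
    StructField('ref_name', StringType(), True),
])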

CodePudding user response:

What about pivoting the data and performing one join?
The code below creates a map. Building its input is a little awkward, as I construct in Python a list of [lit(column_name1), col(column_name1), lit(column_name2), ...]. The main purpose of this map is to explode it, so that the input table ends up in a format similar to the reference df and one normal join can be performed.

from itertools import chain
from pyspark.sql.functions import create_map, lit, col, explode

column_names = ["col1", "col2", "col3"]


df \
    .withColumn("features_map", create_map(
        # [lit(col1), col(col1), lit(col2), col(col2), ...]
        list(chain(*[(lit(c), col(c)) for c in column_names]))
    )) \
    .select("rowid", explode("features_map").alias("ref_name", "col_data")) \
    .join(ref_df, on=["ref_name", "col_data"], how="left") ....
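
From here, one way to finish the update (a sketch, assuming the md5 still has to be computed for the new values, and swapping the left join for a left anti join so that only the values missing from ref_df remain) could be:

from itertools import chain
from pyspark.sql.functions import create_map, lit, col, explode, md5

new_refs = (
    df
    .withColumn("features_map", create_map(
        list(chain(*[(lit(c), col(c)) for c in column_names]))
    ))
    .select(explode("features_map").alias("ref_name", "col_data"))
    # keep only the (ref_name, col_data) pairs not present in the reference table
    .join(ref_df, on=["ref_name", "col_data"], how="left_anti")
    .withColumn("md5", md5("col_data"))
    .select("col_data", "md5", "ref_name")
)

new_refs.write.format("delta").mode("append").saveAsTable(database_table)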

CodePudding user response:

No function, no unions. 1 shuffle (anti join).

  • Create all 3 final columns (data, md5, col_name) inside an array in the Input table
  • Unpivot - turn every row of 15 columns into 15 rows of 1 array column
  • Split that single array column into 3 data columns
  • Filter out rows which already exist in References
  • Append the result

from pyspark.sql import functions as F

cols = ['col1', 'col2',..., 'col15']

# Change Input columns to arrays
df_input = df_input.select(
    *[F.array(F.col(c), F.md5(c), F.lit(c)).alias(c) for c in cols]
)
# Unpivot Input table
stack_string = ", ".join([f"`{c}`" for c in cols])
df_input2 = df_input.select(
    F.expr(f"stack({len(cols)}, {stack_string}) as col_data"))

# Make 3 columns from 1 array column
df_input3 = df_input2.select(
    F.element_at('col_data', 1).alias('col_data'),
    F.element_at('col_data', 2).alias('md5'),
    F.element_at('col_data', 3).alias('ref_name'),
)

# Keep only rows which don't exist in References table
data_to_append = df_input3.join(df_references, 'col_data', 'anti')

(
    data_to_append.write.format("delta")
    .mode("append")
    .saveAsTable(database_table)
)
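
On Spark 3.4+ the same unpivot step can also be written with DataFrame.unpivot, which avoids building the arrays and the stack expression by hand (a sketch, assuming df_input is the original input table with its rowid column and df_references is the reference table):

from pyspark.sql import functions as F

cols = ['col1', 'col2', 'col15']  # illustrative subset of the 15 columns

data_to_append = (
    df_input
    # turn every row of N columns into N rows of (ref_name, col_data)
    .unpivot(ids=['rowid'], values=cols,
             variableColumnName='ref_name', valueColumnName='col_data')
    .withColumn('md5', F.md5('col_data'))
    .select('col_data', 'md5', 'ref_name')
    .join(df_references, 'col_data', 'anti')
)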