How to union many dataframes based on column order with casting to timestamp


I have many dataframes whose columns are in the same order (the column names may differ between dataframes). Two of the columns have a timestamp type, but the problem is that in some dataframes they have a date type instead, so I cannot merge the dataframes with the union function.

I want to union all these dataframes, but I don't want to call to_timestamp on each dataframe individually.

My approach was to change the type in the first dataframe, hoping the remaining dataframes would follow the type of the first one, but it does not work: the other dataframes keep their original column types.

from functools import reduce
from pyspark.sql import functions as F

def change_type_timestamp(df):
    # Cast the two problematic columns to timestamp
    df = df.withColumn("A", F.to_timestamp(F.col("A"))) \
        .withColumn("B", F.to_timestamp(F.col("B")))
    return df

dfs = [df1, df2, df3, ...]

dfs[0] = change_type_timestamp(dfs[0])
result = reduce(lambda a, b: a.union(b), dfs)

How can I union all the dataframes without changing the type in each dataframe one by one?

CodePudding user response:

You can find the column indexes using the first dataframe's column names, and then reference the columns in the other dataframes by those indexes.

Setup:

from pyspark.sql import functions as F
from functools import reduce

df1 = spark.createDataFrame([('2020-02-02 08:08:08', 1, '2020-02-02 07:07:07')], ['A', 'x', 'B'])
df1.printSchema()
# root
#  |-- A: string (nullable = true)
#  |-- x: long (nullable = true)
#  |-- B: string (nullable = true)

df2 = spark.createDataFrame([('2020-02-02 08:08:08', 1, '2020-02-02 07:07:07')], ['C', 'x', 'D'])
df2.printSchema()
# root
#  |-- C: string (nullable = true)
#  |-- x: long (nullable = true)
#  |-- D: string (nullable = true)

Script:

dfs = [df1, df2]
cols = ['A', 'B']
# Positions of the timestamp columns, taken from the first dataframe
indexes = [dfs[0].columns.index(c) for c in cols]
for i in range(len(dfs)):
    for j in indexes:
        c = dfs[i].columns[j]
        dfs[i] = dfs[i].withColumn(c, F.to_timestamp(c))

df = reduce(lambda a, b: a.union(b), dfs)
df.printSchema()
# root
#  |-- A: timestamp (nullable = true)
#  |-- x: long (nullable = true)
#  |-- B: timestamp (nullable = true)
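
The same idea generalizes to any number of dataframes by wrapping it in a small helper. A minimal sketch, assuming the timestamp columns sit at the same positions in every dataframe (the helper name cast_by_position is hypothetical):

from functools import reduce
from pyspark.sql import functions as F

def cast_by_position(dfs, cols):
    # cols: timestamp column names in the FIRST dataframe;
    # their positions are reused for every other dataframe
    indexes = [dfs[0].columns.index(c) for c in cols]
    out = []
    for df in dfs:
        for j in indexes:
            name = df.columns[j]
            df = df.withColumn(name, F.to_timestamp(name))
        out.append(df)
    return reduce(lambda a, b: a.union(b), out)

df = cast_by_position([df1, df2], ['A', 'B'])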

CodePudding user response:

A simpler, neater, and more readable way would be as follows: rename every dataframe's columns to the first dataframe's names, then union by name. You may still have to convert the timestamps and dates to strings and back; the timestamp type takes precedence in such situations.

from functools import reduce

cols = df1.columns
dfs = [df1, df2]
df_list = []
for df in dfs:
    # Rename each dataframe's columns to the first dataframe's names
    df_list.append(df.toDF(*cols))

newdf = reduce(lambda a, b: b.unionByName(a), df_list)

newdf.printSchema()
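
If the mismatched columns really are date-typed (rather than strings), one way to avoid the round trip through strings is to inspect each schema and cast any date column to timestamp before the union. A minimal sketch using PySpark's standard types (the helper name dates_to_timestamps is hypothetical):

from functools import reduce
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

def dates_to_timestamps(df):
    # Cast every date-typed column to timestamp so the schemas line up
    for field in df.schema.fields:
        if isinstance(field.dataType, DateType):
            df = df.withColumn(field.name, F.col(field.name).cast('timestamp'))
    return df

cols = df1.columns
df_list = [dates_to_timestamps(df.toDF(*cols)) for df in [df1, df2]]
newdf = reduce(lambda a, b: a.unionByName(b), df_list)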