I have many dataframes whose columns are in the same order (the column names may differ across dataframes). Two of the columns have a timestamp type, but the problem is that in some dataframes they have a date type instead, so I cannot merge them with the union function.
I want to union all these dataframes, but I don't want to cast to_timestamp for each dataframe.
My approach was to change the type of the first dataframe, expecting the remaining dataframes to follow the type of the first one, but it does not work.
from functools import reduce
from pyspark.sql import functions as F

def change_type_timestamp(df):
    df = df.withColumn("A", F.to_timestamp(F.col("A"))) \
           .withColumn("B", F.to_timestamp(F.col("B")))
    return df

dfs = [df1, df2, df3, ...]
dfs[0] = change_type_timestamp(dfs[0])
reduce(lambda a, b: a.union(b), dfs)
How can I union all the dataframes without changing the type of each dataframe one by one?
CodePudding user response:
You can find the column indexes from the first dataframe's column names, and then reference the columns in the other dataframes by those indexes.
Setup:
from pyspark.sql import functions as F
from functools import reduce
df1 = spark.createDataFrame([('2020-02-02 08:08:08', 1, '2020-02-02 07:07:07')], ['A', 'x', 'B'])
df1.printSchema()
# root
# |-- A: string (nullable = true)
# |-- x: long (nullable = true)
# |-- B: string (nullable = true)
df2 = spark.createDataFrame([('2020-02-02 08:08:08', 1, '2020-02-02 07:07:07')], ['C', 'x', 'D'])
df2.printSchema()
# root
# |-- C: string (nullable = true)
# |-- x: long (nullable = true)
# |-- D: string (nullable = true)
Script:
dfs = [df1, df2]

cols = ['A', 'B']
# Positional indexes of the timestamp columns in the first dataframe
indexes = [dfs[0].columns.index(c) for c in cols]

for i, df in enumerate(dfs):
    for j in indexes:
        # Look up the column name at the same position in this dataframe
        c = dfs[i].columns[j]
        dfs[i] = dfs[i].withColumn(c, F.to_timestamp(c))

df = reduce(lambda a, b: a.union(b), dfs)
df.printSchema()
# root
# |-- A: timestamp (nullable = true)
# |-- x: long (nullable = true)
# |-- B: timestamp (nullable = true)
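If you have more than two dataframes, the same idea can be wrapped in a small helper and applied to the whole list before the union. A minimal sketch (the helper name cast_by_position is illustrative, not from the question):

def cast_by_position(df, indexes):
    # Cast the columns at the given positional indexes to timestamp,
    # whatever they happen to be called in this particular dataframe.
    for j in indexes:
        c = df.columns[j]
        df = df.withColumn(c, F.to_timestamp(c))
    return df

dfs = [cast_by_position(df, indexes) for df in dfs]
df = reduce(lambda a, b: a.union(b), dfs)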
CodePudding user response:
A simpler, neater and more readable way would be as follows. You may have to convert the timestamps and dates to strings and back; datetime always takes precedence in such situations.
cols = df1.columns
dfs = [df1, df2]

df_list = []
for df in dfs:
    # Rename every dataframe to the first dataframe's column names
    df_list.append(df.toDF(*cols))

newdf = reduce(lambda a, b: b.unionByName(a), df_list)
newdf.printSchema()
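With the sample df1 and df2 from the first answer, toDF(*cols) renames df2's columns C, x, D to A, x, B, so unionByName can line the columns up by name. If some dataframes still carry date columns, the to_timestamp cast only needs to be written once, after the rename, because every dataframe now shares the first dataframe's column names. A sketch under that assumption (using the column names A and B from the question):

df_list = [df.toDF(*cols) for df in dfs]
# All dataframes now use df1's column names, so one cast covers them all.
df_list = [
    d.withColumn("A", F.to_timestamp("A"))
     .withColumn("B", F.to_timestamp("B"))
    for d in df_list
]
newdf = reduce(lambda a, b: b.unionByName(a), df_list)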