I have a final dataframe with this schema:
- Product_ID: string
- Product_COD: string
- Product_NAM: string
- Product_VER: integer
- ProductLine_NAM: string
- Language_COD: string
- ProductType_NAM: string
- Load_DAT: integer
- LoadEnd_DAT: integer
- edmChange_DTT: timestamp
and I want to add a new row to that dataframe where the ID (Product_ID) is -1, the string columns are set to 'Unknown', and the columns of any other datatype are set to null, for example a row like:
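-1 | Unknown | Unknown | null | Unknown | Unknown | Unknown | null | null | null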
I created this code:
id_column = "Product_ID"
df_lessOne = spark.createDataFrame(["-1"], "string").toDF(id_column) #create a new id_column row with -1
appended_df = finalDf.unionByName(df_lessOne, allowMissingColumns=True) #add the rest columns of dataframe with nulls
appended_df_filter = appended_df.filter("" id_column " = '-1'")
columns = [item[0] for item in appended_df_filter.dtypes if item[1].startswith('string')] #select only string columns
# replace string columns with "Unknown"
for c_na in columns:
appended_df_filter = (appended_df_filter
.filter("" id_column " = '-1'")
.withColumn(c_na, lit('Unknown'))
)
appended_df = appended_df.filter("" id_column " <> '-1'")
dfs = [appended_df, appended_df_filter]
#add final -1 row to the final dataframe
finalDf = reduce(DataFrame.unionAll, dfs)
display(finalDf)
but unfortunately it's not working as expected.
I'm trying to make this dynamic because I want to reuse it for other dataframes later; I would only need to change id_column.
Can anyone please help me achieve this?
Thank you!
CodePudding user response:
from pyspark.sql.types import *
from datetime import datetime
import pyspark.sql.functions as F
data2 = [
("xp3980","2103","Product_1",1,"PdLine_23","XX1","PNT_1",2,36636,datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S')),
("gi9387","2411","Product_2",1,"PdLine_44","YT89","PNT_6",2,35847,datetime.strptime('2021-07-21 7:00:00', '%Y-%m-%d %H:%M:%S'))
]
schema = StructType([
    StructField("Product_ID", StringType(), True),
    StructField("Product_COD", StringType(), True),
    StructField("Product_NAM", StringType(), True),
    StructField("Product_VER", IntegerType(), True),
    StructField("ProductLine_NAM", StringType(), True),
    StructField("Language_COD", StringType(), True),
    StructField("ProductType_NAM", StringType(), True),
    StructField("Load_DAT", IntegerType(), True),
    StructField("LoadEnd_DAT", IntegerType(), True),
    StructField("edmChange_DTT", TimestampType(), True)
])
my_df = spark.createDataFrame(data=data2, schema=schema)

# single-row dataframe holding only the -1 id
df_res = spark.createDataFrame([(-1,)]).toDF("Product_ID")

# fill in the remaining columns: 'Unknown' for string columns, null for everything else
for c in my_df.schema:
    if str(c.name) == 'Product_ID':
        continue
    if str(c.dataType) == 'StringType':
        df_res = df_res.withColumn(c.name, F.lit('Unknown'))
    else:
        df_res = df_res.withColumn(c.name, F.lit(None))

my_df.union(df_res).show()
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
# |Product_ID|Product_COD|Product_NAM|Product_VER|ProductLine_NAM|Language_COD|ProductType_NAM|Load_DAT|LoadEnd_DAT|      edmChange_DTT|
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
# |    xp3980|       2103|  Product_1|          1|      PdLine_23|         XX1|          PNT_1|       2|      36636|2020-08-20 10:00:00|
# |    gi9387|       2411|  Product_2|          1|      PdLine_44|        YT89|          PNT_6|       2|      35847|2021-07-21 07:00:00|
# |        -1|    Unknown|    Unknown|       null|        Unknown|     Unknown|        Unknown|    null|       null|               null|
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
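To make this reusable across dataframes where only the id column changes, here is a minimal sketch wrapping the same approach in a helper function. It assumes the notebook's spark session is in scope, and the name add_unknown_row is just illustrative:

from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

def add_unknown_row(df: DataFrame, id_column: str) -> DataFrame:
    # single-row dataframe holding only the id column with the value "-1"
    row = spark.createDataFrame([("-1",)], f"{id_column}: string")
    # fill every other column: 'Unknown' for strings, a typed null otherwise
    for field in df.schema:
        if field.name == id_column:
            continue
        if isinstance(field.dataType, StringType):
            row = row.withColumn(field.name, F.lit('Unknown'))
        else:
            row = row.withColumn(field.name, F.lit(None).cast(field.dataType))
    # unionByName matches columns by name, so column order does not matter
    return df.unionByName(row)

# usage, e.g.: finalDf = add_unknown_row(finalDf, "Product_ID")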