I have two dataframes, df_1 and df_2:
rdd = spark.sparkContext.parallelize([
(1, '', '5647-0394'),
(2, '', '6748-9384'),
(3, '', '9485-9484')])
df_1 = spark.createDataFrame(rdd, schema=['ID', 'UPDATED_MESSAGE', 'ZIP_CODE'])
# +---+---------------+---------+
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# +---+---------------+---------+
# |  1|               |5647-0394|
# |  2|               |6748-9384|
# |  3|               |9485-9484|
# +---+---------------+---------+
rdd = spark.sparkContext.parallelize([
('JAMES', 'INDIA_WON', '6748-9384')])
df_2 = spark.createDataFrame(rdd, schema=['NAME', 'CODE', 'ADDRESS_CODE'])
# +-----+---------+------------+
# | NAME|     CODE|ADDRESS_CODE|
# +-----+---------+------------+
# |JAMES|INDIA_WON|   6748-9384|
# +-----+---------+------------+
I need to update the df_1 column 'UPDATED_MESSAGE' with the value 'INDIA_WON' from the df_2 column 'CODE'. Currently the column 'UPDATED_MESSAGE' is null, and I need to update every row with the value 'INDIA_WON'. How can we do this in PySpark?
The condition is: if the df_2 'ADDRESS_CODE' value is found in the df_1 column 'ZIP_CODE', we need to populate every value in 'UPDATED_MESSAGE' with 'INDIA_WON'.
CodePudding user response:
The Python method below returns either the original df_1, when no ZIP_CODE match is found in df_2, or a modified df_1 in which the column UPDATED_MESSAGE is filled in with the value from the df_2.CODE column:
from pyspark.sql.functions import col, lit

def update_df1(df_1, df_2):
    # If no ZIP_CODE matches any ADDRESS_CODE, return df_1 unchanged
    if df_1.join(df_2, on=(col("ZIP_CODE") == col("ADDRESS_CODE")), how="inner").count() == 0:
        return df_1
    # Otherwise take the CODE value from df_2 and apply it to every row
    code = df_2.collect()[0]["CODE"]
    return df_1.withColumn("UPDATED_MESSAGE", lit(code))
update_df1(df_1, df_2).show()
+---+---------------+---------+
| ID|UPDATED_MESSAGE| ZIP_CODE|
+---+---------------+---------+
|  1|      INDIA_WON|5647-0394|
|  2|      INDIA_WON|6748-9384|
|  3|      INDIA_WON|9485-9484|
+---+---------------+---------+
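For reference, a variant that avoids triggering separate count() and collect() jobs is sketched below. This is not part of the answer above; update_df1_no_collect is a hypothetical name, and the sketch assumes the matched result is tiny so the cross join stays cheap.
from pyspark.sql import functions as F

def update_df1_no_collect(df_1, df_2):
    # One-row aggregate: the first matching CODE, or null when nothing matches
    matched = (
        df_1.join(df_2, df_1["ZIP_CODE"] == df_2["ADDRESS_CODE"], "inner")
            .agg(F.first("CODE").alias("MATCHED_CODE"))
    )
    # Broadcast the single row onto every row of df_1; keep the old value when null
    return (
        df_1.crossJoin(F.broadcast(matched))
            .withColumn("UPDATED_MESSAGE",
                        F.coalesce(F.col("MATCHED_CODE"), F.col("UPDATED_MESSAGE")))
            .drop("MATCHED_CODE")
    )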
CodePudding user response:
I hope I've interpreted what you need correctly. If so, your logic seems strange. Your tables appear to be very small, and Spark is an engine for big data (millions to billions of records); if your tables really are this small, consider doing it in pandas instead (a rough pandas sketch follows the output below).
from pyspark.sql import functions as F

# Keep one CODE per ADDRESS_CODE, then left-join onto df_1
df_2 = df_2.groupBy('ADDRESS_CODE').agg(F.first('CODE').alias('CODE'))
df_joined = df_1.join(df_2, df_1.ZIP_CODE == df_2.ADDRESS_CODE, 'left')
df_filtered = df_joined.filter(~F.isnull('ADDRESS_CODE'))

# If at least one ZIP_CODE matched, fill UPDATED_MESSAGE on every row
if bool(df_filtered.head(1)):
    df_1 = df_1.withColumn('UPDATED_MESSAGE', F.lit(df_filtered.head()['CODE']))

df_1.show()
# +---+---------------+---------+
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# +---+---------------+---------+
# |  1|      INDIA_WON|5647-0394|
# |  2|      INDIA_WON|6748-9384|
# |  3|      INDIA_WON|9485-9484|
# +---+---------------+---------+
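As suggested above, if the data really is this small, the same logic is a few lines of plain pandas. This is only a rough sketch, not part of the original answer; pdf_1 and pdf_2 are hypothetical pandas equivalents of df_1 and df_2.
import pandas as pd

# Hypothetical pandas equivalents of df_1 and df_2
pdf_1 = pd.DataFrame({'ID': [1, 2, 3],
                      'UPDATED_MESSAGE': ['', '', ''],
                      'ZIP_CODE': ['5647-0394', '6748-9384', '9485-9484']})
pdf_2 = pd.DataFrame({'NAME': ['JAMES'],
                      'CODE': ['INDIA_WON'],
                      'ADDRESS_CODE': ['6748-9384']})

# If any ADDRESS_CODE appears among the ZIP_CODEs, fill every row with that CODE
matches = pdf_2[pdf_2['ADDRESS_CODE'].isin(pdf_1['ZIP_CODE'])]
if not matches.empty:
    pdf_1['UPDATED_MESSAGE'] = matches['CODE'].iloc[0]

print(pdf_1)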
CodePudding user response:
Why use dataframes when Spark SQL is so much easier?
Turn the dataframes into temporary views.
%python
df_1.createOrReplaceTempView("tmp_zipcodes")
df_2.createOrReplaceTempView("tmp_person")
Write a simple Spark SQL query to get the answer.
%sql
select
a.id,
case when b.code is null then '' else b.code end as update_message,
a.zip_code
from tmp_zipcodes as a
left join tmp_person as b
on a.zip_code = b.address_code
Run the query to see the output. Use spark.sql() to create a dataframe if you need to write the result to disk.
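For example, a minimal sketch of that last step, reusing the query above; the output path is just a placeholder:
%python
# Run the same query through spark.sql() to get a dataframe back
result_df = spark.sql("""
    select
        a.id,
        case when b.code is null then '' else b.code end as update_message,
        a.zip_code
    from tmp_zipcodes as a
    left join tmp_person as b
        on a.zip_code = b.address_code
""")

result_df.show()
# Write to disk if needed (path is a placeholder)
result_df.write.mode("overwrite").parquet("/tmp/updated_zipcodes")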