Can I apply MERGE INTO on PySpark DataFrame?

Time:12-23

I have two PySpark DataFrames that I want to merge. When I try to use the MERGE INTO statement, I get an error saying that there is no such table. I am running the code in Databricks.

Sample code:

import pandas as pd

target_data = {'id': [1100, 1200, 1300, 1400, 1500],
              'name': ["Person1", "Person2", "Person3", "Person4", "Person5"],
              'location': ["Location1", "Location2", "Location3", None, "Location5"],
              'contact': [None, "Contact2", None, "Contact4", None],
               }

pdf = pd.DataFrame(target_data)
target = spark.createDataFrame(pdf)


source_data = {'id': [1400, 1500, 1600],
              'name': ["Person4", "Person5", "Person6"],
              'location': ["Location4", "Location5", "Location6"],
              'contact': ["Contact4", "Contact5", "Contact6"],
               }

pdf = pd.DataFrame(source_data)
source = spark.createDataFrame(pdf) 

Then, in the next cell, I run this SQL statement:

%sql

MERGE INTO target as t
USING source as s
ON t.id = s.id
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED THEN 
  INSERT *

I get an error saying that the table does not exist.

Is there any way to merge the two DataFrames? Should I convert them into a Delta table first?

CodePudding user response:

To merge two DataFrames into a single DataFrame in PySpark, you can use either the join method or the union method.

The join method combines two DataFrames column-wise, matching rows on a common column or set of columns. For example, suppose you have two DataFrames, df1 and df2, and you want to merge them on the id column:

df1 = spark.createDataFrame([(1, 'Alice', 25), (2, 'Bob', 30)], ['id', 'name', 'age'])
df2 = spark.createDataFrame([(1, 'New York'), (2, 'Chicago')], ['id', 'city'])

# Merge df1 and df2 based on column 'id'
df = df1.join(df2, on='id')

df.show()


+---+-----+---+--------+
| id| name|age|    city|
+---+-----+---+--------+
|  1|Alice| 25|New York|
|  2|  Bob| 30| Chicago|
+---+-----+---+--------+

The union method combines two DataFrames row-wise, appending the rows of the second DataFrame to those of the first. For example:

df1 = spark.createDataFrame([(1, 'Alice', 25), (2, 'Bob', 30)], ['id', 'name', 'age'])
df2 = spark.createDataFrame([(3, 'Charlie', 35), (4, 'Dave', 40)], ['id', 'name', 'age'])

# Merge df1 and df2
df = df1.union(df2)

df.show()


+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
|  4|   Dave| 40|
+---+-------+---+

Note that union matches columns by position, not by name, so the two DataFrames must have the same number of columns, in the same order, with compatible data types. If the number of columns differs, PySpark raises an AnalysisException. To match columns by name instead, use unionByName.
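Neither join nor union on its own reproduces what MERGE INTO does (update the rows that match, insert the ones that do not), but the two can be combined. The following is a minimal sketch, not a built-in API: upsert is a hypothetical helper name, and it assumes both DataFrames have the same column names and that the key column is id, as in the question. It keeps the target rows whose key is absent from source (a left anti join) and then appends every source row with unionByName.

```python
def upsert(target_df, source_df, key="id"):
    """Emulate MERGE INTO ... UPDATE SET * / INSERT * with plain DataFrames.

    Hypothetical helper for illustration; assumes both DataFrames share
    the same column names.
    """
    # Target rows whose key does not appear in source stay untouched.
    untouched = target_df.join(source_df.select(key), on=key, how="left_anti")
    # Every source row is either an update (key matched) or an insert (new key).
    return untouched.unionByName(source_df)


# In the notebook from the question, where `target` and `source` already exist:
# merged = upsert(target, source)
# merged.orderBy("id").show()
```

With the data from the question, rows 1100 to 1300 would come from target and rows 1400 to 1600 from source. The result is a new DataFrame; target itself is not modified.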

CodePudding user response:

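Extending the other answers here: if you also want to drop duplicates, you can use the dropDuplicates function. Keep in mind that dropDuplicates does not guarantee which of the rows sharing an id is kept, so the source rows are not certain to win over the target rows.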

>>> output_df=source.union(target).dropDuplicates(["id"])
>>> output_df.orderBy(["id"]).show()
+----+-------+---------+--------+
|  id|   name| location| contact|
+----+-------+---------+--------+
|1100|Person1|Location1|    null|
|1200|Person2|Location2|Contact2|
|1300|Person3|Location3|    null|
|1400|Person4|Location4|Contact4|
|1500|Person5|Location5|Contact5|
|1600|Person6|Location6|Contact6|
+----+-------+---------+--------+
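
To answer the original question directly: the MERGE INTO statement fails because target and source are Python variables, not tables that SQL can see, and on Databricks MERGE INTO only accepts a Delta table as its target. So yes, the target has to be saved as a Delta table first, while the source only needs to be registered as a temporary view. A rough sketch under those assumptions (merge_with_sql is a hypothetical helper name, and the table is called target only to match the SQL in the question):

```python
# Same statement as in the question, kept as a string so it can be run from Python.
MERGE_SQL = """
MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
"""


def merge_with_sql(spark, target_df, source_df):
    """Hypothetical helper: run the question's MERGE INTO against a Delta table."""
    # MERGE INTO needs a real Delta table to write to.
    # Caution: mode("overwrite") replaces any existing table named `target`.
    target_df.write.format("delta").mode("overwrite").saveAsTable("target")
    # The source only has to be visible to SQL by name; a temp view is enough.
    source_df.createOrReplaceTempView("source")
    spark.sql(MERGE_SQL)
    # Read the merged table back as a DataFrame.
    return spark.table("target")


# In the Databricks notebook from the question:
# merged = merge_with_sql(spark, target, source)
# merged.orderBy("id").show()
```

Unlike the union-based approaches, this changes the stored table in place, so it fits best when target is meant to live on as a table rather than as a throwaway DataFrame.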
