I am trying to use the .delete() method
to remove one record from a Delta table as follows:
val my_dt: DeltaTable = DeltaTable.forPath(ss, my_delta_path)
my_dt.delete("pk= '123456'")
When I run my code in IntelliJ I get the following exception:
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable.<init>(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;Lorg/apache/spark/sql/catalyst/expressions/Expression;)V
at io.delta.tables.execution.DeltaTableOperations.$anonfun$executeDelete$1(DeltaTableOperations.scala:44)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:104)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:90)
at io.delta.tables.DeltaTable.improveUnsupportedOpError(DeltaTable.scala:42)
at io.delta.tables.execution.DeltaTableOperations.executeDelete(DeltaTableOperations.scala:41)
at io.delta.tables.execution.DeltaTableOperations.executeDelete$(DeltaTableOperations.scala:41)
at io.delta.tables.DeltaTable.executeDelete(DeltaTable.scala:42)
at io.delta.tables.DeltaTable.delete(DeltaTable.scala:183)
at io.delta.tables.DeltaTable.delete(DeltaTable.scala:172)
I read that the issue is that I need to pass the following two configuration options:
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
So I tried to pass them in several different ways without success:
First, when building the Spark session:
val ss = SparkSession.builder
  .master("local[*]")
  .config("spark.master", "local")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .appName(appName)
  .getOrCreate()
I also tried passing them as program arguments in the IntelliJ run configuration itself.
In addition, I added the following dependencies to my pom.xml:
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-storage</artifactId>
    <version>2.1.0</version>
</dependency>
However, the exception is still there. What am I missing?
As additional info, I am able to write and read Delta tables on my local filesystem without issues using:
my_df.write
  .format("delta")
  .option("overwriteSchema", "true")
  .mode("overwrite")
  .save(my_path)
val my_dt : DeltaTable = DeltaTable.forPath(ss, my_path)
Answer:
For Spark 3.2.x you need to use Delta 2.0.x; to use Delta 2.1.x you need to upgrade to Spark 3.3.x. Check the Delta Lake releases page for the Spark/Delta compatibility matrix.
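For example, if you want to stay on Spark 3.2.x, your dependencies could look roughly like this (a sketch assuming Scala 2.12, Spark 3.2.2, and Delta 2.0.2; adjust the Spark patch version to whatever 3.2.x release you actually use):

<!-- Spark 3.2.x line -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.2</version>
</dependency>
<!-- Delta 2.0.x is the release line built against Spark 3.2.x -->
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.12</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-storage</artifactId>
    <version>2.0.2</version>
</dependency>

Alternatively, keep delta-core/delta-storage at 2.1.0 and bump spark-sql to a 3.3.x version. The key point is that the Spark and Delta versions must come from the same compatibility row, otherwise you hit exactly this kind of NoSuchMethodError at runtime because Delta was compiled against a Catalyst class whose constructor signature changed between Spark versions.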