Error reading Cassandra TTL and WRITETIME with Spark 3.0


Although the latest spark-cassandra-connector from DataStax states that it supports reading/writing TTL and WRITETIME, I am still getting a SQL undefined-function error.

Using Databricks with the library com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0 and a Spark config for CassandraSparkExtensions, on a 9.1 LTS ML (includes Apache Spark 3.1.2, Scala 2.12) cluster. CQL version 3.4.5.

spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions

Confirmed the config with Notebook code:

spark.conf.get("spark.sql.extensions")

Out[7]: 'com.datastax.spark.connector.CassandraSparkExtensions'

# Cassandra connection configs using Data Source API V2
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.host", "10.1.4.4")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.port", "9042")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.username", dbutils.secrets.get(scope = "myScope", key = "CassUsername"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.password", dbutils.secrets.get(scope = "myScope", key = "CassPassword")) 
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.enabled", True)
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.path", "/dbfs/user/client-truststore.jks")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.password", dbutils.secrets.get("key-vault-secrets", "cassTrustPassword"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.dse.continuous_paging_enabled", False) 

# catalog name will be "cassandrauat" for Cassandra
spark.conf.set("spark.sql.catalog.cassandrauat", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cassandrauat.prop", "key")
spark.conf.set("spark.sql.defaultCatalog", "cassandrauat") # will override Spark to use Cassandra for all databases
%sql 
select id, did, ts, val, ttl(val) 
from cassandrauat.myKeyspace.myTable

Error in SQL statement: AnalysisException: Undefined function: 'ttl'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 25

Running the same query as CQL directly on the Cassandra cluster produces a result.

Any help with why the CassandraSparkExtensions aren't loading would be appreciated.

Adding the full stack trace for the NoSuchMethodError that occurred after pre-loading the library:

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.replaceMetadata(CassandraMetadataFunctions.scala:152)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.$anonfun$applyOrElse$2(CassandraMetadataFunctions.scala:187)
    at scala.collection.immutable.Stream.foldLeft(Stream.scala:549)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:186)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:183)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:90)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:221)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:221)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:218)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:210)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:210)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:271)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:264)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:191)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:109)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:246)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:347)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:245)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:96)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:180)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:101)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:689)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:684)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.$anonfun$executeSql$1(SQLDriverLocal.scala:91)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:37)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:541)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)

    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:129)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:541)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)

CodePudding user response:

If you just added the Spark Cassandra Connector via the Clusters UI, it will not work: libraries are installed into the cluster after Spark has already started, so the class specified in spark.sql.extensions isn't found.

To fix this you need to put the jar file onto the cluster nodes before Spark starts. You can do that with a cluster init script that either downloads the jar directly, with something like this (note that each node downloads its own copy):

#!/bin/bash

wget -q -O /databricks/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
  https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.12/3.1.0/spark-cassandra-connector-assembly_2.12-3.1.0.jar

or, better, copies an assembly jar that you have downloaded once and put onto DBFS into the destination directory (for example, if it's uploaded to /FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar):

#!/bin/bash

cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
  /databricks/jars/
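A common way to create such an init script from a notebook is dbutils.fs.put (the /databricks/scripts/ path and the script name below are arbitrary choices, not something the connector requires); the script then has to be added under the cluster's Init Scripts settings and the cluster restarted:

# Write the init script to DBFS; attach it to the cluster under
# Advanced Options -> Init Scripts and restart the cluster afterwards
dbutils.fs.put(
    "dbfs:/databricks/scripts/install-scc.sh",
    """#!/bin/bash
cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar /databricks/jars/
""",
    True)  # overwrite if the script already exists

After the restart, the class named in spark.sql.extensions is on the classpath when Spark starts, and the ttl/writetime functions should resolve (subject to the compatibility note below).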

Update (13.11.2021): SCC 3.1.0 isn't fully compatible with Spark 3.2.0 (parts of which are already included in DBR 9.1). See SPARKC-670 for more details.
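If in doubt about which Spark version the runtime reports, a trivial check from a notebook:

# DBR 9.1 LTS reports 3.1.2, even though it already carries some Spark 3.2 changes
print(spark.version)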
