Unable to load data from Cassandra. I understand the issue is with the Guava jar; I have tried different versions of Guava but am unable to identify the expected version (I also replaced it with the DataStax shaded jar).
Versions: Scala 2.11.12, Spark 2.3.2.3.1.4.41-3, using spark-cassandra-connector_2.11-2.3.2.jar, cassandra-driver-core-3.0.0.jar, commons-configuration-1.7.jar, plus either java-driver-shaded-guava-25.1-jre.jar or one of the Guava jars (version 19/24/31) at spark-submit time.
The pyspark script and spark-submit script are shown after the error below.
Error:
File "cass.py", line 6, in <module>
data_df = ss.read.format("org.apache.spark.sql.cassandra").options(keyspace="xxxxx",table="xxxxx").load()
File "/disk-3/hadoop/yarn/local/usercache/vdfidt1/appcache/application_1660663467107_0171/container_e68_1660663467107_0171_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
File "/disk-3/hadoop/yarn/local/usercache/vdfidt1/appcache/application_1660663467107_0171/container_e68_1660663467107_0171_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/disk-3/hadoop/yarn/local/usercache/vdfidt1/appcache/application_1660663467107_0171/container_e68_1660663467107_0171_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/disk-3/hadoop/yarn/local/usercache/vdfidt1/appcache/application_1660663467107_0171/container_e68_1660663467107_0171_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o103.load.
: java.lang.NoSuchMethodError: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
at com.datastax.driver.core.policies.Policies$Builder.build(Policies.java:285)
at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1246)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:116)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:181)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1264)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:131)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:159)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$8.apply(CassandraConnector.scala:154)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$8.apply(CassandraConnector.scala:154)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:32)
at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:79)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory$.forSystemLocalPartitioner(TokenFactory.scala:98)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:272)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
pyspark script - cass.py
import pyspark
from pyspark.sql import SparkSession,SQLContext
ss = SparkSession.builder.appName("pyspark cassandra").getOrCreate()
# Parentheses are required for the chained call to span multiple lines
data_df = (ss.read.format("org.apache.spark.sql.cassandra")
           .options(keyspace="reg_idt1_vdf", table="veh")
           .load())
data_df.show()
ss.stop()
spark submit - cass.sh
function setJars() {
  JARS=""
  for file in /home/tst1/*.jar
  do
    # append with a comma only when JARS is already non-empty,
    # so the list passed to --jars has no leading comma
    JARS="${JARS:+$JARS,}$file"
  done
}
setJars
spark-submit \
--name 'pyspark cassandra connector' \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 20 \
--jars ${JARS} \
--conf "spark.yarn.maxAppAttempts=1" \
--conf "spark.cassandra.connection.host=xx.xx.xx.xx" \
--conf "spark.cassandra.auth.username=xxxxx" \
--conf "spark.cassandra.auth.password=xxxxxx" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.dynamicAllocation.maxExecutors=100" \
--conf "spark.dynamicAllocation.minExecutors=10" \
--conf "spark.executor.cores=2" \
--conf "spark.dynamicAllocation.executorIdleTimeout=500s" \
--conf "spark.authenticate=true" \
/home/tst1/cass.py
exit 0
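The comma-join inside setJars can be sanity-checked in isolation; a minimal sketch (the jar names below are illustrative):

```shell
#!/bin/sh
# ${JARS:+$JARS,} expands to "$JARS," only when JARS is non-empty,
# so the joined list never starts with a stray comma.
JARS=""
for file in spark-cassandra-connector_2.11-2.3.2.jar commons-configuration-1.7.jar
do
  JARS="${JARS:+$JARS,}$file"
done
echo "$JARS"
```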
CodePudding user response:
The most likely cause of the problem is a conflict among the components you've packaged with your app. For reference, the list of dependencies and their corresponding versions for Spark Cassandra connector v2.3.2 is documented in Versions.scala.
Also, there is no need to include the Cassandra Java driver since the connector already embeds the driver automatically.
Our general recommendation is to launch spark-submit with the --packages option. For example:
$ spark-submit --packages datastax:spark-cassandra-connector:2.3.2-s_2.11
This will automatically download all the necessary dependencies to use the connector, and it ensures that only compatible versions are included.
If you really want to build a fat JAR, you can download the "assembly" JAR from Maven if one exists for your Spark version. The connector assembly JAR contains all the dependencies required to use the connector. Cheers!
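Where downloading packages at submit time is blocked, one alternative (a sketch; the jar path below is illustrative and depends on what is actually published for your Spark/Scala version) is to pass a locally downloaded assembly JAR via --jars:

```shell
# Hypothetical invocation: the assembly jar bundles the connector plus its
# dependencies, so no separate Guava or driver jars are needed on --jars.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars /home/tst1/spark-cassandra-connector-assembly_2.11-2.3.2.jar \
  /home/tst1/cass.py
```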
CodePudding user response:
@Eric Ramirez Since I am unable to use "--packages" due to firewall restrictions, I was finally able to pull data from Cassandra by providing the entire suite of dependencies listed in the Maven POM: https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.11/2.3.2/spark-cassandra-connector_2.11-2.3.2.pom
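For the record, the dependency set from that POM can be fetched in one step on a machine that can reach a Maven repository or internal mirror (a sketch; the output directory is illustrative):

```shell
# Hypothetical sketch: resolve the connector's full dependency tree from the
# downloaded POM and copy every jar into the directory that cass.sh globs
# for --jars.
mvn dependency:copy-dependencies \
    -f spark-cassandra-connector_2.11-2.3.2.pom \
    -DoutputDirectory=/home/tst1
```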
Thanks a lot!!