Spark shell not starting with spark-cassandra-connector 3.1.0


I've been trying to start spark-shell with the Cassandra connector.

I first ran it with

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0

and I also tried building the connector from https://github.com/datastax/spark-cassandra-connector and starting the shell with

spark-shell --jars ~/.asdf/installs/spark/3.1.2/jars/spark-cassandra-connector-assembly-3.1.0-11-g53f24ce9.jar

In both cases I got this exception:

java.io.IOException: Socket is not connected
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
21/09/29 14:50:59 ERROR TransportClient: Failed to send RPC /jars/spark-cassandra-connector-assembly-3.1.0-11-g53f24ce9.jar to /172.20.12.135:64320: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
21/09/29 14:50:59 ERROR SparkContext: Error initializing SparkContext.
java.io.IOException: Failed to send RPC /jars/spark-cassandra-connector-assembly-3.1.0-11-g53f24ce9.jar to /172.20.12.135:64320: java.nio.channels.ClosedChannelException
    at org.apache.spark.network.client.TransportClient$2.handleFailure(TransportClient.java:164)
    at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:340)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    ... 22 more
21/09/29 14:50:59 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /172.20.12.135:64320 is closed
21/09/29 14:50:59 ERROR Utils: Uncaught exception in thread main
java.lang.NullPointerException
    at org.apache.spark.scheduler.local.LocalSchedulerBackend.org$apache$spark$scheduler$local$LocalSchedulerBackend$$stop(LocalSchedulerBackend.scala:173)
    at org.apache.spark.scheduler.local.LocalSchedulerBackend.stop(LocalSchedulerBackend.scala:144)
    at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:881)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2370)
    at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2069)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2069)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:671)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2672)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:945)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:939)
    at org.apache.spark.repl.Main$.createSparkSession(Main.scala:106)
    at $line3.$read$$iw$$iw.<init>(<console>:15)
    at $line3.$read$$iw.<init>(<console>:42)
    at $line3.$read.<init>(<console>:44)
    at $line3.$read$.<init>(<console>:48)
    at $line3.$read$.<clinit>(<console>)
    at $line3.$eval$.$print$lzycompute(<console>:7)
    at $line3.$eval$.$print(<console>:6)
    at $line3.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
    at scala.tools.nsc.interpreter.IMain.$anonfun$quietRun$1(IMain.scala:224)
    at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
    at scala.tools.nsc.interpreter.IMain.quietRun(IMain.scala:224)
    at org.apache.spark.repl.SparkILoop.$anonfun$initializeSpark$2(SparkILoop.scala:83)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.repl.SparkILoop.$anonfun$initializeSpark$1(SparkILoop.scala:83)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:99)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:83)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$4(SparkILoop.scala:165)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.tools.nsc.interpreter.ILoop.$anonfun$mumly$1(ILoop.scala:168)
    at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
    at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:165)
    at org.apache.spark.repl.SparkILoop.loopPostInit$1(SparkILoop.scala:153)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$10(SparkILoop.scala:221)
    at org.apache.spark.repl.SparkILoop.withSuppressedSettings$1(SparkILoop.scala:189)
    at org.apache.spark.repl.SparkILoop.startup$1(SparkILoop.scala:201)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:236)
    at org.apache.spark.repl.Main$.doMain(Main.scala:78)
    at org.apache.spark.repl.Main$.main(Main.scala:58)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/09/29 14:50:59 WARN MetricsSystem: Stopping a MetricsSystem that is not running
21/09/29 14:50:59 ERROR Main: Failed to initialize Spark session.
java.io.IOException: Failed to send RPC /jars/spark-cassandra-connector-assembly-3.1.0-11-g53f24ce9.jar to /172.20.12.135:64320: java.nio.channels.ClosedChannelException
    at org.apache.spark.network.client.TransportClient$2.handleFailure(TransportClient.java:164)
    at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:340)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    ... 22 more
21/09/29 14:50:59 ERROR Utils: Uncaught exception in thread Thread-1
java.lang.NullPointerException
    at org.apache.spark.executor.Executor.$anonfun$stop$3(Executor.scala:332)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:222)
    at org.apache.spark.executor.Executor.stop(Executor.scala:332)
    at org.apache.spark.executor.Executor.$anonfun$new$2(Executor.scala:76)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

I tried to nslookup that IP but got no result. Can anyone tell me what's wrong, or point me to working instructions for starting a spark-shell session with a working Cassandra connector?

Thanks

CodePudding user response:

I found the problem. spark-shell was trying to connect to a Spark master at that IP address (which was the eth0 address of my machine at the time).

So the problem was that no Spark master was running for spark-shell to connect to.
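
If you just want a local spark-shell session (no standalone cluster), you can sidestep the master lookup entirely by running against the in-process local master. A minimal sketch, assuming Spark 3.1.x with Scala 2.12 as in the original command; the connection host here is a placeholder for your actual Cassandra node:

$ spark-shell \
    --master "local[*]" \
    --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 \
    --conf spark.cassandra.connection.host=127.0.0.1

With --master "local[*]" the shell runs its own local executor, so no external Spark master process needs to be up.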

CodePudding user response:

Your spark-shell command is incomplete. You need to specify the Spark master URL if you're connecting to a remote Spark cluster.

Here's an example that shows the minimum options you need to provide to launch a Spark shell:

$ spark-shell \
    --master <spark_master_url> \
    --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 \
    --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
    --conf spark.cassandra.connection.host=<cassandra_node_ip> \
    --conf spark.cassandra.auth.username=<cassandra_role> \
    --conf spark.cassandra.auth.password=<cassandra_password>

If your cluster doesn't have authentication enabled, you can leave out the username and password options. Cheers!
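
Once the shell is up, a quick way to verify that the connector actually works is to read a table through the Cassandra data source. A minimal sketch to paste into the shell, where my_keyspace and my_table are placeholders for a keyspace and table that exist in your cluster:

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()

df.printSchema()  // columns should match the Cassandra table definition
df.show(5)        // pulls a few rows, exercising the actual connection

If both calls succeed, the shell, the Spark master, and the Cassandra connection are all wired up correctly.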
