How to pass ojai configuration from driver to executors in spark?

Time:09-24

I wonder how I can pass an OJAI connection from the Spark driver to its executors. Here's my code:

import org.ojai.store.DriverManager
import scala.collection.JavaConverters._

// The connection is created on the driver and captured by the closure below
val connection = DriverManager.getConnection("ojai:mapr:")
val store = connection.getStore("/tables/table1")
val someStream = messagesDStream.mapPartitions { iterator =>
  val list = iterator
    .map(record => record.value())
    .toList
    .asJava
    // TODO: serialization, deserialization, Serializable interface in Java
  val query = connection
    .newQuery()
    .where(connection.newCondition()
      .in("_id", list)
      .build())
    .build()
  // ...
}

And the error I got:

    Caused by: java.io.NotSerializableException: com.mapr.ojai.store.impl.OjaiConnection
    Serialization stack:
        - object not serializable (class: com.mapr.ojai.store.impl.OjaiConnection, value: com.mapr.ojai.store.impl.OjaiConnection@2a367e93)
        - field (class: com.example.App$$anonfun$1, name: connection$1, type: interface org.ojai.store.Connection)
        - object (class com.example.App$$anonfun$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
        ...

As long as the OJAI connection is created inside the mapPartitions function, everything works fine. I know that I need to pass the configuration from the driver to the executors for the code to work, but I don't know how to do it. Bye!

Answer:

You're running into one of Spark's most infamous errors: "Task not serializable". Essentially, it means that one of the classes or objects you're attempting to serialise (i.e. send over the network from the driver to the executors) cannot be processed this way. Here, it's the OJAI connection (OjaiConnection), which your closure captures as the field connection$1, as the serialization stack shows.
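To see why the closure fails, here is a minimal sketch in plain Scala (no Spark or OJAI needed) that Java-serializes a closure, roughly what Spark's ClosureCleaner.ensureSerializable does via JavaSerializer before shipping a task (see the stack trace above). FakeConnection, ClosureSerializationDemo, trySerialize and capturingClosure are hypothetical names made up for illustration; FakeConnection only resembles OjaiConnection in that it does not implement Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for com.mapr.ojai.store.impl.OjaiConnection:
// like the real class, it does NOT implement java.io.Serializable.
class FakeConnection {
  def lookup(id: String): String = s"doc-$id"
}

object ClosureSerializationDemo {
  // Java-serializes an object, which is roughly what Spark does to every
  // task closure before sending it to the executors.
  def trySerialize(obj: AnyRef): Either[NotSerializableException, Int] =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e)
    }

  // The connection is created "on the driver" and the closure captures it,
  // so serializing the closure also tries to serialize the connection.
  def capturingClosure(): Iterator[String] => Iterator[String] = {
    val connection = new FakeConnection
    it => it.map(id => connection.lookup(id))
  }

  def main(args: Array[String]): Unit =
    trySerialize(capturingClosure()) match {
      case Left(e)  => println(s"Task not serializable: $e")
      case Right(n) => println(s"Serialized $n bytes")
    }
}
```

A closure that captures nothing non-serializable (for example `it => it`) serializes without error; the failure comes entirely from the captured connection.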

You cannot pass the connection itself from the driver to the executors. What you can do instead, while avoiding re-creating the connection for every batch of RDDs coming from your stream, is declare the connection in a companion object as

@transient lazy val connection = ...

and refer to it inside mapPartitions. This ensures that each executor has its own connection to the database, which persists across batches: fields marked this way are not created on the driver and then serialised, but lazily created on each executor the first time they are accessed.
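The holder-object approach can be sketched the same way. This is a simulation under stated assumptions, not real Spark code: StubConnection, ConnectionHolder, HolderDemo and roundTrip are hypothetical names, and a serialize/deserialize round trip stands in for shipping a task to an executor. In a real job the lazy val body would be the DriverManager.getConnection("ojai:mapr:") call from the question.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable OJAI connection.
class StubConnection {
  def lookup(id: String): String = s"doc-$id"
}

// Holder object. In the real job this would be something like
//   @transient lazy val connection = DriverManager.getConnection("ojai:mapr:")
object ConnectionHolder {
  var created = 0 // counts initializations in this JVM, for illustration only
  @transient lazy val connection: StubConnection = {
    created += 1
    new StubConnection
  }
}

object HolderDemo {
  // The closure refers to ConnectionHolder by name, so no connection instance is
  // captured; `connection` is resolved lazily in whichever JVM runs the closure.
  val perPartition: Iterator[String] => Iterator[String] =
    it => it.map(id => ConnectionHolder.connection.lookup(id))

  // Simulates shipping a closure to an executor: serialize, then deserialize.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(perPartition)
    println(shipped(Iterator("a", "b")).toList) // List(doc-a, doc-b)
    println(shipped(Iterator("c")).toList)      // List(doc-c)
    println(s"connections created: ${ConnectionHolder.created}") // connections created: 1
  }
}
```

Both "batches" reuse the same connection, which is built only once per JVM on first access. Strictly speaking, @transient only matters if the enclosing object is itself serialized; when the closure refers to a top-level object by name, it is a harmless safeguard.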
