Write RDD[entity] in cassandra from Spark


I am trying to write an RDD containing instances of a plain Scala class to Cassandra with Spark:

import java.time.Instant

class Test(private var id: String, private var randomNumber: Integer, private var lastUpdate: Instant) {
  def setId(id: String): Unit = { this.id = id }
  def getId: String = { this.id }
  def setLastUpdate(lastUpdate: Instant): Unit = { this.lastUpdate = lastUpdate }
  def getLastUpdate: Instant = { this.lastUpdate }
  def setRandomNumber(number: Integer): Unit = { this.randomNumber = number }
  def getRandomNumber: Integer = { this.randomNumber }
}

This class has setters and getters for all fields to maintain encapsulation, and I need it to not be a case class because I have to modify its values during transformations.

The table corresponding to this entity in Cassandra has slightly different names for the fields:

CREATE TABLE IF NOT EXISTS test.test (
  id uuid,
  random_number int,
  last_update timestamp,
  PRIMARY KEY (id)
) 

I am trying to write this RDD with the saveToCassandra method:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

implicit val connector = CassandraConnector(sc.getConf)
val rdd: RDD[Test] = ??? // an RDD of Test instances, built elsewhere
rdd.saveToCassandra("test", "test")

but the method throws an exception because the attribute names of the class do not match the column names of the table:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in entity.Test: [id, random_number, last_update]
at scala.Predef$.require(Predef.scala:277)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:106)
at com.datastax.spark.connector.mapper.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:35)
at com.datastax.spark.connector.mapper.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:26)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:16)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:30)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:28)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:433)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:417)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)

How can I write the entity to Cassandra without having to name its attributes exactly like the table columns, given that the attributes are private in the class?

CodePudding user response:

saveToCassandra allows you to provide an optional ColumnSelector:

  def saveToCassandra(
    keyspaceName: String,
    tableName: String,
    columns: ColumnSelector = AllColumns,
    writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))(...): Unit

In your case you could use the following selector:

import com.datastax.spark.connector.{ColumnName, SomeColumns}

val selector = SomeColumns(
  ColumnName("id"),
  ColumnName("random_number", alias = Some("randomNumber")),
  ColumnName("last_update", alias = Some("lastUpdate"))
)
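
Then pass the selector when saving (using the rdd from the question):

rdd.saveToCassandra("test", "test", selector)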

Btw, while not the typical (or recommended) use of a case class, you could absolutely define its fields as vars and benefit from using a typed Dataset. That makes it very easy to rename fields before writing to Cassandra.
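
For illustration, here is a minimal sketch of that approach. It assumes a SparkSession named spark is available, converts the question's rdd into a hypothetical TestRow case class, and renames the fields to the Cassandra column names before writing with the connector's Dataset writer:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._

// a case class with var fields, so values can still be mutated during transformations
case class TestRow(var id: String, var randomNumber: Int, var lastUpdate: Timestamp)

val spark: SparkSession = ??? // assumed to be available
import spark.implicits._

val ds = rdd
  .map(t => TestRow(t.getId, t.getRandomNumber, Timestamp.from(t.getLastUpdate)))
  .toDS()

// rename fields to match the Cassandra columns, then append to test.test
ds.withColumnRenamed("randomNumber", "random_number")
  .withColumnRenamed("lastUpdate", "last_update")
  .write
  .cassandraFormat("test", "test") // (table, keyspace)
  .mode("append")
  .save()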
