I am trying to write an RDD that contains public classes in Cassandra with Spark
class Test(private var id: String, private var randomNumber: Integer, private var lastUpdate: Instant) {
def setId(id: String): Unit = { this.id = id }
def getId: String = { this.id }
def setLastUpdater(lastUpdater: Instant): Unit = { this.lastUpdater = lastUpdater }
def getLastUpdater: Instant = { this.lastUpdater }
def setRandomNumber(number: Integer): Unit = { this.randomNumber = randomNumber }
def getRandomNumber: Integer = { this.randomNumber }
}
This class has all the Setters and Getters to maintain the encapsulation and I need it to not be a Case Class because I have to modify the values during the transformations.
The table corresponding to this entity in Cassandra has slightly different names for the fields:
CREATE TABLE IF NOT EXISTS test.test (
id uuid,
random_number int,
last_update timestamp,
PRIMARY KEY (id)
)
I am trying to write this RDD with the method saveToCassandra
implicit val connector = CassandraConnector(sc.getConf)
val rdd: RDD[Test]
rdd.saveToCassandra("test", "test")
but the method throws me an exception for the coincidence of the names of the attributes of the class with the names of the fields in the table
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in entity.Test: [id, random_number, last_update]
at scala.Predef$.require(Predef.scala:277)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:106)
at com.datastax.spark.connector.mapper.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:35)
at com.datastax.spark.connector.mapper.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:26)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:16)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:30)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:28)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:433)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:417)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
how can I write the entity in Cassandra without having to call the attributes the same and the attributes are private in the class?
CodePudding user response:
saveToCassandra
allows you to provide an optional ColumnSelector
:
def saveToCassandra(
keyspaceName: String,
tableName: String,
columns: ColumnSelector = AllColumns,
writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))(...): Unit
In your case you could use the following selector:
def selector = SomeColumns(
ColumnName("id"),
ColumnName("random_number", alias = Some("randomNumber")),
ColumnName("last_update", alias = Some("lastUpdate"))
)
Btw, while not the typical (and recommended) use of a case class, you could absolutely define fields as var
s and benefit from using a typed Dataset. That makes it very easy to rename fields before writing to Cassandra.