I'm trying to map over a previously joined Dataset. This is what my datasets look like:
case class CustomerData(
customerId: String,
forename: String,
surname: String
)
case class AccountData(
customerId: String,
accountId: String,
balance: Long
)
val customerDS: Dataset[CustomerData] = customerDF.as[CustomerData]
val accountDS: Dataset[AccountData] = accountDF.withColumn("balance", 'balance.cast("long")).as[AccountData]
case class CustomerAccountClass(
customerId: String,
forename: String,
surname: String,
accountId: String,
balance: Long
)
val customerAccountDataDS = customerDS
.joinWith(accountDS, customerDS.col("customerId") === accountDS.col("customerId"), "left")
.map {
case (customer, account) => CustomerAccountClass(customer.customerId, customer.forename, customer.surname, account.accountId, account.balance)
case (customer, null) => CustomerAccountClass(customer.customerId, customer.forename, customer.surname, "", 0)
}
This is the essential part of my code. Basically, I want to create a new Dataset with the customer ID, forename, surname and, where available, the account details (account ID and balance). Unfortunately, whatever I try, I get a NullPointerException. Am I missing anything? I don't have any nulls in the first column, only in the second one, so as far as I understand my code should be sufficient. Thank you.
CodePudding user response:
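Try swapping the order of your cases inside map. As far as I know, the (customer, account) pattern also matches when account is null, so the first case handles that row and account.accountId throws the NullPointerException; the (customer, null) case is never reached. If you put the cases the other way around, the actual null case will match first: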
.map {
case (customer, null) => CustomerAccountClass(customer.customerId, customer.forename, customer.surname, "", 0)
case (customer, account) => CustomerAccountClass(customer.customerId, customer.forename, customer.surname, account.accountId, account.balance)
}
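If you would rather not depend on the order of the cases, one possible alternative is to wrap the possibly-null account in Option(...), which turns null into None. The following is only a minimal sketch in plain Scala, without Spark, so that it can be run on its own: the object name NullSafeJoinDemo, the helper toCustomerAccount and the sample rows are made up for illustration, and the case classes are repeated from the question.

```scala
object NullSafeJoinDemo {
  // Same shapes as in the question, repeated so the sketch is self-contained.
  case class CustomerData(customerId: String, forename: String, surname: String)
  case class AccountData(customerId: String, accountId: String, balance: Long)
  case class CustomerAccountClass(
    customerId: String,
    forename: String,
    surname: String,
    accountId: String,
    balance: Long
  )

  // Hypothetical helper: Option(account) is None when account is null,
  // so the result no longer depends on the order of the cases.
  def toCustomerAccount(pair: (CustomerData, AccountData)): CustomerAccountClass = {
    val (customer, account) = pair
    Option(account) match {
      case Some(a) =>
        CustomerAccountClass(customer.customerId, customer.forename, customer.surname, a.accountId, a.balance)
      case None =>
        CustomerAccountClass(customer.customerId, customer.forename, customer.surname, "", 0L)
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up rows standing in for the output of a left joinWith:
    // the second customer has no matching account, so the right side is null.
    val rows: Seq[(CustomerData, AccountData)] = Seq(
      (CustomerData("c1", "Ann", "Lee"), AccountData("c1", "a1", 100L)),
      (CustomerData("c2", "Bob", "Ray"), null)
    )
    rows.map(toCustomerAccount).foreach(println)
  }
}
```

In the Spark code from the question, the same function body could in principle be passed to .map after joinWith, assuming the usual encoders from spark.implicits._ are in scope.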