I have a Dataset with the following case class type:
case class AddressRawData(
  addressId: String,
  customerId: String,
  address: String
)
I want to convert it to:
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int], // i.e. it is optional
  road: Option[String],
  city: Option[String],
  country: Option[String]
)
Using a parser function:
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
  unparsedAddress.map(address => {
    val split = address.address.split(", ")
    address.copy(
      number = Some(split(0).toInt),
      road = Some(split(1)),
      city = Some(split(2)),
      country = Some(split(3))
    )
  })
}
I am new to Scala and Spark. Could anyone please let me know how this can be done?
Answer:
You were on the right path! There are multiple ways of doing this, of course. But since you have already defined case classes and started writing a parsing function, an elegant solution is to use the Dataset's map function. From the docs, the signature of map is the following:
def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
Where T is the starting type (AddressRawData in your case) and U is the type you want to get to (AddressData in your case). So the input of this map function is a function that transforms an AddressRawData into an AddressData. That could perfectly well be the addressParser you've started making!
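To make the shape of that argument concrete, here is a minimal sketch using a plain Scala Seq instead of a Spark Dataset. Seq.map takes the same kind of one-in, one-out function, just without the Encoder. The names MapShapeSketch, Raw, Parsed and toParsed are hypothetical and only serve the illustration:

```scala
object MapShapeSketch {
  // Hypothetical, simplified stand-ins for AddressRawData / AddressData
  case class Raw(address: String)
  case class Parsed(address: String, city: Option[String])

  // A function of shape T => U: one Raw in, one Parsed out
  def toParsed(raw: Raw): Parsed = {
    val parts = raw.address.split(", ")
    // The city is the third piece, if the address has that many pieces
    val city = if (parts.length > 2) Some(parts(2)) else None
    Parsed(raw.address, city)
  }

  def main(args: Array[String]): Unit = {
    val raws = Seq(Raw("20, my super road, beautifulCity, someCountry"))
    // map applies toParsed to every element, turning Seq[Raw] into Seq[Parsed]
    val parsed: Seq[Parsed] = raws.map(toParsed)
    println(parsed)
  }
}
```

The point is that map handles the iteration for you, so the function you pass only needs to handle a single record.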
Now, your current addressParser has the following signature:
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]
In order to be able to feed it to that map function, we need to change this signature to:
def addressParser(unparsedAddress: AddressRawData): AddressData
Knowing all of this, we can work further! An example would be the following:
import spark.implicits._
import scala.util.Try

// Your case classes
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Your addressParser function, adapted to be able to feed into the Dataset.map function
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

// Creating a sample dataset
val rawDS = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
).toDS

val parsedDS = rawDS.map(addressParser)
parsedDS.show
+---------+----------+--------------------+------+-------------+----------------+-----------+
|addressId|customerId|             address|number|         road|            city|    country|
+---------+----------+--------------------+------+-------------+----------------+-----------+
|        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
|        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
+---------+----------+--------------------+------+-------------+----------------+-----------+
As you can see, because you had already foreseen that parsing can go wrong (by making the parsed fields Option), it was easy to use scala.util.Try to extract the pieces of the raw address and add some robustness. The second row contains null values where the address string could not be parsed.
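To see why that second row ends up with null in number and country, here is a small sketch in plain Scala (no Spark needed) of what Try(...).toOption does in each failure case. The helpers piece and houseNumber and the object name TryToOptionSketch are hypothetical names used only for this illustration:

```scala
import scala.util.Try

object TryToOptionSketch {
  // Hypothetical helper: the i-th ", "-separated piece of the address, if present
  def piece(address: String, i: Int): Option[String] =
    Try(address.split(", ")(i)).toOption

  // Hypothetical helper: the first piece parsed as an Int, if possible
  def houseNumber(address: String): Option[Int] =
    Try(address.split(", ")(0).toInt).toOption

  def main(args: Array[String]): Unit = {
    val bad = "badFormat, some road, cityButNoCountry"
    println(houseNumber(bad)) // None: "badFormat".toInt throws NumberFormatException
    println(piece(bad, 1))    // Some(some road)
    println(piece(bad, 3))    // None: index 3 is out of bounds for a 3-element array
  }
}
```

Try catches the exception and toOption turns the failure into None, which Spark then displays as null in the column.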
Hope this helps!