Convert spark scala dataset of one type to another

Time:12-10

I have a dataset with the following case class type:

  case class AddressRawData(
                         addressId: String,
                         customerId: String,
                         address: String
                       )

I want to convert it to:

case class AddressData(
                          addressId: String,
                          customerId: String,
                          address: String,
                          number: Option[Int], //i.e. it is optional
                          road: Option[String],
                          city: Option[String],
                          country: Option[String]
                        )

Using a parser function:

  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
    )
  }

I am new to Scala and Spark. Could anyone please let me know how this can be done?

CodePudding user response:

You were on the right path! There are multiple ways of doing this, of course. But since you have already defined some case classes and started writing a parsing function, an elegant solution is to use the Dataset's map function. From the docs, its signature is the following:

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 

Here T is the starting type (AddressRawData in your case) and U is the type you want to get to (AddressData in your case). So the input of this map function is a function that transforms an AddressRawData into an AddressData. That could perfectly well be the addressParser you've started writing!

Now, your current addressParser has the following signature:

def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]

To be able to pass it to that map function, we need to change the signature so that it works on a single element:

def newAddressParser(unparsedAddress: AddressRawData): AddressData
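
To see why the per-element signature is the more useful one, here is a minimal plain-Scala sketch (no Spark needed). The types Raw and Parsed and the functions parseOne and parseAll are hypothetical, simplified stand-ins and not part of your code. The point is only that a collection-level function can always be recovered by mapping the element-level one, which is the same thing Dataset.map does for you.

```scala
import scala.util.Try

// Hypothetical, simplified stand-ins for AddressRawData and AddressData.
case class Raw(address: String)
case class Parsed(address: String, number: Option[Int])

// Element-level parser: one raw record in, one parsed record out.
def parseOne(raw: Raw): Parsed = {
  val firstPart = raw.address.split(", ").headOption
  // Try(...).toOption yields None when the first part is not an integer.
  Parsed(raw.address, firstPart.flatMap(p => Try(p.toInt).toOption))
}

// Collection-level parser, obtained simply by mapping the element-level one.
def parseAll(raws: Seq[Raw]): Seq[Parsed] = raws.map(parseOne)
```

With a function shaped like parseOne, the same code can be mapped over a Seq, a List, or a Dataset, whereas a function that takes a whole Seq cannot be passed to Dataset.map.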

Knowing all of this, we can work further! An example would be the following:

import spark.implicits._
import scala.util.Try

// Your case classes
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Your addressParser function, adapted to be able to feed into the Dataset.map
// function
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

// Creating a sample dataset
val rawDS = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
).toDS

val parsedDS = rawDS.map(addressParser)

parsedDS.show
+---------+----------+--------------------+------+-------------+----------------+-----------+
|addressId|customerId|             address|number|         road|            city|    country|
+---------+----------+--------------------+------+-------------+----------------+-----------+
|        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
|        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
+---------+----------+--------------------+------+-------------+----------------+-----------+

As you can see, because you had already foreseen that parsing can go wrong (by making the new fields Options), it was easy to use scala.util.Try to extract the pieces of the raw address and add some robustness. The second row contains null values where the address string could not be parsed: "badFormat" is not an integer, and there is no fourth element for the country.
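
If it helps to see what Try is doing in isolation, here is a small sketch in plain Scala (again no Spark needed). The helper parseParts is a hypothetical name used only for illustration. It applies the same split and the same Try(...).toOption calls to a bare string, so the two failure modes are visible: a part that is not an integer, and an index that does not exist.

```scala
import scala.util.Try

// Hypothetical helper: returns (number, road, city, country) as Options.
def parseParts(address: String): (Option[Int], Option[String], Option[String], Option[String]) = {
  val parts = address.split(", ")
  (
    Try(parts(0).toInt).toOption, // None on NumberFormatException
    Try(parts(1)).toOption,       // None on ArrayIndexOutOfBoundsException
    Try(parts(2)).toOption,
    Try(parts(3)).toOption
  )
}

val good = parseParts("20, my super road, beautifulCity, someCountry")
val bad  = parseParts("badFormat, some road, cityButNoCountry")
```

Here good holds four Some values, while bad has None for the number and for the country. Spark shows those None values as null in the output of show.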

Hope this helps!
