How to input and output a Seq of objects to a function in Scala


I have a CSV file containing the addressId and address of customers, as below:

addressId address
ADD001 "123, Maccolm Street, Copenhagen, Denmark"
ADD002 "384, East Avenue Street, New York, USA"

I want to parse the address column into number, street, city, and country. I was given initial code to build on to produce the necessary output:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.log4j.{Level, Logger}

object Address extends App {

  val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()

  import spark.implicits._

  Logger.getRootLogger.setLevel(Level.WARN)

  case class AddressRawData(
                             addressId: String,
                             address: String
                           )

  case class AddressData(
                          addressId: String,
                          address: String,
                          number: Option[Int],
                          road: Option[String],
                          city: Option[String],
                          country: Option[String]
                        )

  // Split each raw address on ", " and fill in number, road, city and country
  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")

      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    })
  }
 // val addressDF: DataFrame = spark.read.option("header", "true").csv("src/main/resources/address_data.csv")

  val addressDF = Seq(
    ("ADD001", "302, De Grassi Street, Toronto, Canada"),
    ("ADD002", "344, Oxford Street, London, United Kingdom")
  ).toDF("addressId", "address")

  val addressDS: Dataset[AddressRawData] = addressDF.as[AddressRawData]
}

I need to use the addressParser function to parse my addressDS data. However, the function's parameter is of type Seq, and I am not sure how to convert addressDS into a suitable input so the raw data can be parsed. Any guidance on solving this is appreciated.

CodePudding user response:

Each Dataset is divided into partitions. You can use mapPartitions with a mapping function Iterator[T] => Iterator[U] to convert a Dataset[T] into a Dataset[U].
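For reference, this is the relevant method on Dataset[T], simplified from the Spark Scala API (the context bound stands for the implicit Encoder[U] that Spark needs to serialize the result):

// Applies func to each partition's rows and returns a new Dataset
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]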

So you can just use your addressParser as the argument to mapPartitions, converting each partition's Iterator to a Seq on the way in and back to an Iterator on the way out.

// Read the CSV with a header row and decode it straight into the raw case class
val rawAddressDataDS =
  spark.read
    .option("header", "true")
    .csv(csvFilePath)
    .as[AddressRawData]

val addressDataDS =
  rawAddressDataDS
    .map { rad =>
      // Lift each raw record into AddressData, leaving the parsed fields empty
      AddressData(
        addressId = rad.addressId,
        address = rad.address,
        number = None,
        road = None,
        city = None,
        country = None
      )
    }
    .mapPartitions { unparsedAddresses =>
      // Materialise the partition as a Seq, parse it, and return an Iterator
      addressParser(unparsedAddresses.toSeq).iterator
    }