I have a CSV file containing the addressId and address of customers, as shown below:
addressId | address |
---|---|
ADD001 | "123, Maccolm Street, Copenhagen, Denmark" |
ADD002 | "384, East Avenue Street, New York, USA" |
I want to parse the address column to get the number, street, city and country. I have been given the following initial code to build on to produce the required output:
/** Demo application that turns free-form customer address strings into structured fields.
  *
  * NOTE(review): Spark's documentation recommends defining a `main()` method rather than
  * extending `scala.App`; kept as-is here so the object's shape is unchanged.
  */
object Address extends App {
  import scala.util.Try

  val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
  import spark.implicits._
  Logger.getRootLogger.setLevel(Level.WARN)

  /** An address exactly as read from the input: an identifier plus the unparsed address text. */
  case class AddressRawData(
    addressId: String,
    address: String
  )

  /** An address together with the components extracted from it.
    * Each component is `None` when it is missing from (or cannot be parsed out of) `address`.
    */
  case class AddressData(
    addressId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
  )

  /** Fills in `number`, `road`, `city` and `country` for every record by splitting
    * `address` on commas, in that order.
    *
    * Malformed input never throws: a null address, a missing or blank component, or a
    * non-numeric house number simply leaves the corresponding field as `None`.
    *
    * @param unparsedAddress records whose `address` field holds the raw text
    * @return the same records, in the same order, with the parsed fields populated
    */
  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] =
    unparsedAddress.map { record =>
      // Option(...) guards against a null address (e.g. an empty CSV cell);
      // trimming makes the parser indifferent to the spacing after each comma.
      val parts: Vector[String] =
        Option(record.address).map(_.split(",").map(_.trim).toVector).getOrElse(Vector.empty)

      // Safe positional access: None when the component is absent or blank.
      def part(index: Int): Option[String] = parts.lift(index).filter(_.nonEmpty)

      record.copy(
        number = part(0).flatMap(text => Try(text.toInt).toOption),
        road = part(1),
        city = part(2),
        country = part(3)
      )
    }

  // val addressDF: DataFrame = spark.read.option("header", "true").csv("src/main/resources/address_data.csv")
  // Column names match the AddressRawData fields so that .as[AddressRawData] resolves them
  // regardless of Spark's case-sensitivity setting.
  val addressDF = Seq[(String, String)](
    ("ADD001", "302, De Grassi Street, Toronto, Canada"),
    ("ADD002", "344, Oxford Street, London, United Kingdom")
  ).toDF("addressId", "address")

  val addressDS: Dataset[AddressRawData] = addressDF.as[AddressRawData]
}
I need to use the addressParser function to parse the data in addressDS. However, the function's parameter is of type Seq, and I am not sure how to convert addressDS into a suitable input for it so that the raw data can be parsed. Any guidance on how to solve this would be appreciated.
CodePudding user response:
Each Dataset is further divided into partitions. You can use mapPartitions with a mapping Iterator[T] => Iterator[U] to convert a Dataset[T] into a Dataset[U].
So, you can call your addressParser inside the function you pass to mapPartitions, converting each partition's Iterator to a Seq and the result back to an Iterator.
// Read the CSV, treating its first line as the header, and view each row as an AddressRawData.
// `csvFilePath` stands for the path to the address CSV file and must be defined by the caller.
val rawAddressDataDS =
  spark.read
    .option("header", "true")
    .csv(csvFilePath)
    .as[AddressRawData]

// Lift every raw row into an AddressData whose parsed fields are still empty,
// then let addressParser fill them in, one partition at a time.
val addressDataDS =
  rawAddressDataDS
    .map { rad =>
      AddressData(
        addressId = rad.addressId,
        address = rad.address,
        number = None,
        road = None,
        city = None,
        country = None
      )
    }
    .mapPartitions { unparsedAddresses =>
      // addressParser works on a Seq, so the partition's Iterator is converted on the way in
      // and the result is turned back into an Iterator on the way out.
      // `.iterator` is used because `.toIterator` is deprecated since Scala 2.13.
      addressParser(unparsedAddresses.toSeq).iterator
    }