Assume the following schema for a table, Places:
root
|-- place_id: string (nullable = true)
|-- street_address: string (nullable = true)
|-- city: string (nullable = true)
|-- state_province: string (nullable = true)
|-- postal_code: string (nullable = true)
|-- country: string (nullable = true)
|-- neighborhood: string (nullable = true)
val places is of type Dataset[Row], and I have the following case class:
case class csm(
city: Option[String] = None,
stateProvince: Option[String] = None,
country: Option[String] = None
)
How would I go about altering places, or creating a new dataset, so that it has the following schema:
root
|-- place_id: string (nullable = true)
|-- street_address: string (nullable = true)
|-- subpremise: string (nullable = true)
|-- city: string (nullable = true)
|-- state_province: string (nullable = true)
|-- postal_code: string (nullable = true)
|-- country: string (nullable = true)
|-- neighborhood: string (nullable = true)
|-- csm: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state_province: string (nullable = true)
| |-- country: string (nullable = true)
I've been looking into withColumn, and the examples I've found seem to require UDFs. The challenge is that I have to specify the columns manually: that's easy for this use case, but as my problem scales it will be difficult to maintain them by hand.
I used this as a reference: https://intellipaat.com/community/16433/how-to-add-a-new-struct-column-to-a-dataframe
Answer:
In your case class declaration you have a stateProvince parameter, but your dataframe has a state_province column instead.
I'm not sure whether that's a typo, so first, here is a quick-and-dirty, not thoroughly tested camelCase-to-snake_case converter, just in case:
def normalize(x: String): String =
  "([a-z])([A-Z])".r.replaceAllIn(x, m => s"${m.group(1)}_${m.group(2).toLowerCase}")
Next, let's get the parameters of the case class (toDF needs the SparkSession's implicits in scope):
import spark.implicits._
val case_class_params = Seq[csm]().toDF.columns
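Building an empty Dataset just to read its column names needs a live SparkSession. If that is inconvenient (for example in a unit test), here is a minimal sketch, assuming Scala 2.13 or later: Product.productElementNames returns a case class's field names in declaration order. It needs an instance, which csm() provides here only because every parameter has a default value. Inside Spark, Encoders.product[csm].schema.fieldNames is another way to get the same names without creating a Dataset. The object name CaseClassFields is hypothetical.

```scala
object CaseClassFields {
  // Same shape as the case class in the question.
  case class csm(
      city: Option[String] = None,
      stateProvince: Option[String] = None,
      country: Option[String] = None
  )

  // Field names in declaration order (Scala 2.13+). csm() can be built
  // only because every parameter has a default value.
  val csmFieldNames: List[String] = csm().productElementNames.toList

  def main(args: Array[String]): Unit =
    println(csmFieldNames) // List(city, stateProvince, country)
}
```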
With case_class_params, we can now build the columns for the csm struct and add it to the dataframe:
import org.apache.spark.sql.functions.{col, struct}

val csm_cols = case_class_params.map(x => col(normalize(x)))
val df2 = places.withColumn("csm", struct(csm_cols: _*))
+--------+--------------+---------+--------------+-----------+------------+------------+----------------------------------------+
|place_id|street_address|city     |state_province|postal_code|country     |neighborhood|csm                                     |
+--------+--------------+---------+--------------+-----------+------------+------------+----------------------------------------+
|123     |str_addr      |some_city|some_province |some_zip   |some_country|NA          |{some_city, some_province, some_country}|
+--------+--------------+---------+--------------+-----------+------------+------------+----------------------------------------+
root
|-- place_id: string (nullable = true)
|-- street_address: string (nullable = true)
|-- city: string (nullable = true)
|-- state_province: string (nullable = true)
|-- postal_code: string (nullable = true)
|-- country: string (nullable = true)
|-- neighborhood: string (nullable = true)
|-- csm: struct (nullable = false)
| |-- city: string (nullable = true)
| |-- state_province: string (nullable = true)
| |-- country: string (nullable = true)
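The normalize helper in this answer only inserts an underscore where a lowercase letter is followed by an uppercase one, and it lowercases only that uppercase letter, so a name containing an acronym, such as placeID, becomes place_iD. If your case classes use such names, a slightly more defensive variant is sketched below. The names SnakeCase and normalizeLower and the sample field names are hypothetical; the variant also splits after a digit and lowercases the whole result.

```scala
import scala.util.matching.Regex

object SnakeCase {
  // The answer's converter: underscore between [a-z] and [A-Z],
  // lowercasing only the uppercase letter that follows.
  def normalize(x: String): String =
    "([a-z])([A-Z])".r.replaceAllIn(x, (m: Regex.Match) => s"${m.group(1)}_${m.group(2).toLowerCase}")

  // Variant: also split after a digit, then lowercase everything,
  // so acronyms such as "ID" become "id".
  def normalizeLower(x: String): String =
    "([a-z0-9])([A-Z])".r.replaceAllIn(x, (m: Regex.Match) => s"${m.group(1)}_${m.group(2)}").toLowerCase

  def main(args: Array[String]): Unit =
    Seq("city", "stateProvince", "placeID", "line2Address").foreach { name =>
      println(s"$name -> ${normalize(name)} / ${normalizeLower(name)}")
    }
}
```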
Answer:
case class Source(
  place_id: Option[String],
  street_address: Option[String],
  city: Option[String],
  state_province: Option[String],
  postal_code: Option[String],
  country: Option[String],
  neighborhood: Option[String]
)

case class Csm(
  city: Option[String] = None,
  stateProvince: Option[String] = None,
  country: Option[String] = None
)

case class Result(
  place_id: Option[String],
  street_address: Option[String],
  subpremise: Option[String],
  city: Option[String],
  state_province: Option[String],
  postal_code: Option[String],
  country: Option[String],
  neighborhood: Option[String],
  csm: Csm
)

import spark.implicits._

val sourceDF = Seq(
  Source(
    Some("s-1-1"),
    Some("s-1-2"),
    Some("s-1-3"),
    Some("s-1-4"),
    Some("s-1-5"),
    Some("s-1-6"),
    Some("s-1-7")
  ),
  Source(
    Some("s-2-1"),
    Some("s-2-2"),
    Some("s-2-3"),
    Some("s-2-4"),
    Some("s-2-5"),
    Some("s-2-6"),
    Some("s-2-7")
  )
).toDF()
val resultDF = sourceDF
  .map { r =>
    // Option(...) turns a null column value into None instead of Some(null)
    Result(
      Option(r.getAs[String]("place_id")),
      Option(r.getAs[String]("street_address")),
      Some("set your value"), // placeholder for subpremise
      Option(r.getAs[String]("city")),
      Option(r.getAs[String]("state_province")),
      Option(r.getAs[String]("postal_code")),
      Option(r.getAs[String]("country")),
      Option(r.getAs[String]("neighborhood")),
      Csm(
        Option(r.getAs[String]("city")),
        Option(r.getAs[String]("state_province")),
        Option(r.getAs[String]("country"))
      )
    )
  }
  .toDF()
resultDF.printSchema()
// root
// |-- place_id: string (nullable = true)
// |-- street_address: string (nullable = true)
// |-- subpremise: string (nullable = true)
// |-- city: string (nullable = true)
// |-- state_province: string (nullable = true)
// |-- postal_code: string (nullable = true)
// |-- country: string (nullable = true)
// |-- neighborhood: string (nullable = true)
// |-- csm: struct (nullable = true)
// | |-- city: string (nullable = true)
// | |-- stateProvince: string (nullable = true)
// | |-- country: string (nullable = true)
resultDF.show(false)
// +--------+--------------+--------------+-----+--------------+-----------+-------+------------+---------------------+
// |place_id|street_address|subpremise    |city |state_province|postal_code|country|neighborhood|csm                  |
// +--------+--------------+--------------+-----+--------------+-----------+-------+------------+---------------------+
// |s-1-1   |s-1-2         |set your value|s-1-3|s-1-4         |s-1-5      |s-1-6  |s-1-7       |[s-1-3, s-1-4, s-1-6]|
// |s-2-1   |s-2-2         |set your value|s-2-3|s-2-4         |s-2-5      |s-2-6  |s-2-7       |[s-2-3, s-2-4, s-2-6]|
// +--------+--------------+--------------+-----+--------------+-----------+-------+------------+---------------------+
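Because the data already fits Source, the row mapping can also be written against the case classes themselves instead of Row.getAs lookups. In Spark it would presumably be applied as sourceDF.as[Source].map(toResult).toDF() with spark.implicits._ in scope (an untested suggestion). Written this way, the logic is a plain function you can unit test without Spark, and Option fields pass through unchanged. That sidesteps a subtle point of wrapping nullable values: Option(null) is None, whereas Some(null) is a Some holding null. Note also that Csm's stateProvince parameter becomes a stateProvince struct field, not state_province; rename the parameter if the struct must match the requested schema exactly. TypedMapping and toResult are hypothetical names, and the subpremise value is the same placeholder as above.

```scala
object TypedMapping {
  case class Source(
      place_id: Option[String],
      street_address: Option[String],
      city: Option[String],
      state_province: Option[String],
      postal_code: Option[String],
      country: Option[String],
      neighborhood: Option[String]
  )

  case class Csm(
      city: Option[String] = None,
      stateProvince: Option[String] = None,
      country: Option[String] = None
  )

  case class Result(
      place_id: Option[String],
      street_address: Option[String],
      subpremise: Option[String],
      city: Option[String],
      state_province: Option[String],
      postal_code: Option[String],
      country: Option[String],
      neighborhood: Option[String],
      csm: Csm
  )

  // Copies the Source fields, adds a placeholder subpremise and nests
  // city / state_province / country into the Csm struct.
  def toResult(s: Source): Result =
    Result(
      place_id = s.place_id,
      street_address = s.street_address,
      subpremise = Some("set your value"),
      city = s.city,
      state_province = s.state_province,
      postal_code = s.postal_code,
      country = s.country,
      neighborhood = s.neighborhood,
      csm = Csm(s.city, s.state_province, s.country)
    )

  def main(args: Array[String]): Unit = {
    // A missing neighborhood stays None rather than becoming Some(null).
    val src = Source(Some("s-1-1"), Some("s-1-2"), Some("s-1-3"), Some("s-1-4"),
      Some("s-1-5"), Some("s-1-6"), None)
    println(toResult(src))

    // Why Option(...) is safer than Some(...) for possibly-null values:
    val missing: String = null
    println(Option(missing)) // None
    println(Some(missing))   // Some(null)
  }
}
```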