I am processing a DataFrame and converting it into a Dataset[Event] using an Event case class. However, the events contain nested IDs, and I need to multiply each event based on the flattened combinations of the nested deviceId/osId pairs.
I am able to return the Event case class at the Kafka event level, but I am not sure how to multiply the events.
Kafka incoming Event:
{
"partition": 1,
"key": "34768_20220203_MFETP501",
"offset": 1841543,
"createTime": 1646041475348,
"topic": "topic_int",
"publishTime": 1646041475344,
"errorCode": 0,
"userActions": {
"productId": "3MFETP501",
"createdDate": "2022-02-26T11:19:35.786Z",
"events": [
{
"GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
"eventDate": "2022-02-26T11:19:35.786Z",
"familyId": 2010,
"productTypeId": 1004678,
"serialID": "890479804",
"productName": "MFE Total Protection 2021 Family Pack",
"features": {
"mapping": [
{
"deviceId": 999795,
"osId": [
100
]
},
{
"deviceId": 987875
"osId": [
101
]
}
]
}
}
]
}
}
The expected output Event case class instances:
Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","999795_100", Map("targetId"->"999795_100") )
Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","987875_101", Map("targetId"->"987875_101") )
case class Event(
productId: String,
familyId: String,
productTypeId: String,
key: String,
productName: String,
deviceOS: String,
var featureMap: mutable.Map[String, String])
val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
  row => {
    val productId = row.getAs[String]("productId")
    val userActions = row.getAs[Row]("userActions")
    val userEvents: mutable.Seq[Row] = userActions.getAs[mutable.WrappedArray[Row]]("events")
    val processedEvents: mutable.Seq[Row] = userEvents.map(
      event => {
        val productTypeId = event.getAs[Int]("productTypeId")
        val familyId = event.getAs[String]("familyId")
        val features = event.getAs[mutable.WrappedArray[Row]]("features")
        val serialId = event.getAs[String]("serialID")
        val key = productId + ":" + serialId
        val featureMap = mutable.Map[String, String]().withDefaultValue(null)
        val device_os_list = List("999795_100", "987875_101")
        // The feature map is per device_os (e.g. "targetId" -> "999795_100" for 999795_100)
        if (familyId == 2010) {
          val a: Option[List[String]] = ??? // flatten the deviceId, osId ...
          a.get.map(i => {
            val key: String = methodToCombinedeviceIdAndosId
            val featureMapping: mutable.Map[String, String] = getfeatureMapForInvidualKey
            Event(productId, productTypeId, familyId, key, productName, device_os, feature) // ---> This is returning List[Event]
          })
        } else {
          Event(productId, productTypeId, familyId, key, productName, device_os, feature) // --> This is returning Event. THIS WORKS
        }
      }
    )
  }
)
CodePudding user response:
I did not implement it exactly the same way, but I think the logic will be easy to follow and adapt to your case.
I created a JSON file, kafka.json, containing your event wrapped in an array:
[{
"partition": 1,
"key": "34768_20220203_MFETP501",
"offset": 1841543,
"createTime": 1646041475348,
"topic": "topic_int",
"publishTime": 1646041475344,
"errorCode": 0,
"userActions": {
"productId": "3MFETP501",
"createdDate": "2022-02-26T11:19:35.786Z",
"events": [
{
"GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
"eventDate": "2022-02-26T11:19:35.786Z",
"familyId": 2010,
"productTypeId": 1004678,
"serialID": "890479804",
"productName": "MFE Total Protection 2021 Family Pack",
"features": {
"mapping": [
{
"deviceId": 999795,
"osId": [
100
]
},
{
"deviceId": 987875,
"osId": [
101
]
}
]
}
}
]
}
}]
Please find below the first solution, which is based on flatMap and a for-comprehension.
case class Event(
productId: String,
familyId: String,
productTypeId: String,
key: String,
productName: String,
deviceOS: String,
featureMap: Map[String, String])
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.getOrCreate()
private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")
import spark.implicits._
val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
row => {
val userActions = row.getAs[Row]("userActions")
val productId = userActions.getAs[String]("productId")
val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
for (event <- userEvents;
familyId = event.getAs[Int]("familyId").toString;
productTypeId = event.getAs[Int]("productTypeId").toString;
serialId = event.getAs[String]("serialID");
productName = event.getAs[String]("productName");
key = s"$productId:$serialId";
features = event.getAs[Row]("features");
mappings = features.getAs[mutable.WrappedArray[Row]]("mapping");
mappingRow <- mappings;
deviceId = mappingRow.getAs[Long]("deviceId");
osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId");
osId <- osIds;
     deviceOs = s"${deviceId}_$osId"
) yield Event(productId, familyId, productTypeId, key, productName, deviceOs, Map("target" -> deviceOs))
}
)
finalDataset.foreach(e => println(e))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
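A side note on the getAs calls: spark.read's JSON source usually infers integral JSON numbers as LongType, which is why deviceId and osId are read with getAs[Long] above. If a getAs[Int] call ever throws a ClassCastException on your data, check the inferred schema first:
// Inspect how Spark inferred the numeric fields before choosing getAs[Int] vs getAs[Long].
inputDataFrame.printSchema()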
Alternatively, you can solve this task with the select, withColumn, explode, and concat functions.
case class Event(
productId: String,
familyId: String,
productTypeId: String,
key: String,
productName: String,
deviceOS: String,
featureMap: Map[String, String])
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, explode, concat, lit, map}
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.getOrCreate()
private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")
val transformedDataFrame = inputDataFrame
.select(col("userActions.productId").as("productId"),
explode(col("userActions.events")).as("event"))
.select(col("productId"),
col("event.familyId").as("familyId"),
col("event.productTypeId").as("productTypeId"),
col("event.serialID").as("serialID"),
col("event.productName").as("productName"),
explode(col("event.features.mapping")).as("features")
)
.select(
col("productId"),
col("familyId"),
col("productTypeId"),
col("serialID"),
col("productName"),
col("features.deviceId").as("deviceId"),
explode(col("features.osId")).as("osId")
)
.withColumn("key", concat(col("productId"), lit(":"), col("serialID")))
.withColumn("deviceOS", concat(col("deviceId"), lit("_"), col("osId")))
.withColumn("featureMap", map(lit("target"), col("deviceOS")))
import spark.implicits._
private val result: Dataset[Event] = transformedDataFrame.as[Event]
result.foreach(e => println(e))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
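This second variant does not yet handle the familyId == 2010 condition from the question. As a rough, untested sketch (reusing inputDataFrame, spark and the Event case class defined above, borrowing the "default_device_os" placeholder used in the flatMap version further below, and relying on unionByName, which needs Spark 2.3+), the condition can also be expressed in the DataFrame API by splitting the exploded events into two branches and unioning them back together:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, concat, explode, lit, map}
import spark.implicits._
// Common part: one row per event, with the key already built.
val events = inputDataFrame
  .select(col("userActions.productId").as("productId"),
    explode(col("userActions.events")).as("event"))
  .select(col("productId"),
    col("event.familyId").as("familyId"),
    col("event.productTypeId").as("productTypeId"),
    col("event.serialID").as("serialID"),
    col("event.productName").as("productName"),
    col("event.features.mapping").as("mapping"))
  .withColumn("key", concat(col("productId"), lit(":"), col("serialID")))
// Branch 1: familyId 2010 -> one row per deviceId/osId combination.
val flattened = events.filter(col("familyId") === 2010)
  .withColumn("m", explode(col("mapping")))
  .withColumn("osId", explode(col("m.osId")))
  .withColumn("deviceOS", concat(col("m.deviceId"), lit("_"), col("osId")))
  .drop("m", "osId", "mapping")
// Branch 2: any other familyId -> a single row with a placeholder deviceOS.
val defaulted = events.filter(col("familyId") =!= 2010)
  .withColumn("deviceOS", lit("default_device_os"))
  .drop("mapping")
// Union the branches, build the feature map and map onto the Event case class.
val conditionalResult: Dataset[Event] = flattened.unionByName(defaulted)
  .withColumn("featureMap", map(lit("target"), col("deviceOS")))
  .withColumn("familyId", col("familyId").cast("string"))
  .withColumn("productTypeId", col("productTypeId").cast("string"))
  .as[Event]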
Finally, here is an option to customize the response based on the value of one of the fields. I replaced the for-comprehension with map/flatMap here, so you can return either one or several events depending on the familyId. I also extended the JSON a little to show more cases in the result.
New JSON:
[{
"partition": 1,
"key": "34768_20220203_MFETP501",
"offset": 1841543,
"createTime": 1646041475348,
"topic": "topic_int",
"publishTime": 1646041475344,
"errorCode": 0,
"userActions": {
"productId": "3MFETP501",
"createdDate": "2022-02-26T11:19:35.786Z",
"events": [
{
"GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
"eventDate": "2022-02-26T11:19:35.786Z",
"familyId": 2010,
"productTypeId": 1004678,
"serialID": "890479804",
"productName": "MFE Total Protection 2021 Family Pack",
"features": {
"mapping": [
{
"deviceId": 999795,
"osId": [
100,
110
]
},
{
"deviceId": 987875,
"osId": [
101
]
}
]
}
},
{
"GUID": "1111-2222-f7f0-44af-90da-80179412f89c",
"eventDate": "2022-03-26T11:19:35.786Z",
"familyId": 2011,
"productTypeId": 1004679,
"serialID": "890479805",
"productName": "Product name",
"features": {
"mapping": [
{
"deviceId": 999796,
"osId": [
103
]
},
{
"deviceId": 987877,
"osId": [
104
]
}
]
}
}
]
}
}]
Please find code below:
case class Event(
productId: String,
familyId: String,
productTypeId: String,
key: String,
productName: String,
deviceOS: String,
featureMap: Map[String, String])
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.getOrCreate()
private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")
import spark.implicits._
val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
row => {
val userActions = row.getAs[Row]("userActions")
val productId = userActions.getAs[String]("productId")
val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
userEvents.flatMap(event => {
val productTypeId = event.getAs[Int]("productTypeId").toString
val serialId = event.getAs[String]("serialID")
val productName = event.getAs[String]("productName")
val key = s"$productId:$serialId"
val familyId = event.getAs[Long]("familyId")
if(familyId == 2010) {
val features = event.getAs[Row]("features")
val mappings = features.getAs[mutable.WrappedArray[Row]]("mapping")
mappings.flatMap(mappingRow => {
val deviceId = mappingRow.getAs[Long]("deviceId")
val osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId")
osIds.map(osId => {
val deviceOs = s"${deviceId}_$osId"
Event(productId, familyId.toString, productTypeId, key, productName, deviceOs, Map("target" -> deviceOs))
})
})
} else {
Seq(Event(productId, familyId.toString, productTypeId, key, productName, "default_device_os", Map("target" -> "default_device_os")))
}
})
}
)
finalDataset.foreach(e => println(e))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_110,Map(target -> 999795_110))
// Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
// Event(3MFETP501,2011,1004679,3MFETP501:890479805,Product name,default_device_os,Map(target -> default_device_os))
CodePudding user response:
As this is inside a Row of the DataFrame, returning the Event case class converts it into a Dataset. The issue is that for one condition I am getting a List[Event], while for the other types I am getting only a single Event.
FYI: this is not an answer, but my further attempt to solve it (see the sketch after this snippet for one way to make both branches line up).
if (familyId == 2010) {
  val a: Option[List[String]] = ??? // flatten the deviceId, osId ...
  a.get.map(i => {
    val key: String = methodToCombinedeviceIdAndosId
    val featureMapping: mutable.Map[String, String] = getfeatureMapForInvidualKey
    Event(productId, productTypeId, familyId, key, productName, device_os, feature) // ---> This is returning List[Event]
  })
} else {
  Event(productId, productTypeId, familyId, key, productName, device_os, feature) // --> This is returning Event
}