Home > Net >  Multiplying event case class depending on the list based on nested IDs
Multiplying event case class depending on the list based on nested IDs


I am processing a dataframe and converting into Dataset[Event] using Event case class.How ever there are nested Ids for which i need to multiply the events based on the flattening of nested device:os.

I am able to return the case class Event at the Kafka event level. But not sure how to multiply events .

Kafka incoming Event:

  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
              "deviceId": 999795,
              "osId": [
              "deviceId": 987875
              "osId": [

The expected output case classes for Event

Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","999795_100", Map("targetId"->"999795_100") )
Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","987875_100", Map("targetId"->"987875_100") )
case class Event(
                    productId: String,
                    familyId: String,
                    productTypeId: String,
                    key: String,
                    productName: String,
                    var featureMap: mutable.Map[String, String])

val finalDataset:Dataset[Event] = inputDataFrame.flatMap(
row=> {

        val productId = row.getAs[String]("productId")
        val userActions = row.getAs[Row]("userActions")
        val userEvents:mutable.Seq[Row] = userActions.getAs[mutable.WrappedArray[Row]]("events")

        val processedEvents:mutable.Seq[Row]= userEvents.map(

            val productTypeId = event.getAs[Int]("productTypeId")
            val familyId = event.getAs[String]("familyId")
            val features = activity.getAs[mutable.WrappedArray[Row]]("features")
            val serialId = activity.getAs[String]("serialId")
            val key =  productId ":" serialId
            val features = mutable.Map[String, String]().withDefaultValue(null)

            val device_os_list=List("999795_100","987875_101")
            //Feature Map is for every device_os ( example "targetId"->"999795_100") for 999795_100

      if (familyId == 2010 )
      val a: Option[List[String]] = flatten the deviceId,osId ..
          val key: String =  methodToCombinedeviceIdAndosId
          val featureMapping: mutable.Map[String, String] = getfeatureMapForInvidualKey

          Event(productId,productTypeId,familyId,key,productName,device_os,feature) ---> This is returning **List[Event]** 
    Event(productId,productTypeId,familyId,key,productName,device_os,feature)  --> This is returning **Event**. THIS WORKS




CodePudding user response:

I do not implement it fully the same but I think it will be possible to understand logic and apply it on your case.

I created json file like kafka.json and put there code like this(your event):

  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
              "deviceId": 999795,
              "osId": [
              "deviceId": 987875,
              "osId": [

Please find below first solution that is based on flatMap and for loop.

  case class Event(
      productId: String,
      familyId: String,
      productTypeId: String,
      key: String,
      productName: String,
      deviceOS: String,
      featureMap: Map[String, String])

  import org.apache.spark.sql.{Dataset, Row, SparkSession}

  import scala.collection.mutable

  val spark = SparkSession

  private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")

  import spark.implicits._

  val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
    row => {

      val userActions = row.getAs[Row]("userActions")
      val productId = userActions.getAs[String]("productId")

      val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
      for (event <- userEvents;
           familyId = event.getAs[Int]("familyId").toString;
           productTypeId = event.getAs[Int]("productTypeId").toString;
           serialId = event.getAs[String]("serialID");
           productName = event.getAs[String]("productName");
           key = s"$productId:$serialId";
           features = event.getAs[Row]("features");
           mappings = features.getAs[mutable.WrappedArray[Row]]("mapping");
           mappingRow <- mappings;
           deviceId = mappingRow.getAs[Long]("deviceId");
           osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId");
           osId <- osIds;
           deviseOs = deviceId   "_"   osId
           ) yield Event(productId, familyId, productTypeId, key, productName, deviseOs, Map("target" -> (deviseOs)))


  finalDataset.foreach(e => println(e))

//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))

Also, you can solve this task using select, withColumn, explode, concat functions.

  case class Event(
      productId: String,
      familyId: String,
      productTypeId: String,
      key: String,
      productName: String,
      deviceOS: String,
      featureMap: Map[String, String])

  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.functions.{col, explode, concat, lit, map}

  val spark = SparkSession

  private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")

  val transformedDataFrame = inputDataFrame
      .withColumn("key", concat(col("productId"), lit(":"), col("serialID")))
      .withColumn("deviceOS", concat(col("deviceId"), lit("_"), col("osId")))
      .withColumn("featureMap", map(lit("target"), col("deviceOS")))

  import spark.implicits._

  private val result: Dataset[Event] = transformedDataFrame.as[Event]
  result.foreach(e => println(e))

//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))

Add option to customize response based on the value one of the field. I replace here for comprehension to map/flatmap, so you can return as response one or several events based on the type. Also, I customized json a little bit to show more examples in the result.

New json:

  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
              "deviceId": 999795,
              "osId": [
              "deviceId": 987875,
              "osId": [
        "GUID": "1111-2222-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-03-26T11:19:35.786Z",
        "familyId": 2011,
        "productTypeId": 1004679,
        "serialID": "890479805",
        "productName": "Product name",
        "features": {
          "mapping": [
              "deviceId": 999796,
              "osId": [
              "deviceId": 987877,
              "osId": [

Please find code below:

  case class Event(
      productId: String,
      familyId: String,
      productTypeId: String,
      key: String,
      productName: String,
      deviceOS: String,
      featureMap: Map[String, String])

  import org.apache.spark.sql.{Dataset, SparkSession}

  val spark = SparkSession

  private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")

  import spark.implicits._

  val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
    row => {

      val userActions = row.getAs[Row]("userActions")
      val productId = userActions.getAs[String]("productId")

      val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
      for (event <- userEvents;
           productTypeId = event.getAs[Int]("productTypeId").toString;
           serialId = event.getAs[String]("serialID");
           productName = event.getAs[String]("productName");
           key = s"$productId:$serialId";
           familyId = event.getAs[Int]("familyId").toString;
           features = event.getAs[Row]("features");
           mappings = features.getAs[mutable.WrappedArray[Row]]("mapping");
           mappingRow <- mappings;
           deviceId = mappingRow.getAs[Long]("deviceId");
           osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId");
           osId <- osIds;
           deviseOs = deviceId   "_"   osId
           ) yield Event(productId, familyId, productTypeId, key, productName, deviseOs, Map("target" -> deviseOs))

      userEvents.flatMap(event => {
        val productTypeId = event.getAs[Int]("productTypeId").toString
        val serialId = event.getAs[String]("serialID")
        val productName = event.getAs[String]("productName")
        val key = s"$productId:$serialId"
        val familyId = event.getAs[Long]("familyId")

        if(familyId == 2010) {
          val features = event.getAs[Row]("features")
          val mappings = features.getAs[mutable.WrappedArray[Row]]("mapping")
          mappings.flatMap(mappingRow => {
            val deviceId = mappingRow.getAs[Long]("deviceId")
            val osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId")
            osIds.map(osId => {
              val devise_os = deviceId   "_"   osId
              Event(productId, familyId.toString, productTypeId, key, productName, devise_os, Map("target" -> devise_os))
        } else {
          Seq(Event(productId, familyId.toString, productTypeId, key, productName, "default_defice_os", Map("target" -> "default_defice_os")))


  finalDataset.foreach(e => println(e))

//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_110,Map(target -> 999795_110))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
//  Event(3MFETP501,2011,1004679,3MFETP501:890479805,Product name,default_defice_os,Map(target -> default_defice_os))

CodePudding user response:

As this is under a Row of DataFrame, returning Event case class , converts into DataSet.Issue here is for one condition ,i am getting List[Event] and rest type , i am getting only Event class

FYI :This is not an answer. But my further attempt to solve.

if (familyId == 2010 )
      val a: Option[List[String]] = flatten the deviceId,osId ..
          val key: String =  methodToCombinedeviceIdAndosId
          val featureMapping: mutable.Map[String, String] = getfeatureMapForInvidualKey

          Event(productId,productTypeId,familyId,key,productName,device_os,feature) ---> This is returning List[Event]
    Event(productId,productTypeId,familyId,key,productName,device_os,feature)  --> This is returning Event

  • Related