Join Dataset with case class in Spark Scala


I am converting a DataFrame into a Dataset using a case class that contains a sequence of another case class:

case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
case class IpLocation(
    ip: String,
    ipVersion: Byte,
    ipType: String,
    city: String,
    state: String,
    country: String)
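
For context, the conversion itself is just .as[IdMonitor] on the source DataFrame (a minimal sketch; df is a placeholder name for that DataFrame):

import org.apache.spark.sql.Dataset
import spark.implicits._ // brings the IdMonitor encoder into scope

// df is assumed to have the matching schema:
//   id: string, ipLocation: array<struct<ip, ipVersion, ipType, city, state, country>>
val idMonitor: Dataset[IdMonitor] = df.as[IdMonitor]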

Now I have another Dataset of strings that contains just IPs. My requirement is to keep every record from ipLocation whose ipType == "home" or whose IP appears in the IP dataset. I have tried building a Bloom filter from the IP dataset and searching through it, but it is inefficient and not working well in general. I would like to join the IP dataset with IpLocation instead, but I'm having trouble because ipLocation is a Seq. I'm very new to Spark and Scala, so I'm probably missing something. Right now my code looks like this:


def buildBloomFilter(Ips: Dataset[String]): BloomFilter[String] = {
  val count = Ips.count
  val bloomFilter = Ips.rdd
    .mapPartitions { iter =>
      // build one Bloom filter per partition
      val b = BloomFilter.optimallySized[String](count, FP_PROBABILITY)
      iter.foreach(i => b += i)
      Iterator(b)
    }
    .treeReduce(_ | _) // union the per-partition filters
  bloomFilter
}

val ipBf = buildBloomFilter(Ips)
val ipBfBroadcast = spark.sparkContext.broadcast(ipBf)

idMonitor.map { monitor =>
  monitor.ipLocation.filter(
    loc => loc.ipType == "home" && ipBfBroadcast.value.contains(loc.ip)
  )
}

I just want to figure out how to join IpLocation and Ips

CodePudding user response:

You can explode the ipLocation array in your IdMonitor objects using the explode function, then use an inner join to filter the IPs against your Ips dataset, and finally rebuild the IpLocation sequence by grouping by id and using collect_list.

The complete code is as follows:

import org.apache.spark.sql.functions.{col, collect_list, explode}

val result = idMonitor
  .select(col("id"), explode(col("ipLocation"))) // one row per IpLocation, in a column named "col"
  .filter(col("col.ipType") === "home")
  .join(Ips, col("col.ip") === col("value")) // Ips is a Dataset[String]; its single column is named "value"
  .groupBy("id")
  .agg(collect_list("col").as("value")) // rebuild the sequence per id
  .drop("id")
  .as[Seq[IpLocation]]
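
Note that the original requirement keeps a location when ipType == "home" or its IP appears in Ips, and the result above also drops the id. If you need both, here is a minimal sketch along the same lines, assuming (as above) that the single column of Ips is named value:

import org.apache.spark.sql.functions.{col, collect_list, explode}

val monitors = idMonitor
  .select(col("id"), explode(col("ipLocation")).as("loc"))
  .join(Ips, col("loc.ip") === col("value"), "left")              // left join keeps "home" rows whose IP is not in Ips
  .filter(col("loc.ipType") === "home" || col("value").isNotNull) // "home" OR matched an IP
  .groupBy("id")
  .agg(collect_list("loc").as("ipLocation"))
  .as[IdMonitor]                                                  // ids whose locations are all filtered out are dropped here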

CodePudding user response:

Sample:

Starting from your case classes,

case class IpLocation(
    ip: String,
    ipVersion: Byte,
    ipType: String,
    city: String,
    state: String,
    country: String
)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])

I have defined the sample data as follows:

val ip_locations1 = Seq(IpLocation("123.123.123.123", 12.toByte, "home", "test", "test", "test"), IpLocation("123.123.123.124", 12.toByte, "otherwise", "test", "test", "test"))
val ip_locations2 = Seq(IpLocation("123.123.123.125", 13.toByte, "company", "test", "test", "test"), IpLocation("123.123.123.124", 13.toByte, "otherwise", "test", "test", "test"))

val id_monitor = Seq(IdMonitor("1", ip_locations1), IdMonitor("2", ip_locations2))
val df = id_monitor.toDF()
df.show(false)

+---+------------------------------------------------------------------------------------------------------+
|id |ipLocation                                                                                            |
+---+------------------------------------------------------------------------------------------------------+
|1  |[{123.123.123.123, 12, home, test, test, test}, {123.123.123.124, 12, otherwise, test, test, test}]  |
|2  |[{123.123.123.125, 13, company, test, test, test}, {123.123.123.124, 13, otherwise, test, test, test}]|
+---+------------------------------------------------------------------------------------------------------+

and the IPs:

val ips = Seq("123.123.123.125")
val df_ips = ips.toDF("ips")
df_ips.show()

+---------------+
|            ips|
+---------------+
|123.123.123.125|
+---------------+

Join:

From the above example data, explode the ipLocation array of each IdMonitor and join it with the IPs:

import org.apache.spark.sql.functions.{col, explode, lit}

df.withColumn("ipLocation", explode('ipLocation)).alias("a") // one row per IpLocation
  .join(df_ips.alias("b"),
    col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"),
    "inner")
  .select("ipLocation.*")
  .as[IpLocation].collect()

Finally, the collected result is given as follows:

res32: Array[IpLocation] = Array(IpLocation(123.123.123.123,12,home,test,test,test), IpLocation(123.123.123.125,13,company,test,test,test))
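
As a side note, if the list of IPs is small enough to collect to the driver, you could also broadcast it as a plain Set and stay in the typed API, avoiding the explode/join round trip. A minimal sketch reusing df and df_ips from above (ipSet and filtered are illustrative names; it assumes import spark.implicits._ is in scope, as it already is if toDF() worked):

// Collect the small IP dataset to the driver and broadcast it as a Set
val ipSet = spark.sparkContext.broadcast(df_ips.collect().map(_.getString(0)).toSet)

// Filter each monitor's locations directly; unlike the inner join, ids whose
// locations are all filtered out are kept, just with an empty sequence
val filtered = df.as[IdMonitor].map { m =>
  m.copy(ipLocation = m.ipLocation.filter(loc =>
    loc.ipType == "home" || ipSet.value.contains(loc.ip)))
}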