I am converting a dataframe into a dataset using a case class that contains a sequence of another case class:
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
case class IpLocation(
ip: String,
ipVersion: Byte,
ipType: String,
city: String,
state: String,
country: String)
Now I have another dataset of strings that contains just IPs. My requirement is to keep a record from IpLocation if its ipType == "home" or if the IP dataset contains its ip. I am trying to use a bloom filter built from the IP dataset to search through it, but it is inefficient and not working well in general. I want to join the IP dataset with IpLocation, but I'm having trouble since the locations are inside a Seq. I'm very new to Spark and Scala, so I'm probably missing something. Right now my code looks like this:
def buildBloomFilter(ips: Dataset[String]): BloomFilter[String] = {
  val count = ips.count
  ips.rdd
    .mapPartitions { iter =>
      // build one filter per partition, then union them all
      val b = BloomFilter.optimallySized[String](count, FP_PROBABILITY)
      iter.foreach(i => b += i)
      Iterator(b)
    }
    .treeReduce(_ | _)
}
val ipBf = buildBloomFilter(Ips)
val ipBfBroadcast = spark.sparkContext.broadcast(ipBf)
idMonitor.map { monitor =>
  monitor.ipLocation.filter(
    loc => loc.ipType == "home" || ipBfBroadcast.value.contains(loc.ip)
  )
}
I just want to figure out how to join IpLocation and Ips.
CodePudding user response:
You can explode the array in your IdMonitor objects using the explode function, then left join with your Ips dataset and keep the locations whose ipType is "home" or whose ip matched an entry in Ips, and finally rebuild your IpLocation sequence by grouping by id and collecting with collect_list.
Complete code is as follows:
import org.apache.spark.sql.functions.{col, collect_list, explode}

val result = idMonitor
  .select(col("id"), explode(col("ipLocation")))      // one row per IpLocation, in a column named "col"
  .join(Ips, col("col.ip") === col("value"), "left")  // "value" is the column name of a Dataset[String]
  .filter(col("col.ipType") === "home" || col("value").isNotNull)
  .groupBy("id")
  .agg(collect_list("col").as("value"))
  .drop("id")
  .as[Seq[IpLocation]]
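If you also want to keep the id and get whole IdMonitor objects back instead of bare sequences, a minimal variation of the same pipeline (a sketch, assuming the same idMonitor and Ips datasets as above) could look like this:

// Sketch: same explode / left join / filter, re-encoded as Dataset[IdMonitor]
val monitors = idMonitor
  .select(col("id"), explode(col("ipLocation")))
  .join(Ips, col("col.ip") === col("value"), "left")
  .filter(col("col.ipType") === "home" || col("value").isNotNull)
  .groupBy("id")
  .agg(collect_list("col").as("ipLocation"))  // field name must match IdMonitor
  .as[IdMonitor]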
CodePudding user response:
Sample:
Starting from your case classes,
case class IpLocation(
ip: String,
ipVersion: Byte,
ipType: String,
city: String,
state: String,
country: String
)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
I have defined the sample data as follows:
import spark.implicits._  // needed for toDF()

val ip_locations1 = Seq(
  IpLocation("123.123.123.123", 12.toByte, "home", "test", "test", "test"),
  IpLocation("123.123.123.124", 12.toByte, "otherwise", "test", "test", "test")
)
val ip_locations2 = Seq(
  IpLocation("123.123.123.125", 13.toByte, "company", "test", "test", "test"),
  IpLocation("123.123.123.124", 13.toByte, "otherwise", "test", "test", "test")
)
val id_monitor = Seq(IdMonitor("1", ip_locations1), IdMonitor("2", ip_locations2))
val df = id_monitor.toDF()
df.show(false)
+---+------------------------------------------------------------------------------------------------------+
|id |ipLocation                                                                                            |
+---+------------------------------------------------------------------------------------------------------+
|1  |[{123.123.123.123, 12, home, test, test, test}, {123.123.123.124, 12, otherwise, test, test, test}]  |
|2  |[{123.123.123.125, 13, company, test, test, test}, {123.123.123.124, 13, otherwise, test, test, test}]|
+---+------------------------------------------------------------------------------------------------------+
and the IPs:
val ips = Seq("123.123.123.125")
val df_ips = ips.toDF("ips")
df_ips.show()
+---------------+
|            ips|
+---------------+
|123.123.123.125|
+---------------+
Join:
From the sample data above, explode the array in IdMonitor and join with the IPs:
import org.apache.spark.sql.functions.{col, explode, lit}

df.withColumn("ipLocation", explode('ipLocation)).alias("a")
  .join(df_ips.alias("b"),
    col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"),
    "inner")
  .select("ipLocation.*")
  .as[IpLocation]
  .collect()
Finally, the collected result is given as follows:
res32: Array[IpLocation] = Array(IpLocation(123.123.123.123,12,home,test,test,test), IpLocation(123.123.123.125,13,company,test,test,test))
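Note that because the join condition uses ||, a "home" location satisfies it for every row of df_ips, so the inner join can emit duplicate rows once df_ips holds more than one IP. A minimal guard, assuming the same df and df_ips as above, is to deduplicate after the select:

// Sketch: "home" rows match every ips row, so drop the duplicates
// that the OR-condition inner join can introduce
df.withColumn("ipLocation", explode('ipLocation)).alias("a")
  .join(df_ips.alias("b"),
    col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"),
    "inner")
  .select("ipLocation.*")
  .dropDuplicates()
  .as[IpLocation]
  .collect()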