I have a function that operates on a List, but it causes performance problems, and I would like to make it fully Dataset-based. The Dataset API appears to offer the same methods as List, so translating this function should be painless, but I can't arrive at a version that compiles.
def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
  // collect() pulls every row to the driver, which is likely the source of the performance problem
  val productsDatasetAsList = products.as[AggregateWithSupplierProducts].collect().toList
  val duplicates = productsDatasetAsList
    .groupBy(_.products.sortBy(_.product.productId))
    .filter(_._2.size > 1)
    .values
    .toList
  mapToDataset(duplicates)
}
The problem starts right away: groupBy on a Dataset returns a RelationalGroupedDataset, which prevents me from conveniently translating the rest of the function.
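The difference behind this is that `Dataset.groupBy` is the untyped, column-based API and returns a `RelationalGroupedDataset` (only aggregations such as `agg` or `count`), whereas `Dataset.groupByKey` takes a Scala function and returns a `KeyValueGroupedDataset`, whose `mapGroups` hands you each key with an iterator over that group's objects, which is the closest counterpart of `List.groupBy`. The following is a minimal sketch; the `Item` case class, the `GroupingApis` object and the local `SparkSession` are hypothetical and not part of the question.

```scala
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, RelationalGroupedDataset, SparkSession}

// Hypothetical record type, used only for this illustration.
case class Item(id: String, key: String)

object GroupingApis {
  // Untyped API: groupBy(column names) returns a RelationalGroupedDataset,
  // which only offers column aggregations (agg, count, sum, ...).
  def untyped(ds: Dataset[Item]): RelationalGroupedDataset =
    ds.groupBy("key")

  // Typed API: groupByKey(function) returns a KeyValueGroupedDataset;
  // mapGroups receives each key together with an Iterator of its Item objects.
  def idsPerKey(ds: Dataset[Item]): Dataset[(String, List[String])] = {
    import ds.sparkSession.implicits._
    val grouped: KeyValueGroupedDataset[String, Item] = ds.groupByKey(_.key)
    grouped.mapGroups((key, items) => (key, items.map(_.id).toList))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("grouping-apis").getOrCreate()
    import spark.implicits._
    val ds = Seq(Item("a", "k1"), Item("b", "k1"), Item("c", "k2")).toDS()
    untyped(ds).count().show() // one row per key with its row count
    idsPerKey(ds).show(false)  // one row per key with the list of its ids
    spark.stop()
  }
}
```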
Data structure:
case class AggregateWithSupplierProducts(
  id: String,
  products: List[Product])
case class Product(productId: String, productCount: Int)
case class DuplicatesByIds(duplicates: List[String])
Example of data:
[
  {
    "id": "ID1",
    "products": [
      { "product": { "productId": "SOME_ID", "productCount": 1 } },
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID2",
    "products": [
      { "product": { "productId": "SOME_ID", "productCount": 1 } },
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID3",
    "products": [
      { "product": { "productId": "DIFFERENT_ID", "productCount": 1 } },
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID4",
    "products": [
      { "product": { "productId": "SOME_OTHER_ID", "productCount": 1 } },
      { "product": { "productId": "DIFFERENT_ID", "productCount": 1 } }
    ]
  },
  {
    "id": "ID5",
    "products": [
      { "product": { "productId": "NOT_DUPLICATED_ID", "productCount": 1 } },
      { "product": { "productId": "DIFFERENT_ID", "productCount": 2 } }
    ]
  }
]
The expected result is a Dataset with:
DuplicatesByIds(List("ID1", "ID2")),
DuplicatesByIds(List("ID3", "ID4"))
The code works correctly when the Dataset is collected to a List, but I have considerable trouble translating it so that it works entirely on the Dataset, without collecting everything into memory.
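A minimal sketch of the same logic kept entirely inside Spark, replacing `collect()` + `List.groupBy` with `groupByKey` + `mapGroups`. It assumes the case classes exactly as listed in the question (so the sort key is `_.productId` rather than `_.product.productId`), Spark 2.2 or later (assumed here for the implicit encoder of the `List[Product]` grouping key), and a local `SparkSession`; the `FindDuplicatesTyped` object name is hypothetical, and `sampleRows` simply restates the example data.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Case classes as listed in the question (assumed: no extra "product" wrapper).
case class Product(productId: String, productCount: Int)
case class AggregateWithSupplierProducts(id: String, products: List[Product])
case class DuplicatesByIds(duplicates: List[String])

object FindDuplicatesTyped {
  // Same rule as the List version, but executed by Spark: rows are grouped by
  // their product list sorted on productId, and only groups with >1 id are kept.
  def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
    import products.sparkSession.implicits._
    products
      .groupByKey(_.products.sortBy(_.productId))                      // key: normalised product list
      .mapGroups((_, rows) => DuplicatesByIds(rows.map(_.id).toList)) // one object per group
      .filter(_.duplicates.size > 1)                                   // keep actual duplicates only
  }

  // The example data from the question.
  val sampleRows: Seq[AggregateWithSupplierProducts] = Seq(
    AggregateWithSupplierProducts("ID1", List(Product("SOME_ID", 1), Product("SOME_OTHER_ID", 1))),
    AggregateWithSupplierProducts("ID2", List(Product("SOME_ID", 1), Product("SOME_OTHER_ID", 1))),
    AggregateWithSupplierProducts("ID3", List(Product("DIFFERENT_ID", 1), Product("SOME_OTHER_ID", 1))),
    AggregateWithSupplierProducts("ID4", List(Product("SOME_OTHER_ID", 1), Product("DIFFERENT_ID", 1))),
    AggregateWithSupplierProducts("ID5", List(Product("NOT_DUPLICATED_ID", 1), Product("DIFFERENT_ID", 2)))
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("find-duplicates").getOrCreate()
    import spark.implicits._
    findDuplicates(sampleRows.toDS()).show(false)
    spark.stop()
  }
}
```

With this approach only one group's ids are materialised at a time inside mapGroups, on the executors, instead of the whole Dataset on the driver. Neither the order of the output rows nor the order of ids within each list is guaranteed.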
Answer:
One can only speculate here on what your data is, so I assume it's like this:
case class AggregateWithSupplierProducts(productId: Int, a: Int, b: Int)
case class DuplicatesByIds(productId: Int, products: List[AggregateWithSupplierProducts])
With that, you can group and filter duplicates as follows:
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell
val ds = Seq(
  AggregateWithSupplierProducts(1,14,4),
  AggregateWithSupplierProducts(1,9,3),
  AggregateWithSupplierProducts(2,5,6)
).toDS
ds.show
+---------+---+---+
|productId|  a|  b|
+---------+---+---+
|        1| 14|  4|
|        1|  9|  3|
|        2|  5|  6|
+---------+---+---+
def findDuplicates(products: Dataset[AggregateWithSupplierProducts]) =
  products
    .groupByKey(_.productId)
    .mapGroups((key, values) => DuplicatesByIds(key, values.toList))
    .filter(_.products.size > 1)
findDuplicates(ds).show(false)
+---------+-----------------------+
|productId|products               |
+---------+-----------------------+
|1        |[{1, 14, 4}, {1, 9, 3}]|
+---------+-----------------------+
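If you would rather not deserialize every row into Scala objects, the question's grouping can probably also be expressed with built-in column functions: `sort_array` to normalise the product list into a grouping key, `collect_list` to gather the ids, and `size` to keep only real duplicates. This is a sketch under stated assumptions: the question's case classes (without a `product` wrapper), Spark 2.2 or later, and a hypothetical `FindDuplicatesColumns` object. One behavioural difference: `sort_array` orders the structs by all their fields (`productId`, then `productCount`), whereas `sortBy(_.productId)` looks only at `productId`, so results can differ if one list contains the same `productId` twice.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, size, sort_array}

// Case classes as listed in the question (assumed: no extra "product" wrapper).
case class Product(productId: String, productCount: Int)
case class AggregateWithSupplierProducts(id: String, products: List[Product])
case class DuplicatesByIds(duplicates: List[String])

object FindDuplicatesColumns {
  def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
    import products.sparkSession.implicits._
    products
      .groupBy(sort_array(col("products")).as("productsKey")) // sorted product array as the key
      .agg(collect_list(col("id")).as("duplicates"))           // all ids sharing that key
      .filter(size(col("duplicates")) > 1)                     // keep actual duplicates only
      .select(col("duplicates"))
      .as[DuplicatesByIds]
  }

  // The example data from the question.
  val sampleRows: Seq[AggregateWithSupplierProducts] = Seq(
    AggregateWithSupplierProducts("ID1", List(Product("SOME_ID", 1), Product("SOME_OTHER_ID", 1))),
    AggregateWithSupplierProducts("ID2", List(Product("SOME_ID", 1), Product("SOME_OTHER_ID", 1))),
    AggregateWithSupplierProducts("ID3", List(Product("DIFFERENT_ID", 1), Product("SOME_OTHER_ID", 1))),
    AggregateWithSupplierProducts("ID4", List(Product("SOME_OTHER_ID", 1), Product("DIFFERENT_ID", 1))),
    AggregateWithSupplierProducts("ID5", List(Product("NOT_DUPLICATED_ID", 1), Product("DIFFERENT_ID", 2)))
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("find-duplicates-columns").getOrCreate()
    import spark.implicits._
    findDuplicates(sampleRows.toDS()).show(false)
    spark.stop()
  }
}
```

Because the grouping key and the aggregation are plain column expressions, Spark's optimizer can see the whole computation instead of opaque Scala lambdas; the trade-off is that the "same products" rule has to be expressed with column functions.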