Scala - translate function from list-based to dataset-based

Time:08-11

I have a function that operates on a List, but it causes performance problems, and I would like to make it completely Dataset-based. A Dataset appears to offer the same methods as a List, so it should be possible to translate this method painlessly, but somehow I can't arrive at a solution that compiles.

def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
  // collect() pulls every row to the driver; this is where the memory/performance cost comes from
  val productsDatasetAsList = products.collect().toList
  // group by the product list sorted by productId, keep groups with more than one aggregate
  val duplicates = productsDatasetAsList.groupBy(_.products.sortBy(_.product.productId)).filter(_._2.size > 1).values.toList
  mapToDataset(duplicates)
}

From the very first step, the RelationalGroupedDataset returned by groupBy prevents me from conveniently translating the rest.
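The RelationalGroupedDataset returned by groupBy is untyped, but the same logic can still be expressed with column functions instead of lambdas. This is a minimal sketch, assuming Spark 2.4 or later (for array_sort), the flattened case classes declared below (each element of products is a Product, with no "product" wrapper object), and a SparkSession passed in explicitly; FindDuplicatesSql is a hypothetical name. Sorting the array of structs orders the elements by productId and then productCount, so two different orderings of the same products get the same grouping key:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{array_sort, col, collect_list, size}

// Flattened shapes from the question (assumption: no "product" wrapper object).
case class Product(productId: String, productCount: Int)
case class AggregateWithSupplierProducts(id: String, products: List[Product])
case class DuplicatesByIds(duplicates: List[String])

object FindDuplicatesSql {
  def findDuplicates(spark: SparkSession)(
      products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
    import spark.implicits._ // provides the encoder used by .as[DuplicatesByIds]

    products
      // array_sort orders the structs field by field (productId, then productCount),
      // so [A, B] and [B, A] end up with the same key.
      .withColumn("key", array_sort(col("products")))
      // groupBy returns a RelationalGroupedDataset; agg turns it back into a DataFrame.
      .groupBy(col("key"))
      .agg(collect_list(col("id")).as("duplicates"))
      // Keep only keys shared by more than one id.
      .where(size(col("duplicates")) > 1)
      .select(col("duplicates"))
      .as[DuplicatesByIds]
  }
}
```

Because every step stays inside Spark SQL, nothing is collected to the driver; the only per-group buffering is collect_list over the ids of that group.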
Data structure:

case class AggregateWithSupplierProducts(
  id: String,
  products: List[Product])
case class Product(productId: String, productCount: Int)
case class DuplicatesByIds(duplicates: List[String])

Example of data:

[  {
   "id": "ID1",
   "products": [
     {
       "product": {
         "productId": "SOME_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID2",
   "products": [
     {
       "product": {
         "productId": "SOME_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID3",
   "products": [
     {
       "product": {
         "productId": "DIFFERENT_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID4",
   "products": [
     {
       "product": {
         "productId": "SOME_OTHER_ID",
         "productCount": 1
       }
     },
     {
       "product": {
         "productId": "DIFFERENT_ID",
         "productCount": 1
       }
     }
   ]
 },
 {
   "id": "ID5",
   "products": [
     {
       "product": {
         "productId": "NOT_DUPLICATED_ID",
         "productCount": 1
       }
     },
     {
        "product": {
         "productId": "DIFFERENT_ID",
         "productCount": 2
       }
     }
   ]
 }
]

The expected result is a Dataset containing:

DuplicatesByIds(List("ID1", "ID2")),
DuplicatesByIds(List("ID3", "ID4"))
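To check a translation against this example without reading JSON, the same five records can be built directly as a Dataset. This sketch assumes the flattened case classes from the question (the "product" wrapper objects in the JSON are left out) and a local SparkSession; SampleData.sample is a hypothetical helper:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Flattened shapes from the question (assumption: the JSON "product" wrapper is dropped).
case class Product(productId: String, productCount: Int)
case class AggregateWithSupplierProducts(id: String, products: List[Product])

object SampleData {
  // Hypothetical helper: the five example records, in the same order as the JSON above.
  def sample(spark: SparkSession): Dataset[AggregateWithSupplierProducts] = {
    import spark.implicits._ // provides .toDS() and the case-class encoder
    Seq(
      AggregateWithSupplierProducts("ID1", List(Product("SOME_ID", 1), Product("SOME_OTHER_ID", 1))),
      AggregateWithSupplierProducts("ID2", List(Product("SOME_ID", 1), Product("SOME_OTHER_ID", 1))),
      AggregateWithSupplierProducts("ID3", List(Product("DIFFERENT_ID", 1), Product("SOME_OTHER_ID", 1))),
      AggregateWithSupplierProducts("ID4", List(Product("SOME_OTHER_ID", 1), Product("DIFFERENT_ID", 1))),
      AggregateWithSupplierProducts("ID5", List(Product("NOT_DUPLICATED_ID", 1), Product("DIFFERENT_ID", 2)))
    ).toDS()
  }
}
```

ID3 and ID4 list the same products in a different order, so this input also checks that the translation ignores element order; ID5 has no match.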

The code works correctly when the Dataset is collected to a List, but I am having considerable trouble translating it so that it works entirely on the Dataset, without wasting memory.

Answer:

One can only speculate here on what your data is, so I assume it's like this:

case class AggregateWithSupplierProducts(productId: Int, a: Int, b: Int)
case class DuplicatesByIds(productId: Int, products: List[AggregateWithSupplierProducts])

With that, you can group and filter duplicates as follows:

import spark.implicits._ // for .toDS; assumes a SparkSession named spark, as in spark-shell

val ds = Seq(
  AggregateWithSupplierProducts(1, 14, 4),
  AggregateWithSupplierProducts(1, 9, 3),
  AggregateWithSupplierProducts(2, 5, 6)
).toDS

ds.show
+---------+---+---+
|productId|  a|  b|
+---------+---+---+
|        1| 14|  4|
|        1|  9|  3|
|        2|  5|  6|
+---------+---+---+


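def findDuplicates(products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] =
  products.groupByKey(_.productId)                                    // one group per productId
    .mapGroups((key, values) => DuplicatesByIds(key, values.toList)) // gather each group's rows
    .filter(_.products.size > 1)                                     // keep only productIds that occur more than once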
  
findDuplicates(ds).show(false)
+---------+-----------------------+
|productId|products               |
+---------+-----------------------+
|1        |[{1, 14, 4}, {1, 9, 3}]|
+---------+-----------------------+
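The same groupByKey approach can be adapted to the structure the question actually declares: key each aggregate by its products in a canonical order and gather the ids of each group. This is a sketch, not tested against the asker's real schema, assuming the flattened case classes from the question, a SparkSession passed in, and a Spark version whose spark.implicits provides an encoder for a Seq of tuples (recent 2.x and 3.x releases do); FindDuplicatesTyped and canonicalKey are hypothetical names. flatMapGroups drops single-id groups in the same step instead of a separate filter:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Flattened shapes from the question (assumption: no "product" wrapper object).
case class Product(productId: String, productCount: Int)
case class AggregateWithSupplierProducts(id: String, products: List[Product])
case class DuplicatesByIds(duplicates: List[String])

object FindDuplicatesTyped {
  // Hypothetical helper: (productId, productCount) pairs in sorted order,
  // so that [A, B] and [B, A] produce the same key.
  def canonicalKey(agg: AggregateWithSupplierProducts): Seq[(String, Int)] =
    agg.products.map(p => (p.productId, p.productCount)).sorted

  def findDuplicates(spark: SparkSession)(
      products: Dataset[AggregateWithSupplierProducts]): Dataset[DuplicatesByIds] = {
    import spark.implicits._ // encoders for the Seq[(String, Int)] key and DuplicatesByIds

    products
      .groupByKey(agg => canonicalKey(agg))
      // Emit one DuplicatesByIds per group that holds more than one id; skip the rest.
      .flatMapGroups { (_, rows) =>
        val ids = rows.map(_.id).toList
        if (ids.size > 1) Iterator(DuplicatesByIds(ids)) else Iterator.empty
      }
  }
}
```

Unlike the list-based version in the question, this never calls collect(), so the grouping runs on the executors rather than on the driver. The order of ids inside each group is not guaranteed.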