Store POJOs to a Spark Dataset


I'm connecting from Zeppelin to my REST web service, which returns cities, through an OpenAPI-generated client:

import java.util.Map;

import fr.ecoemploi.application.etude.swagger.model.Commune;
import fr.ecoemploi.application.etude.swagger.api.CogControllerApi;

var serviceCodeOfficielGeographique: CogControllerApi = new CogControllerApi();

var communes: Map[String, Commune] = 
    serviceCodeOfficielGeographique.obtenirCommunesUsingGET(2022);

It returns satisfactory results:

communes: java.util.Map[String,fr.ecoemploi.application.etude.swagger.model.Commune] =
{62001=class Commune {
    arrondissement: 627
    codeCanton: 6217
    codeCommune: 62001
    codeCommuneParente: null
    codeDepartement: 62
    codeEPCI: 246200364
    codeRegion: 32
    collectiviteOutremer: false
    nomCommune: Ablain-Saint-Nazaire
    nomMajuscules: ABLAIN SAINT NAZAIRE
    population: null
    sirenCommune: 216200014
 ...

I'd like to store the cities (the values of this map, which I'll fetch with communes.values()) in a Dataset[Commune].

But I wonder how to do this.
I've attempted:

import org.apache.spark.sql._

var datasetCommunes = spark.createDataset(communes, Encoders[Commune]);

under Zeppelin, but received the message:

<console>:96: error: object Encoders does not take type parameters.
       var datasetCommunes = spark.createDataset(communes, Encoders[Commune]);

I don't know how to write my statement.


Thanks for your advice below, @Gael_J.
I've attempted it that way (because map.values() returns, in Java, a Collection<Commune> that I first have to convert to a Scala collection):

import org.apache.spark.sql._
import scala.collection.mutable._;
import spark.implicits._

var communeList: ArrayBuffer[Commune] = new ArrayBuffer[Commune];
communes.values().stream().forEach(commune => communeList.append(commune));

var datasetCommunes = communeList.toSeq.toDS();

But I'm receiving the error:

error: value toDS is not a member of Seq[fr.ecoemploi.application.etude.swagger.model.Commune]
       var datasetCommunes = communeList.toSeq.toDS();

CodePudding user response:

If Commune is a case class, you should be able to benefit from automatic Encoders with something like this:

import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters._ on Scala 2.12
import spark.implicits._

val ds = communes.values().asScala.toSeq.toDS()
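
Note that toDS() comes from spark.implicits._ and needs an implicit Encoder[Commune] in scope; one is derived automatically for case classes, which is why the earlier attempt failed on the generated POJO. Also, values() on a java.util.Map returns a Java Collection, not a Scala one, so it has to go through asScala before toSeq.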

CodePudding user response:

I'll add to @GaëlJ's answer that if Commune is a POJO, then it should follow the Java bean conventions (no-argument constructor, getters and setters), and since spark.implicits._ doesn't derive encoders for arbitrary Java classes, you have to define its encoder manually:

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

implicit val enc: Encoder[Commune] = Encoders.bean(classOf[Commune])

Seq(new Commune(...)).toDS()
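
Putting the two answers together for the map from the question, a minimal sketch (assuming Commune is a Swagger-generated bean with a no-argument constructor and getters/setters) could look like this:

import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters._ on Scala 2.12
import org.apache.spark.sql.{Dataset, Encoders}

// Build the Dataset straight from the map's values, passing the
// bean encoder explicitly rather than relying on an implicit.
val datasetCommunes: Dataset[Commune] =
  spark.createDataset(communes.values().asScala.toSeq)(Encoders.bean(classOf[Commune]))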

See also: Spark Scala Datasets using Java Classes
