Spark error: "value foreach is not a member of Object"

Time:09-17

The dataframe consists of two columns (s3ObjectName, batchName) and tens of thousands of rows, for example:

s3ObjectName batchName
a1.json 45
b2.json 45
c3.json 45
d4.json 46
e5.json 46

The objective is to retrieve objects from an S3 bucket and write them to the data lake in parallel, using the details from each row in the dataframe, with the foreachPartition() and foreach() functions.

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

// S3 connector details are defined in an object so that they are available on all executors in the cluster
object container {

  // Builds an S3 client from credentials stored in the Databricks secret scope
  def getDataSource(): AmazonS3 = {
    val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}

import java.io.{BufferedWriter, File, FileWriter}
// Assumed to be Apache Commons IO, which provides toString(InputStream, String)
import org.apache.commons.io.IOUtils

dataframe.foreachPartition(partition => {
  // Initialize the S3 connection once per partition
  val client: AmazonS3 = container.getDataSource()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    // Download the object and read it fully into a string
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    // Write the contents to the mounted data lake path for this batch
    val filePath = s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
  })
})

The code above fails to compile with:

Error: value foreach is not a member of Object

CodePudding user response:

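Convert the DataFrame to an RDD before calling foreachPartition. The likely cause is that, on Scala 2.12, Dataset.foreachPartition is overloaded (one version takes a Scala function, the other a Java ForeachPartitionFunction), so the compiler cannot infer the type of partition in an untyped lambda and treats it as Object. RDD.foreachPartition has a single signature, so partition is inferred as Iterator[Row].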

dataframe.rdd.foreachPartition(partition => {

})
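
To make the fix concrete, here is a minimal sketch of two variants: going through the RDD as suggested above, and a commonly suggested alternative that stays on the DataFrame but annotates the lambda parameter as Iterator[Row]. The object ForeachPartitionSketch, the helper writeRow and the outDir parameter are hypothetical names introduced only for illustration. writeRow stands in for the S3 download and DBFS write from the question and simply writes a small local file. It also assumes both columns are strings.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{DataFrame, Row}

object ForeachPartitionSketch {

  // Hypothetical stand-in for the S3 download and DBFS write in the question:
  // writes a small text file to <outDir>/<batchName>/<s3ObjectName>.
  def writeRow(outDir: String, row: Row): Unit = {
    val s3ObjectName = row.getString(0)
    val batchName = row.getString(1)
    val target = Paths.get(outDir, batchName, s3ObjectName)
    Files.createDirectories(target.getParent)
    Files.write(target, s"contents of $s3ObjectName".getBytes(StandardCharsets.UTF_8))
  }

  // Variant 1: go through the RDD, whose foreachPartition has a single signature.
  def viaRdd(dataframe: DataFrame, outDir: String): Unit =
    dataframe.rdd.foreachPartition { partition =>
      // Per-partition setup (for example, building the S3 client) would go here.
      partition.foreach(row => writeRow(outDir, row))
    }

  // Variant 2: stay on the DataFrame and state the parameter type explicitly,
  // so that only the Scala-function overload applies.
  def viaTypedLambda(dataframe: DataFrame, outDir: String): Unit =
    dataframe.foreachPartition((partition: Iterator[Row]) =>
      partition.foreach(row => writeRow(outDir, row)))
}
```

Either variant can be called as ForeachPartitionSketch.viaRdd(dataframe, "/some/output/dir"). In the original code, the body of writeRow would be replaced by the getObject call and the file write, with the client created once per partition as in the question.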