Objective: Retrieve objects from an S3 bucket using a 'get' API call, write each retrieved object to Azure Data Lake, and, in case of errors like 404s (object not found), write the error message to Cosmos DB.
"my_dataframe" consists of a single column (s3ObjectName) with object names like:
s3ObjectName |
---|
a1.json |
b2.json |
c3.json |
d4.json |
e5.json |
//retry function that writes cosmos error in event of failure
import scala.util.{Failure, Success, Try}

def retry[T](n: Int)(fn: => T): T = {
  Try(fn) match {
    case Success(x) => x
    case Failure(t: Throwable) => {
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1)(fn)
      } else {
        // last attempt failed: log the error to Cosmos DB, then rethrow
        val loggerDf = Seq((t.toString)).toDF("Description")
          .withColumn("Type", lit("Failure"))
          .withColumn("id", uuid())
        loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
        throw t
      }
    }
  }
}
//execute s3 get api call
my_dataframe.rdd.foreachPartition(partition => {
  val creds = new BasicAWSCredentials(AccessKey, SecretKey)
  val clientRegion: Regions = Regions.US_EAST_1
  val s3client = AmazonS3ClientBuilder.standard()
    .withRegion(clientRegion)
    .withCredentials(new AWSStaticCredentialsProvider(creds))
    .build()
  partition.foreach(x => {
    retry(2) {
      val objectKey = x.getString(0)
      val i = s3client.getObject(s3bucket_name, objectKey).getObjectContent
      val inputS3String = IOUtils.toString(i, "UTF-8")
      val filePath = s"${data_lake_file_path}"
      val file = new File(filePath)
      val fileWriter = new FileWriter(file)
      val bw = new BufferedWriter(fileWriter)
      bw.write(inputS3String)
      bw.close()
      fileWriter.close()
    }
  })
})
When the above is executed, it results in the following error:
Caused by: java.lang.NullPointerException
This error occurs in the retry function when it tries to create the DataFrame loggerDf and write it to Cosmos DB.
Is there another way to write the error messages to Cosmos DB?
CodePudding user response:
Maybe this isn't a good time to use Spark. Hadoop already has tooling for this type of S3 file transfer, and it does what you are doing without custom Spark code.
If you still feel Spark is the correct tool, split this into a reporting problem and a data transfer problem. First build a list of the files and test whether each one is valid. Write a UDF that does the dirty work of creating a DataFrame of good/bad files. Report the files that aren't valid (to Cosmos DB).
Then transfer the files that are valid.
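A minimal sketch of that split, in plain Scala so it runs without a cluster. The names `Outcome`, `attempt`, and `processPartition` are hypothetical, and `fetch` is a stand-in for the S3 get plus the data lake write. The idea is that executors only return plain values and never touch a DataFrame or the SparkSession:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical result record: one per object, with the error text if it failed.
final case class Outcome(s3ObjectName: String, error: Option[String])

object TransferReport {
  // Run fn up to n times and return the last result instead of logging it.
  def attempt[T](n: Int)(fn: => T): Try[T] =
    Try(fn) match {
      case Failure(_) if n > 1 => attempt(n - 1)(fn)
      case result              => result
    }

  // Intended to run on an executor: maps object names to outcomes
  // using only the iterator it is given, no Spark APIs.
  def processPartition(names: Iterator[String])(fetch: String => Unit): Iterator[Outcome] =
    names.map { name =>
      attempt(2)(fetch(name)) match {
        case Success(_) => Outcome(name, None)
        case Failure(t) => Outcome(name, Some(t.toString))
      }
    }
}
```

In the job this could be wired up as `my_dataframe.rdd.map(_.getString(0)).mapPartitions(it => TransferReport.processPartition(it)(fetch))`. With `spark.implicits._` imported, the result can be turned into a DataFrame on the driver, filtered down to the rows that have an error, and written with the same `cosmos.oltp` writer used in the question.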
CodePudding user response:
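If you want to write errors to Cosmos DB, you'll need to use an "out of band" method to initiate the connection from the executors. (Think: initiating a JDBC connection from inside the partition.foreach.) DataFrames can only be created on the driver, which is why building loggerDf inside partition.foreach fails on the executors.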
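A rough sketch of the out-of-band shape, again in plain Scala. `ErrorSink`, `InMemorySink`, and `handlePartition` are hypothetical names. In a real job the sink would presumably wrap a client opened directly on the executor (for example, one from a Cosmos DB SDK), which is an assumption here and not something shown:

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical abstraction over a connection opened directly on the executor.
trait ErrorSink extends AutoCloseable {
  def report(objectName: String, error: Throwable): Unit
}

// In-memory stand-in so the sketch runs without any external service.
final class InMemorySink extends ErrorSink {
  val messages = ListBuffer.empty[String]
  var closed = false
  def report(objectName: String, error: Throwable): Unit =
    messages += s"$objectName: $error"
  def close(): Unit = { closed = true }
}

object OutOfBand {
  // Mirrors the body of foreachPartition: open one sink per partition,
  // report failures through it, and always close it afterwards.
  def handlePartition(names: Iterator[String], openSink: () => ErrorSink)(work: String => Unit): Unit = {
    val sink = openSink()
    try {
      names.foreach { name =>
        try work(name)
        catch { case NonFatal(t) => sink.report(name, t) }
      }
    } finally sink.close()
  }
}
```

Opening the sink once per partition, not once per object, keeps the number of connections small, the same way the question already builds one S3 client per partition.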
As a lower standard, if you only want to know whether it happened, you could use Accumulators. They aren't made for logging, but they do help transfer information from the executors to the driver. This would enable you to write something back to Cosmos DB from the driver, but accumulators were really intended to simply count whether something has happened. (They can also double count if you end up retrying an executor, so it's not perfect.) An accumulator technically can transfer information back to the driver, but it should only be used for countable things. (If this type of failure is extremely irregular, it's likely suitable. If it happens a lot, it's not suitable.)
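For the counting case, a small sketch using Spark's built-in `longAccumulator`. It needs Spark on the classpath. `AccumulatorSketch`, `countFailures`, and `isMissing` are hypothetical names, and `isMissing` stands in for the real S3 call failing:

```scala
import org.apache.spark.sql.SparkSession

object AccumulatorSketch {
  // Counts how many names are treated as failures, using an accumulator
  // that executors add to and only the driver reads.
  def countFailures(spark: SparkSession, names: Seq[String], isMissing: String => Boolean): Long = {
    val sc = spark.sparkContext
    // Created on the driver, shipped to the executors with the closure.
    val failures = sc.longAccumulator("failedObjects")

    sc.parallelize(names, numSlices = 2).foreachPartition { partition =>
      partition.foreach { name =>
        // Stand-in for the S3 get: count the object instead of logging it.
        if (isMissing(name)) failures.add(1L)
      }
    }

    // Read the total on the driver once the action has finished.
    failures.value
  }
}
```

Called from the driver, for example as `AccumulatorSketch.countFailures(spark, Seq("a1.json", "b2.json"), _ == "b2.json")`, the returned count could then be written to Cosmos DB with the normal DataFrame writer, since that part runs on the driver.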