Caused by: java.io.NotSerializableException: java.util.function.Predicate$$Lambda

Time:09-16

How can I make java.util.function.Predicate serializable? It is not a class in my application, and I don't use it directly anywhere. How can I fix this issue?

Here is the stack trace:

App > Serialization stack:
App >   - object not serializable (class: java.util.function.Predicate$$Lambda$309/931003277, value: java.util.function.Predicate$$Lambda$309/931003277@5c97826a)
App >   - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)
App >   - object (class io.github.resilience4j.retry.RetryConfig, io.github.resilience4j.retry.RetryConfig@20496d9f)
App >   - field (class: determination.resilience.GetTaxResilience, name: retryConfig, type: class io.github.resilience4j.retry.RetryConfig)
App >   - object (class determination.resilience.GetTaxResilience, determination.resilience.GetTaxResilience@af49e79)
App >   - field (class: vertex.VertexTaxComplianceOperator, name: getTaxResilience, type: class determination.resilience.GetTaxResilience)
App >   - object (class vertex.VertexTaxComplianceOperator, vertex.VertexTaxComplianceOperator@3e68a323)
App >   - field (class: vertex.VertexTaxComplianceOperator$$anonfun$5, name: $outer, type: class vertex.VertexTaxComplianceOperator)
App >   - object (class vertex.VertexTaxComplianceOperator$$anonfun$5, <function1>)
App >   - field (class: org.apache.spark.sql.execution.MapElementsExec, name: func, type: class java.lang.Object)
App >   - object (class org.apache.spark.sql.execution.MapElementsExec, MapElements <function1>, obj#512: 

I'm confused: this class belongs to another jar that I only use, so why do I get a serialization error at this line?

App >   - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)

Here is my code:

@Component
class GetTaxResilience extends Serializable  {
    private val logger = LoggerFactory.getLogger(this.getClass)

    val retryConfig = RetryConfig.custom()
        .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
        .retryExceptions(classOf[StatusRuntimeException])
        .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
        .build

    val retryRegistry = RetryRegistry.of(retryConfig)

    val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

    def getDefaultRetryInstance(): Retry = {
        // default retry instance configured in yaml
        logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
        retryInstance
    }
}

Edit:

@Autowired
var getTaxResilience: GetTaxResilience = null

withRetry(getTaxResilience.getDefaultRetryInstance())

Also, the class where the error occurs is already Serializable:

public class RetryConfig implements Serializable

Edit 2:

val calculateResultDF = preparedDataSetDF.mapPartitions(iterator => {
    val calculateResultDF = iterator.map(row => {
        // retry stuff
    })
    calculateResultDF
}).toDF()

spark.sql(sqlBuilder.createStagingTableInsertIntoSQL(jobInfoCase.run_id)).toDF()

insertIntoExceptionArchiveTable(spark, jobInfoCase.run_id)

CodePudding user response:

Spark is trying to serialize the retryConfig field of your object. The object in the retryConfig field holds a reference to something that's not serializable and which you can't make serializable.
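
To see the same failure without Spark or resilience4j, here is a minimal sketch using plain Java serialization. ConfigLike and Wrapper are hypothetical stand-ins for RetryConfig and GetTaxResilience, and Predicate.isEqual is used only because it returns an ordinary Java lambda that does not implement Serializable. It assumes Scala 2.12 or later, which is needed to call a static method on a Java interface.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.function.Predicate

object NonSerializableFieldDemo {
  // Hypothetical stand-in for RetryConfig: Serializable itself,
  // but it holds a field whose value is a non-serializable lambda.
  class ConfigLike extends Serializable {
    val exceptionPredicate: Predicate[String] = Predicate.isEqual[String]("boom")
  }

  // Hypothetical stand-in for GetTaxResilience.
  class Wrapper extends Serializable {
    val config = new ConfigLike
  }

  // Tries to serialize `value`; returns the exception message (the name of
  // the class that could not be serialized) or None on success.
  def offendingClass(value: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    // Marking a class Serializable is not enough: every non-transient field
    // it reaches must be serializable too.
    println(offendingClass(new Wrapper))
  }
}
```

Both wrapper classes are Serializable here, yet serialization still fails on the lambda held deep inside, which matches the chain shown in the stack trace.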

One solution is to mark retryConfig @transient. This will prevent it from being serialized (on the receiving end, it will be null). Other vals which depend on a @transient field (in this case retryRegistry and, transitively, retryInstance) should probably also be marked @transient.

You would then define a readResolve method, which will get called on the receiving end. This method would return a new GetTaxResilience, which will have appropriate values for retryConfig/retryRegistry/retryInstance.

class GetTaxResilience extends Serializable  {
    private val logger = LoggerFactory.getLogger(this.getClass)

    @transient val retryConfig = RetryConfig.custom()
        .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
        .retryExceptions(classOf[StatusRuntimeException])
        .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
        .build

    @transient val retryRegistry = RetryRegistry.of(retryConfig)

    @transient val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

    def getDefaultRetryInstance(): Retry = {
        // default retry instance configured in yaml
        logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
        retryInstance
    }

    private def readResolve: AnyRef = new GetTaxResilience
}
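
The effect of @transient and readResolve can be checked in isolation with a plain Java serialization round trip. This is only a sketch: WithoutReadResolve, WithReadResolve and roundTrip are hypothetical names, the Predicate stands in for the non-serializable state inside RetryConfig, and Scala 2.12 or later is assumed.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.function.Predicate

object TransientReadResolveDemo {
  // Transient field only: serialization succeeds, but the copy sees null,
  // because the constructor is not run again on deserialization.
  class WithoutReadResolve extends Serializable {
    @transient val predicate: Predicate[String] = Predicate.isEqual[String]("boom")
  }

  // Transient field plus readResolve: the deserialized copy is replaced
  // by a freshly constructed instance with all fields initialized.
  class WithReadResolve extends Serializable {
    @transient val predicate: Predicate[String] = Predicate.isEqual[String]("boom")
    private def readResolve: AnyRef = new WithReadResolve
  }

  // Serializes `value` to a byte array and reads it back.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip(new WithoutReadResolve).predicate == null)
    println(roundTrip(new WithReadResolve).predicate.test("boom"))
  }
}
```

Note that readResolve builds a new instance from scratch, so this only works when everything the class needs can be recreated on the receiving side without the sender's state.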

In this particular case, since GetTaxResilience takes no constructor parameters, it likely makes sense to make it an object, in which case it doesn't have to be made Serializable or have @transients or a readResolve.

object GetTaxResilience {
    private val logger = LoggerFactory.getLogger(this.getClass)

    val retryConfig = RetryConfig.custom()
        .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
        .retryExceptions(classOf[StatusRuntimeException])
        .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
        .build

    val retryRegistry = RetryRegistry.of(retryConfig)

    val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

    def getDefaultRetryInstance(): Retry = {
        // default retry instance configured in yaml
        logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
        retryInstance
    }
}

Then in Scala code to access the default retry instance, you just need:

GetTaxResilience.getDefaultRetryInstance()

(though the more Scala-idiomatic way would just be GetTaxResilience.retryInstance)
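
The reason the object version avoids the problem is that code referring to an object reaches it through a static path rather than through a field, so nothing from the object ends up in the serialized data. A small sketch of that, with hypothetical names (Resilience for the holder object, Task for whatever gets shipped to executors) and assuming Scala 2.12 or later:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.function.Predicate

object ObjectHolderDemo {
  // Hypothetical stand-in for the GetTaxResilience object: it is not
  // Serializable and holds a non-serializable value.
  object Resilience {
    val predicate: Predicate[String] = Predicate.isEqual[String]("boom")
  }

  // Hypothetical stand-in for code shipped to executors. It has no fields;
  // it only refers to Resilience by name.
  class Task extends Serializable {
    def shouldRetry(message: String): Boolean = Resilience.predicate.test(message)
  }

  // Serializes `value` to a byte array and reads it back.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Task)
    println(copy.shouldRetry("boom"))
  }
}
```

With this layout each JVM initializes the object on first access, so on a cluster every executor would build its own retry configuration instead of receiving one from the driver.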
