How can I make java.util.function.Predicate Serializable? It is not a class in my application and I don't use it anywhere directly, so how can I fix this issue?
Here is the stack trace:
App > Serialization stack:
App > - object not serializable (class: java.util.function.Predicate$$Lambda$309/931003277, value: java.util.function.Predicate$$Lambda$309/931003277@5c97826a)
App > - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)
App > - object (class io.github.resilience4j.retry.RetryConfig, io.github.resilience4j.retry.RetryConfig@20496d9f)
App > - field (class: determination.resilience.GetTaxResilience, name: retryConfig, type: class io.github.resilience4j.retry.RetryConfig)
App > - object (class determination.resilience.GetTaxResilience, determination.resilience.GetTaxResilience@af49e79)
App > - field (class: vertex.VertexTaxComplianceOperator, name: getTaxResilience, type: class determination.resilience.GetTaxResilience)
App > - object (class vertex.VertexTaxComplianceOperator, vertex.VertexTaxComplianceOperator@3e68a323)
App > - field (class: vertex.VertexTaxComplianceOperator$$anonfun$5, name: $outer, type: class vertex.VertexTaxComplianceOperator)
App > - object (class vertex.VertexTaxComplianceOperator$$anonfun$5, <function1>)
App > - field (class: org.apache.spark.sql.execution.MapElementsExec, name: func, type: class java.lang.Object)
App > - object (class org.apache.spark.sql.execution.MapElementsExec, MapElements <function1>, obj#512:
I'm so confused. This belongs to another jar that I just use, so why do I get a serialization error at this line?
App > - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)
Here is my code:
@Component
class GetTaxResilience extends Serializable {
  private val logger = LoggerFactory.getLogger(this.getClass)

  val retryConfig = RetryConfig.custom()
    .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
    .retryExceptions(classOf[StatusRuntimeException])
    .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
    .build

  val retryRegistry = RetryRegistry.of(retryConfig)
  val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

  def getDefaultRetryInstance(): Retry = {
    // default retry instance configured in yaml
    logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
    retryInstance
  }
}
Edit:
@Autowired
var getTaxResilience: GetTaxResilience = null
withRetry(getTaxResilience.getDefaultRetryInstance())
And the class where the error happened already implements Serializable:
public class RetryConfig implements Serializable
Edit 2:
val calculateResultDF = preparedDataSetDF.mapPartitions(iterator => {
  val calculateResultIterator = iterator.map(row => {
    //retry stuff
  })
  calculateResultIterator
}).toDF()
spark.sql(sqlBuilder.createStagingTableInsertIntoSQL(jobInfoCase.run_id)).toDF()
insertIntoExceptionArchiveTable(spark, jobInfoCase.run_id)
CodePudding user response:
Spark is trying to serialize the retryConfig field of your object. The object in the retryConfig field holds a reference to something that's not serializable (the java.util.function.Predicate lambda stored in its exceptionPredicate field) and which you can't make serializable.
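You can reproduce the failure outside Spark: RetryConfig itself implements Serializable, but the java.util.function.Predicate lambda that retryExceptions(...) stores in the exceptionPredicate field does not. Here is a minimal sketch (the isJavaSerializable helper is hypothetical, not part of your code or of resilience4j) that attempts plain Java serialization, which is the same thing Spark's closure serializer has to do:
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: returns true only if the whole object graph can be written
// with plain Java serialization (which is what Spark needs for task closures).
def isJavaSerializable(value: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(value)
    out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

// RetryConfig implements Serializable, but its exceptionPredicate field holds a
// non-serializable Predicate lambda, so serializing the config (or anything that
// references it, like GetTaxResilience) fails.
// isJavaSerializable(retryConfig)  // false in this setup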
One solution is to mark retryConfig as @transient. This will prevent it from being serialized (on the receiving end, it will be null). Other vals which depend on a @transient field (in this case retryRegistry and, transitively, retryInstance) should probably also be marked @transient.
You would then define a readResolve method, which will get called on the receiving end. This method would return a new GetTaxResilience, which will have appropriate values for retryConfig/retryRegistry/retryInstance.
class GetTaxResilience extends Serializable {
  private val logger = LoggerFactory.getLogger(this.getClass)

  @transient val retryConfig = RetryConfig.custom()
    .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
    .retryExceptions(classOf[StatusRuntimeException])
    .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
    .build

  @transient val retryRegistry = RetryRegistry.of(retryConfig)
  @transient val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

  def getDefaultRetryInstance(): Retry = {
    // default retry instance configured in yaml
    logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
    retryInstance
  }

  // called by Java serialization on the receiving side: replaces the deserialized
  // instance (whose @transient fields are null) with a freshly constructed one
  private def readResolve: AnyRef = new GetTaxResilience
}
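A quick local sanity check for the @transient/readResolve approach is to round-trip an instance through Java serialization and confirm you get a fresh, fully initialized replacement back. This is a minimal sketch, independent of Spark, with a hypothetical roundTrip helper; it assumes the logger field is serializable in your slf4j binding (otherwise mark it @transient as well):
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: serialize and deserialize, roughly what Spark does when shipping a task.
def roundTrip(value: AnyRef): AnyRef = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject()
}

val copy = roundTrip(new GetTaxResilience).asInstanceOf[GetTaxResilience]
// readResolve replaced the deserialized instance with a brand-new one,
// so its retry fields are initialized rather than null.
assert(copy.getDefaultRetryInstance() != null)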
In this particular case, since GetTaxResilience takes no constructor parameters, it likely makes sense to make it an object, in which case it doesn't have to be made Serializable or have @transients or a readResolve.
object GetTaxResilience {
  private val logger = LoggerFactory.getLogger(this.getClass)

  val retryConfig = RetryConfig.custom()
    .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
    .retryExceptions(classOf[StatusRuntimeException])
    .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
    .build

  val retryRegistry = RetryRegistry.of(retryConfig)
  val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

  def getDefaultRetryInstance(): Retry = {
    // default retry instance configured in yaml
    logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
    retryInstance
  }
}
Then, in Scala code, to access the default retry instance you just need:
GetTaxResilience.getDefaultRetryInstance()
(though the more Scala-idiomatic way would just be GetTaxResilience.retryInstance).
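For completeness, here is a rough sketch of how the object version fits the mapPartitions code from Edit 2 (preparedDataSetDF and the per-row retry body are taken from the question, and the same encoders that make that snippet compile are assumed). The point is that referencing an object from the closure does not capture an instance, so nothing in GetTaxResilience has to be serialized; each executor initializes the object on first use.
val calculateResultDF = preparedDataSetDF.mapPartitions(iterator => {
  // Resolved on the executor; not captured into the serialized closure.
  val retry = GetTaxResilience.getDefaultRetryInstance()
  iterator.map(row => {
    // retry stuff using `retry`, as in the original snippet
  })
}).toDF()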