I am using Apache Beam to write data to BigQuery. My pipeline works fine locally in IntelliJ with the DirectRunner, and it writes to the BigQuery table without issues. However, as soon as I deploy the code to a Spark cluster, I get the exception "java.lang.IllegalArgumentException: Invalid lambda deserialization":
User class threw exception: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:712)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:432)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:409)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:449)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:438)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:46)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:848)
at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:626)
at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:180)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:96)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at com.somecompany.beam.BeamApplication.run(BeamApplication.java:43)
at com.somecompany.SparkApp.main(L1LoaderSparkApp.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
... 52 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer.$deserializeLambda$(ErrorContainer.java:33)
... 62 more
My writer class looks like this:
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STREAMING_INSERTS;

import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.springframework.beans.factory.annotation.Qualifier;

public class MyTermsBigQueryWriter implements Serializable {

    @Qualifier("bigQueryProperties")
    private BigQueryProperties bigQueryProperties;

    public BigQueryIO.Write<MyTerms> myTermsWriter() {
        final BigQueryIO.Write<MyTerms> myTermsWrite = BigQueryIO.<MyTerms>write()
                .withMethod(STREAMING_INSERTS)
                .withExtendedErrorInfo()
                .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                .withJsonSchema(getMyTermsSchemaFile())
                .withFormatFunction(new SerializableFunction<MyTerms, TableRow>() {
                    @Override
                    public TableRow apply(MyTerms kt) {
                        return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(kt);
                    }
                })
                .to(getTableSpec())
                .withFormatRecordOnFailureFunction(new SerializableFunction<MyTerms, TableRow>() {
                    @Override
                    public TableRow apply(MyTerms myTerms) {
                        return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(myTerms);
                    }
                })
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        return myTermsWrite;
    }

    // getMyTermsSchemaFile(), getTableSpec() and getTableRowForMyTermsTable()
    // are elided here.
}
public class MyTerms implements Serializable {
    public String xx;
    public String xy;
    // equals() and hashCode() are implemented but elided here.
}
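For reference, the writer is wired into the pipeline roughly as follows. This is a minimal sketch, assuming a trivial Create.of source and direct instantiation of the writer; the real application uses a streaming source and Spring wiring instead:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class PipelineSketch {
    public static void main(String[] args) {
        // The runner (DirectRunner locally, SparkRunner on the cluster) comes from args.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        MyTerms sample = new MyTerms();
        sample.xx = "a";
        sample.xy = "b";

        pipeline
                // Create.of stands in for the real source.
                .apply("CreateMyTerms",
                        Create.of(sample).withCoder(SerializableCoder.of(MyTerms.class)))
                .apply("WriteToBigQuery", new MyTermsBigQueryWriter().myTermsWriter());

        pipeline.run().waitUntilFinish();
    }
}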
If I don't use my writer, the pipeline works fine on the cluster.
I have tried replacing the lambda with a method reference and with an anonymous inner class, but no luck.
Any ideas?
CodePudding user response:
It turns out that the org.apache.avro.generic.GenericData.Record class was causing the serialization issues. We replaced it with a simple POJO, and everything started working fine.
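As a rough sketch of what that change looks like, the Avro record can be mapped to the POJO at the edge of the pipeline, so that no GenericData.Record instances flow through the serialized transforms. The class and field names here are illustrative assumptions mirroring the MyTerms class above, not the original code:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.SimpleFunction;

// A named SimpleFunction (rather than a lambda) that converts each Avro
// record into the plain serializable POJO as early as possible.
public class ToMyTermsFn extends SimpleFunction<GenericRecord, MyTerms> {
    @Override
    public MyTerms apply(GenericRecord record) {
        MyTerms terms = new MyTerms();
        // "xx" and "xy" mirror the POJO fields above; adjust to the real Avro schema.
        terms.xx = String.valueOf(record.get("xx"));
        terms.xy = String.valueOf(record.get("xy"));
        return terms;
    }
}

Applied right after the read step, e.g. .apply(MapElements.via(new ToMyTermsFn())), this keeps Avro types out of the closures that Beam serializes and ships to the Spark workers.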