Serialization Exception in Apache Spark and Java


I was tasked with creating a POC for Apache Spark with Spring Boot. I created a controller for receiving my data through an API:

@PostMapping(path = "/memberData")
public Map<String, Profile> processData(@RequestBody Member member) {
    logger.info("Processing data for member List: {}", member);
    return service.processData(member);
}

It should return a Map with profile.getName() as the key and the Profile object as the value, both of which are part of the Member object. I then implemented the service behind the controller:

@Autowired
JavaSparkContext sc;
...
public Map<String, Profile> processData(Member member) {
    logger.info("Processing data for member List: {}", member);
    // Distribute the single Profile into an RDD with 3 partitions
    JavaRDD<Profile> profile = sc.parallelize(Collections.singletonList(member.getProfile()), 3);
    // Key each Profile by name and collect the pairs back to the driver as a Map
    return profile.mapToPair(p -> new Tuple2<>(p.getName(), p)).collectAsMap();
}

The JavaSparkContext was declared in a config class in a different sub-package:

@Bean
public SparkConf conf() {
    return new SparkConf().setAppName(appName).setMaster(masterUri);
}
@Bean
public JavaSparkContext sc() {
    return new JavaSparkContext(conf());
}
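
For context, a minimal sketch of what the full config class might look like, assuming appName and masterUri are bound from application.properties (the property keys shown here are hypothetical):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfig {

    // Hypothetical bindings, e.g. spark.app.name=spark-poc and
    // spark.master.uri=local[*] in application.properties
    @Value("${spark.app.name}")
    private String appName;

    @Value("${spark.master.uri}")
    private String masterUri;

    @Bean
    public SparkConf conf() {
        return new SparkConf().setAppName(appName).setMaster(masterUri);
    }

    @Bean
    public JavaSparkContext sc() {
        return new JavaSparkContext(conf());
    }
}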

The application builds perfectly, but when I hit the URL with my data, i.e. a Member object:

{
    "id": "62d121c30cc723640fd03cc2",
    "email": "[email protected]",
    "username": "allison88",
    "profile": {
      "name": "Allison Mcleod",
      "company": "Ziore",
      "dob": "1988-06-19",
      "address": "71 Glendale Court, Hobucken, Pennsylvania",
      "about": "Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est."
    },
    "roles": [
      "guest",
      "owner"
    ],
    "createdAt": "2020-07-20T20:40:51.840Z",
    "updatedAt": "2020-07-21T20:40:51.840Z"
  }

I get the following error saying the object is not serializable:

2022-07-18 09:52:22.887  INFO 13108 --- [nio-8080-exec-2] com.leofierus.service.WordCountService   : Processing data for member List: Member(id=62d121c30cc723640fd03cc2, [email protected], username=allison88, profile=Profile(name=Allison Mcleod, company=Ziore, dob=1988-06-19, address=71 Glendale Court, Hobucken, Pennsylvania, about=Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est.), roles=[guest, owner], createdAt=2020-07-21 02:10:51.84, updatedAt=2020-07-22 02:10:51.84)
2022-07-18 09:52:23.416  INFO 13108 --- [nio-8080-exec-2] org.apache.spark.SparkContext            : Starting job: count at WordCountService.java:33
2022-07-18 09:52:23.455  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Got job 0 (count at WordCountService.java:33) with 3 output partitions
2022-07-18 09:52:23.455  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Final stage: ResultStage 0 (count at WordCountService.java:33)
2022-07-18 09:52:23.455  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Parents of final stage: List()
2022-07-18 09:52:23.463  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Missing parents: List()
2022-07-18 09:52:23.471  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at WordCountService.java:32), which has no missing parents
2022-07-18 09:52:23.631  INFO 13108 --- [uler-event-loop] o.a.spark.storage.memory.MemoryStore     : Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 2.1 GB)
2022-07-18 09:52:23.730  INFO 13108 --- [uler-event-loop] o.a.spark.storage.memory.MemoryStore     : Block broadcast_0_piece0 stored as bytes in memory (estimated size 1342.0 B, free 2.1 GB)
2022-07-18 09:52:23.730  INFO 13108 --- [er-event-loop-0] o.apache.spark.storage.BlockManagerInfo  : Added broadcast_0_piece0 in memory on MalharP:54693 (size: 1342.0 B, free: 2.1 GB)
2022-07-18 09:52:23.730  INFO 13108 --- [uler-event-loop] org.apache.spark.SparkContext            : Created broadcast 0 from broadcast at DAGScheduler.scala:1161
2022-07-18 09:52:23.762  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Submitting 3 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at WordCountService.java:32) (first 15 tasks are for partitions Vector(0, 1, 2))
2022-07-18 09:52:23.770  INFO 13108 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl    : Adding task set 0.0 with 3 tasks
2022-07-18 09:52:24.098  INFO 13108 --- [er-event-loop-1] o.apache.spark.scheduler.TaskSetManager  : Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7347 bytes)
2022-07-18 09:52:24.146  INFO 13108 --- [rker for task 0] org.apache.spark.executor.Executor       : Running task 0.0 in stage 0.0 (TID 0)
2022-07-18 09:52:24.274  INFO 13108 --- [rker for task 0] org.apache.spark.executor.Executor       : Finished task 0.0 in stage 0.0 (TID 0). 709 bytes result sent to driver
2022-07-18 09:52:24.282  INFO 13108 --- [er-event-loop-1] o.apache.spark.scheduler.TaskSetManager  : Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7347 bytes)
2022-07-18 09:52:24.282  INFO 13108 --- [rker for task 1] org.apache.spark.executor.Executor       : Running task 1.0 in stage 0.0 (TID 1)
2022-07-18 09:52:24.299  INFO 13108 --- [rker for task 1] org.apache.spark.executor.Executor       : Finished task 1.0 in stage 0.0 (TID 1). 623 bytes result sent to driver
2022-07-18 09:52:24.306 ERROR 13108 --- [er-event-loop-1] org.apache.spark.util.Utils              : Exception encountered

java.io.NotSerializableException: com.leofierus.model.Profile
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:442) ~[na:na]
    at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$writeObject$1(ParallelCollectionRDD.scala:59) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349) ~[na:na]
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:486) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.Option.map(Option.scala:146) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:315) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15(TaskSchedulerImpl.scala:413) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15$adapted(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) ~[scala-library-2.12.7.jar:na]
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13$adapted(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:70) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-07-18 09:52:24.314 ERROR 13108 --- [er-event-loop-1] org.apache.spark.util.Utils              : Exception encountered

java.io.NotSerializableException: com.leofierus.model.Profile
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
    at java.base/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:442) ~[na:na]
    at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$writeObject$1(ParallelCollectionRDD.scala:59) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349) ~[na:na]
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:486) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.Option.map(Option.scala:146) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:315) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15(TaskSchedulerImpl.scala:413) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15$adapted(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) ~[scala-library-2.12.7.jar:na]
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13$adapted(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:70) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-07-18 09:52:24.314 ERROR 13108 --- [er-event-loop-1] o.apache.spark.scheduler.TaskSetManager  : Failed to serialize task 2, not attempting to retry it.

java.io.NotSerializableException: com.leofierus.model.Profile
Serialization stack:
    - object not serializable (class: com.leofierus.model.Profile, value: Profile(name=Allison Mcleod, company=Ziore, dob=1988-06-19, address=71 Glendale Court, Hobucken, Pennsylvania, about=Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est.))
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(Profile(name=Allison Mcleod, company=Ziore, dob=1988-06-19, address=71 Glendale Court, Hobucken, Pennsylvania, about=Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est.)))
    - writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
    - object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@693)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 2))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:486) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.Option.map(Option.scala:146) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:315) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15(TaskSchedulerImpl.scala:413) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15$adapted(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) ~[scala-library-2.12.7.jar:na]
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13$adapted(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[scala-library-2.12.7.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.12.7.jar:na]
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:70) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) ~[spark-core_2.12-2.4.0.jar:2.4.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-07-18 09:52:24.322  INFO 13108 --- [er-event-loop-1] o.a.spark.scheduler.TaskSchedulerImpl    : Removed TaskSet 0.0, whose tasks have all completed, from pool 
2022-07-18 09:52:24.322 ERROR 13108 --- [er-event-loop-1] o.a.spark.scheduler.TaskSchedulerImpl    : Resource offer failed, task set TaskSet_0.0 was not serializable
2022-07-18 09:52:24.322  INFO 13108 --- [result-getter-1] o.apache.spark.scheduler.TaskSetManager  : Finished task 1.0 in stage 0.0 (TID 1) in 40 ms on localhost (executor driver) (1/3)
2022-07-18 09:52:24.330  INFO 13108 --- [result-getter-1] o.a.spark.scheduler.TaskSchedulerImpl    : Removed TaskSet 0.0, whose tasks have all completed, from pool 
2022-07-18 09:52:24.330  INFO 13108 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl    : Cancelling stage 0
2022-07-18 09:52:24.330  INFO 13108 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl    : Killing all running tasks in stage 0: Stage cancelled
2022-07-18 09:52:24.330  INFO 13108 --- [result-getter-0] o.apache.spark.scheduler.TaskSetManager  : Finished task 0.0 in stage 0.0 (TID 0) in 469 ms on localhost (executor driver) (2/3)
2022-07-18 09:52:24.330  INFO 13108 --- [result-getter-0] o.a.spark.scheduler.TaskSchedulerImpl    : Removed TaskSet 0.0, whose tasks have all completed, from pool 
2022-07-18 09:52:24.330  INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : ResultStage 0 (count at WordCountService.java:33) failed in 0.835 s due to Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: com.leofierus.model.Profile

Can someone help me with this? I am very new to Apache Spark and am building this POC to see whether it fits the current project requirements, which would involve parsing thousands of such data objects at once.

CodePudding user response:

Please add implements Serializable to the classes that Spark serializes. Here, Profile is a nested object of Member, so both classes must implement Serializable for your code to run. This is illustrated for the Member class below:

public class Member implements Serializable {
    ...
}
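
The same change applies to Profile. A minimal sketch, assuming the field names from the JSON payload above (getters, setters, and constructors omitted for brevity):

import java.io.Serializable;

public class Profile implements Serializable {
    // An explicit serialVersionUID guards against InvalidClassException
    // if the class definition changes between writing and reading
    private static final long serialVersionUID = 1L;

    private String name;
    private String company;
    private String dob;      // could equally be a LocalDate, depending on your mapping
    private String address;
    private String about;
}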

This allows the object's state to be serialized to a byte stream, and the byte stream to be reverted back into a copy of the object.
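
Alternatively, if you cannot modify the model classes, you can switch Spark to Kryo serialization, which does not require classes to implement Serializable. A sketch against the SparkConf bean from your config class:

@Bean
public SparkConf conf() {
    return new SparkConf()
            .setAppName(appName)
            .setMaster(masterUri)
            // Use Kryo instead of Java serialization for data
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // Registering classes is optional but improves performance
            .registerKryoClasses(new Class<?>[]{Member.class, Profile.class});
}

For data created via parallelize, the partition contents should then be written with the configured Kryo serializer, which covers the failure in your stack trace; note that closures themselves are still serialized with Java serialization.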
