Home > Enterprise >  Conversion issue for Spark dataframe to pandas
Conversion issue for Spark dataframe to pandas

Time:11-13

I am trying to convert a spark data frame to pandas and there is a error I am encountering with:

    databricks/spark/python/pyspark/sql/pandas/conversion.py:145: UserWarning: toPandas attempted
    Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has 
    reached the error below and can not continue. Note that 
    'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in 
     the middle of computation.
     An error occurred while calling o466.getResult.
    : org.apache.spark.SparkException: Exception thrown in awaitResult: 
     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:428)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:107)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:103)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 
    43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 97) (ip-10-172-188- 
    62.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at 
    
    
    
   java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5401/964020024.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5281/859288619.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
    at 
    org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
    at 
    org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
    at org.apache.spark.scheduler.DAGScheduler
    at org.apache.spark.scheduler.DAGScheduler
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1067)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2476)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2459)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2571)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3761)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3765)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3731)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3825)
    at org.apache.spark.sql.execution.SQLExecution$:130)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3823)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3731)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3730)
    at org.apache.spark.security.SocketAuthServer$
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
    at org.apache.spark.security.SocketAuthServer$
    at org.apache.spark.security.SocketAuthServer$.
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:117)
    at org.apache.spark.security.SocketAuthServer$$a
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:70)
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5401/964020024.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5281/859288619.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

warnings.warn(msg)

Below is the how the data frame/code looks like- (Dataset is formed by different datasets and joining) enter image description here

CodePudding user response:

In the traceback it says:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 
    43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 97) (ip-10-172-188- 
    62.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space

Note: java.lang.OutOfMemoryError: Java heap space

You ran out of memory. That's causing the issue.

This is expected, since it uses the single machine to gather all the data, as noted in the documentation:

Note that converting pandas-on-Spark DataFrame to pandas requires to collect all the data into the client machine; therefore, if possible, it is recommended to use pandas API on Spark or PySpark APIs instead.

To learn more about pandas-on-Spark DataFrame, pandas DataFrame and conversion between pyspark Dataframe please see: From/to pandas and PySpark DataFrames.

  • Related