Home > Software engineering >  Changing Values of NumPy Array inside of an RDD with map(lambda) returning error when trying to view
Changing Values of NumPy Array inside of an RDD with map(lambda) returning error when trying to view

Time:04-02

I'm new to PySpark. The RDD I'm working with has NumPy arrays, each with its own associated key. Here is an example of data from my RDD, pulled by using rdd.take(1):

('418292', array([0.07541697, 0.03698332, 0.01885424, ..., 0. , 0. , 0. ]))

I am trying to change the values in the NumPy array so that any value greater than 0 is set to 1, otherwise the values remain 0.

I have written the following code to try to make the change:

binary = rdd.map(lambda x: 1 if x[1] > 0 else 0)
binary.take(3)

Creating "binary" does not generate any errors when I run my code. However, whenever I try to .collect(), .take(), or otherwise use binary in any other code, such as the one below:

df = binary.reduce(lambda x1, x2: ("", np.add(x1[1], x2[1])))[1]

I get thrown this error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 810.0 failed 1 times, most recent failure: Lost task 0.0 in stage 810.0 (TID 1529) (DESKTOP-RU0T17C.mshome.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 619, in main
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 611, in process
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\Owner\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pyspark\rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\Owner\AppData\Local\Temp\ipykernel_8936\3531280699.py", line 6, in <lambda>
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 619, in main
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 611, in process
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\Owner\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pyspark\rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "C:\ProgramData\spark\spark-3.2.1-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\Owner\AppData\Local\Temp\ipykernel_8936\3531280699.py", line 6, in <lambda>
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

This is the only instance in my code so far where .collect() or .take() have thrown this error. I found this StackOverflow thread that had a seemingly similar problem, but the solution returned the same error above.

I have seen other Stack Overflow threads where this error appeared due to a faulty installation of Java, PySpark, or both. I have tried uninstalling and reinstalling both, and nothing changed, so I am confident it is not a package installation issue. Any help is greatly appreciated!

CodePudding user response:

Kind of hack-y, but this solved it:

binary = rdd.map(lambda x: (x[0], np.ceil(x[1])))

x[0] keeps the key in the RDD. Meanwhile, every value in every NumPy array was between 0-1, no negatives. Therefore, np.ceil(x[1]) rounds any value greater than 0 to 1, while any value that is 0 remains 0.

CodePudding user response:

Your input is something like this which I have copied into RDD:

     x = sc.parallelize([('418292', np.array([0.07541697, 0.03698332, 0.01885424, 0.0000131414, 0.5724728, 0.222747 ,0.7181919])),('418292', np.array([0.07541697, 0.03698332, 0.01885424, 0.0000131414, 0.5724728, 0.222747 ,0.7181919])),('418292', np.array([0.07541697, 0.03698332, 0.01885424, 0.0000131414, 0.5724728, 0.222747 ,0.7181919])),('418292',np.array([0.0, 0.0, 0.01885424, 0.0000, 0.0, 0.0 ,0.0]))]

I think you forgot that the x[1] is an array and you can't compare that with 1 as you have done in this line:

    binary = rdd.map(lambda x: 1 if x[1] > 0 else 0)

Use mapValues to map the value of the key-value pair. Here, I am iterating over each element of the array:

    ans = x.mapValues(lambda a: [1 if i>0 else 0 for i in a])
    ans.collect()

    [('418292', [1, 1, 1, 1, 1, 1, 1]), ('418292', [1, 1, 1, 1, 1, 1, 1]), ('418292', [1, 1, 1, 1, 1, 1, 1]), ('418292', [0, 0, 1, 0, 0, 0, 0])]

Hope that my answer helps you!!

  • Related