reduceByKey(lambda) on a mapped RDD doesn't work in PySpark

Time:09-17

I can't understand why my code isn't working. The last line is the problem:

import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
conf=SparkConf().setMaster("local").setAppName("mein soft")
sc=SparkContext(conf=conf)
sqlContext=SQLContext(sc)

lines=sc.textFile("File.txt")
#lines.repartition(3)
lines.getNumPartitions()

def lan_map(x):
    if "word1" and "word2" in x:
        return ("Count",(1,1))
    elif "word1" in x:
        return ("Count",("1,0"))
    elif "word2" in x:
        return ("Count",("0,1"))
    else:
        return ("Count",("0,0"))
    
mapfun=lines.map(lan_map)

mapfun.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()

And the error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in
      1 # This sums up what we did 3 cells back.
----> 2 mapfun.reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1])).collect()
      3
      4 #mapfun.reduceByKey(noMeFuncaLambdaAsiQueHagoEsto(mapfun.x,mupfun.y)).collect()
      5 # This directly gives us back the count of how many times "Python" appears and how many times "Spark" appears

C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py in collect(self)
    947         """
    948         with SCCallSiteSync(self.context) as css:
--> 949             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    950         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    951

C:\spark-3.1.2-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\sql\utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

C:\spark-3.1.2-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (LAPTOP-PB7QDPVE executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 594, in process
  File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 418, in func
    return f(iterator)
  File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 2144, in combineLocally
    merger.mergeValues(iterator)
  File "C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\shuffle.py", line 242, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "", line 2, in
TypeError: unsupported operand type(s) for +: 'int' and 'str'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.collect(RDD.scala:1029) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Unknown Source) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main File "C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 594, in process File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 2916, in pipeline_func return func(split, prev_func(split, iterator)) File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 2916, in pipeline_func return func(split, prev_func(split, iterator)) File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 418, in func return f(iterator) File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py", line 2144, in combineLocally merger.mergeValues(iterator) File 
"C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\shuffle.py", line 242, in mergeValues d[k] = comb(d[k], v) if k in d else creator(v) File "C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\util.py", line 73, in wrapper return f(*args, **kwargs) File "", line 2, in TypeError: unsupported operand type(s) for : 'int' and 'str'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more

I feel so lost that I can't even return a single position from my mapfun. I mean, shouldn't this work:

mapfun[1]

I have tried with a function instead, but I failed even worse:

def fun2(x,y):
    x[0] + y[0]
    x[1] + y[1]
mapfun.reduceByKey(fun2(x,y)).collect()

CodePudding user response:

You are receiving the error

TypeError: unsupported operand type(s) for +: 'int' and 'str'

because your tuple values are strings, i.e. ("1,0") instead of (1,0). Note that ("1,0") is not a tuple at all, just the string "1,0" in parentheses, and Python will not apply the + operator to an int and a str.
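
You can reproduce the error with plain Python, no Spark involved, using the two value shapes your map function currently returns:

a = (1, 1)       # value returned when a line matches both words
b = ("1,0")      # parentheses alone do not make a tuple; this is just the string "1,0"
a[0] + b[0]      # TypeError: unsupported operand type(s) for +: 'int' and 'str'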

Moreover, there is a logic error in the condition in your map function: "word1" and "word2" in x is parsed as "word1" and ("word2" in x), and since the non-empty string "word1" is always truthy, it only checks whether "word2" is in x (a quick check of this is shown after the rewrite). I would recommend the following rewrite:

def lan_map(x):
    if "word1" in x and "word2" in x:
        return ("Count",(1,1))
    elif "word1" in x:
        return ("Count",(1,0))
    elif "word2" in x:
        return ("Count",(0,1))
    else:
        return ("Count",(0,0))

Or, a possibly shorter version of the map function:

def lan_map(x):
    return ("Count", (
        1 if "word1" in x else 0,
        1 if "word2" in x else 0
    ))
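
With lan_map returning tuples of ints, your original reduceByKey call should then work unchanged. A minimal sketch, reusing the lines RDD from your snippet:

mapfun = lines.map(lan_map)

# Each record is ("Count", (w1, w2)); summing element-wise gives the per-word totals.
totals = mapfun.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
# totals looks like [('Count', (12, 7))]: here 12 lines would contain "word1" and 7 would contain "word2"
# (lines containing both words are counted in both totals)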

Let me know if this works for you.
