Spark Twitter streaming application error on Windows 10 when submitting a Python file to the local server


I'm trying to run a streaming application that counts tweets for specific users. The producer code:

# -*- coding: utf-8 -*-
import tweepy
import json
import base64
from kafka import KafkaProducer
import kafka.errors

# Twitter API credentials
CONSUMER_KEY   = "***"
CONSUMER_SECRET   = "***"
ACCESS_TOKEN   = "***"
ACCESS_TOKEN_SECRET   = "***"

# Kafka topic name
TOPIC_NAME = "tweet-kafka"

# Kafka server
KAFKA_HOST = "localhost"
KAFKA_PORT = "9092"

#a list of ids, the actual ids have been hidden in this question
ids = ["11111", "222222"]

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

class KafkaCommunicator:
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic

    def send(self, message):
        self.producer.send(self.topic, message.encode("utf-8"))

    def close(self):
        self.producer.close()
        
class MyStreamListener(tweepy.StreamListener):
    """Listener to tweet stream from twitter."""
    def __init__(self, communicator, api=None):
        super(MyStreamListener, self).__init__()
        self.communicator = communicator
        self.num_tweets = 0

    def on_data(self, raw_data):
        data = json.loads(raw_data)
        #print(data)
        if "user" in data:
            user_id = data["user"]["id_str"]
            if user_id in ids:
                print("Time: "   data["created_at"]   "; id: "   user_id   "; screen_name: "   data["user"]["screen_name"] )
                # put message into Kafka
                self.communicator.send(data["user"]["screen_name"])
        return True
        
    def on_error(self, status):
        print(status)
        return True

def create_communicator():
    """Create Kafka producer."""
    producer = KafkaProducer(bootstrap_servers=KAFKA_HOST + ":" + KAFKA_PORT)
    return KafkaCommunicator(producer, TOPIC_NAME)


def create_stream(communicator):
    """Set stream for twitter api with custom listener."""
    listener = MyStreamListener(communicator=communicator)
    stream = tweepy.Stream(auth, listener)
    return stream

def run_processing(stream):
    # Start filtering messages
    stream.filter(follow=ids)

def main():
    communicator = None
    tweet_stream = None
    try:
        communicator = create_communicator()
        tweet_stream = create_stream(communicator)
        run_processing(tweet_stream)
    except KeyboardInterrupt:
        pass
    except kafka.errors.NoBrokersAvailable:
        print("Kafka broker not found.")
    finally:
        if communicator:
            communicator.close()
        if tweet_stream:
            tweet_stream.disconnect()


if __name__ == "__main__":
    main()
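
Before involving Spark, it can help to verify that tweets actually reach the Kafka topic. The snippet below is only a minimal sketch that reads the same topic with a plain kafka-python consumer; it assumes the broker address and topic name used by the producer above.

# -*- coding: utf-8 -*-
# Minimal verification sketch (assumes kafka-python and a broker on localhost:9092)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "tweet-kafka",                       # same topic the producer writes to
    bootstrap_servers="localhost:9092",  # same broker as KAFKA_HOST:KAFKA_PORT
    auto_offset_reset="earliest",        # read the topic from the beginning
    consumer_timeout_ms=10000,           # stop iterating after 10 s without new messages
)
for record in consumer:
    # each value is the UTF-8 encoded screen name sent by KafkaCommunicator.send
    print(record.value.decode("utf-8"))
consumer.close()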

The streaming app code:

# -*- coding: utf-8 -*-
import sys
import os

spark_path = "D:/spark/spark-2.4.7-bin-hadoop2.7" # spark installed folder
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYTHONHASHSEED'] = "0"
os.environ['SPARK_YARN_USER_ENV'] = "PYTHONHASHSEED=0"

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

SPARK_APP_NAME = "SparkStreamingKafkaTwitter"
SPARK_CHECKPOINT_TMP_DIR = "D:/tmp"
SPARK_BATCH_INTERVAL = 10
SPARK_LOG_LEVEL = "OFF"

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"  # Kafka broker address
KAFKA_TOPIC = "tweet-kafka"

import json

def create_streaming_context():
    """Create Spark streaming context."""
    conf = SparkConf().set("spark.executor.memory", "2g")\
                .set("spark.driver.memory", "2g")\
                .set("spark.driver.bindAddress", "0.0.0.0")
    # Create Spark Context
    sc = SparkContext(master="local[2]", appName=SPARK_APP_NAME, conf=conf)
    # Set log level
    sc.setLogLevel(SPARK_LOG_LEVEL)
    # Create Streaming Context
    ssc = StreamingContext(sc, SPARK_BATCH_INTERVAL)
    # Sets the context to periodically checkpoint the DStream operations for master
    # fault-tolerance. The graph will be checkpointed every batch interval.
    # It is used to update results of stateful transformations as well
    ssc.checkpoint(SPARK_CHECKPOINT_TMP_DIR)
    return ssc

def create_stream(ssc):
    """
    Create subscriber (consumer) to the Kafka topic (works on RDD that is mini-batch).
    """
    return (
        KafkaUtils.createDirectStream(
            ssc, topics=[KAFKA_TOPIC],
            kafkaParams={"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})
            .map(lambda x:x[1])
    )

def main():

    # Init Spark streaming context
    ssc = create_streaming_context()

    # Get tweets stream
    kafka_stream = create_stream(ssc)

    # Using windowed reduces, count each user's tweets over the window every 30 seconds,
    # sort the result in descending order, and print it.
    
    # for 1 minute
    tweets_for_1_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, windowDuration=60, slideDuration=30)
    sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_1_min.pprint()
    
    # for 10 minutes
    tweets_for_10_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, windowDuration=600, slideDuration=30)
    sorted_tweets_for_10_min = tweets_for_10_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_10_min.pprint()

    # Start Spark Streaming
    ssc.start()

    # Waiting for termination
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
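
For reference, reduceByKeyAndWindow operates on a DStream of (key, value) pairs, so a count-per-user pipeline normally maps each incoming screen name to a (screen_name, 1) pair before reducing. The helper below is only a sketch of that common pattern (not the code that was submitted), and it assumes a kafka_stream like the one returned by create_stream:

def build_sorted_counts(kafka_stream, window_seconds, slide_seconds):
    """Count tweets per screen name over a sliding window, sorted in descending order."""
    # turn each Kafka message value (a screen name) into a (screen_name, 1) pair
    pairs = kafka_stream.map(lambda screen_name: (screen_name, 1))
    counts = pairs.reduceByKeyAndWindow(
        lambda a, b: a + b,   # add counts entering the window
        lambda a, b: a - b,   # subtract counts leaving the window
        windowDuration=window_seconds,
        slideDuration=slide_seconds,
    )
    # sort each mini-batch by count, largest first
    return counts.transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))

# usage: build_sorted_counts(kafka_stream, 60, 30).pprint()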

I have installed the following:

  1. jdk1.8.0_311 and jre1.8.0_311
  2. python 2.7
  3. hadoop-2.7.1, which works properly
  4. spark-2.4.7-bin-hadoop2.7
  5. kafka_2.13-3.0.0

I have set the environment variables properly, but I'm getting the following exception at runtime after executing the submit command:

spark-submit --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 d:\task1\tweet_kafka_streaming_app.py

The exception occurred while processing the stream:

-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------

-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------

Traceback (most recent call last):
  File "d:/task1/tweet_kafka_streaming_app.py", line 95, in <module>
    main()
  File "d:/task1/tweet_kafka_streaming_app.py", line 91, in main
    ssc.awaitTermination()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\context.py", line 192, in awaitTermination
    varName = k[len("spark.executorEnv."):]
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__

  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o32.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 68, in call
    r = self.func(t, *rdds)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 297, in <lambda>
    func = lambda t, rdd: oldfunc(rdd)
  File "d:/task1/tweet_kafka_streaming_app.py", line 79, in <lambda>
    sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 699, in sortBy
    return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 667, in sortByKey
    rddSize = self.count()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 more


        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

Exception in thread Thread-4 (most likely raised during interpreter shutdown):

C:\WINDOWS\system32>

CodePudding user response:

I have solved this thanks to the hint given by @OneCricketeer. I upgraded Python to 3.8 but ran into other errors. Downgrading to Python 3.7, which supports Spark 2.4.8 or Spark 2.4.7 with Hadoop 2.7, fixed it, and my world is shining again.
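
Since Spark 2.4 expects the driver and the workers to run compatible Python versions, it can also help to pin both explicitly to the chosen Python 3.7 interpreter instead of relying on whatever sys.executable happens to be. The snippet below is only a minimal sketch: the install path is hypothetical, and the assignments must run before the SparkContext is created.

import os
import sys

PYTHON37 = r"C:\Python37\python.exe"  # hypothetical path to the Python 3.7 install

os.environ["PYSPARK_PYTHON"] = PYTHON37         # interpreter launched for Spark workers
os.environ["PYSPARK_DRIVER_PYTHON"] = PYTHON37  # interpreter expected for the driver

# the driver itself is whatever interpreter runs this script, so confirm it matches
print("Driver Python:", sys.version)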
