I'm trying to run a streaming application that counts tweets for specific users. The producer code:
# -*- coding: utf-8 -*-
import tweepy
import json
import base64
from kafka import KafkaProducer
import kafka.errors
# Twitter API credentials
CONSUMER_KEY = "***"
CONSUMER_SECRET = "***"
ACCESS_TOKEN = "***"
ACCESS_TOKEN_SECRET = "***"
# Kafka topic name
TOPIC_NAME = "tweet-kafka"
# Kafka server
KAFKA_HOST = "localhost"
KAFKA_PORT = "9092"
# a list of ids, the actual ids have been hidden in this question
ids = ["11111", "222222"]
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
class KafkaCommunicator:
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic

    def send(self, message):
        self.producer.send(self.topic, message.encode("utf-8"))

    def close(self):
        self.producer.close()
class MyStreamListener(tweepy.StreamListener):
    """Listener to tweet stream from twitter."""
    def __init__(self, communicator, api=None):
        super(MyStreamListener, self).__init__()
        self.communicator = communicator
        self.num_tweets = 0

    def on_data(self, raw_data):
        data = json.loads(raw_data)
        # print(data)
        if "user" in data:
            user_id = data["user"]["id_str"]
            if user_id in ids:
                print("Time: " + data["created_at"] + "; id: " + user_id +
                      "; screen_name: " + data["user"]["screen_name"])
                # put message into Kafka
                self.communicator.send(data["user"]["screen_name"])
        return True

    def on_error(self, status):
        print(status)
        return True
def create_communicator():
    """Create Kafka producer."""
    producer = KafkaProducer(bootstrap_servers=KAFKA_HOST + ":" + KAFKA_PORT)
    return KafkaCommunicator(producer, TOPIC_NAME)

def create_stream(communicator):
    """Set stream for twitter api with custom listener."""
    listener = MyStreamListener(communicator=communicator)
    stream = tweepy.Stream(auth, listener)
    return stream

def run_processing(stream):
    # Start filtering messages
    stream.filter(follow=ids)
def main():
    communicator = None
    tweet_stream = None
    try:
        communicator = create_communicator()
        tweet_stream = create_stream(communicator)
        run_processing(tweet_stream)
    except KeyboardInterrupt:
        pass
    except kafka.errors.NoBrokersAvailable:
        print("Kafka broker not found.")
    finally:
        if communicator:
            communicator.close()
        if tweet_stream:
            tweet_stream.disconnect()

if __name__ == "__main__":
    main()
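To sanity-check the producer on its own, I can read the topic back with a minimal consumer before involving Spark at all. This is just a sketch using the same kafka-python package as the producer; the topic name and broker address are the constants defined above:
# -*- coding: utf-8 -*-
# Minimal check: print whatever the producer has written to the topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "tweet-kafka",                        # same TOPIC_NAME as in the producer
    bootstrap_servers="localhost:9092",   # same KAFKA_HOST:KAFKA_PORT as in the producer
    auto_offset_reset="earliest")         # start from the beginning of the topic
for message in consumer:
    print(message.value.decode("utf-8"))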
The streaming app code:
# -*- coding: utf-8 -*-
import sys
import os
spark_path = "D:/spark/spark-2.4.7-bin-hadoop2.7"  # Spark installation folder
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYTHONHASHSEED'] = "0"
os.environ['SPARK_YARN_USER_ENV'] = "PYTHONHASHSEED=0"
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
SPARK_APP_NAME = "SparkStreamingKafkaTwitter"
SPARK_CHECKPOINT_TMP_DIR = "D:/tmp"
SPARK_BATCH_INTERVAL = 10
SPARK_LOG_LEVEL = "OFF"
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"  # Kafka broker address
KAFKA_TOPIC = "tweet-kafka"
import json
def create_streaming_context():
    """Create Spark streaming context."""
    conf = SparkConf().set("spark.executor.memory", "2g")\
                      .set("spark.driver.memory", "2g")\
                      .set("spark.driver.bindAddress", "0.0.0.0")
    # Create Spark Context
    sc = SparkContext(master="local[2]", appName=SPARK_APP_NAME, conf=conf)
    # Set log level
    sc.setLogLevel(SPARK_LOG_LEVEL)
    # Create Streaming Context
    ssc = StreamingContext(sc, SPARK_BATCH_INTERVAL)
    # Sets the context to periodically checkpoint the DStream operations for master
    # fault-tolerance. The graph will be checkpointed every batch interval.
    # It is used to update results of stateful transformations as well
    ssc.checkpoint(SPARK_CHECKPOINT_TMP_DIR)
    return ssc
def create_stream(ssc):
    """
    Create subscriber (consumer) to the Kafka topic (works on RDD that is mini-batch).
    """
    return (
        KafkaUtils.createDirectStream(
            ssc, topics=[KAFKA_TOPIC],
            kafkaParams={"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})
        .map(lambda x: x[1])
    )
def main():
    # Init Spark streaming context
    ssc = create_streaming_context()
    # Get tweets stream
    kafka_stream = create_stream(ssc)
    # Using reduce, count the number of each user's tweets over the last X minutes,
    # recomputed every 30 seconds, sort the result in descending order and print it.
    # for 1 minute
    tweets_for_1_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, windowDuration=60, slideDuration=30)
    sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_1_min.pprint()
    # for 10 minutes
    tweets_for_10_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, windowDuration=600, slideDuration=30)
    sorted_tweets_for_10_min = tweets_for_10_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_10_min.pprint()
    # Start Spark Streaming
    ssc.start()
    # Waiting for termination
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
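For reference, this is the shape of windowed counting I'm aiming for: reduceByKeyAndWindow operates on (key, value) pairs, so a sketch that pairs each screen name with 1 before reducing would look like the following. This is only an illustration of the intended logic over the same kafka_stream of screen-name strings as above, not the code I'm submitting:
# Illustration only: counting per screen name with an explicit (key, 1) pairing.
pairs = kafka_stream.map(lambda screen_name: (screen_name, 1))
counts_for_1_min = pairs.reduceByKeyAndWindow(
    lambda a, b: a + b,   # add counts entering the window
    lambda a, b: a - b,   # subtract counts leaving the window
    windowDuration=60,
    slideDuration=30)
counts_for_1_min.transform(
    lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False)).pprint()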
I have installed the following:
- jdk1.8.0_311 and jre1.8.0_311
- python 2.7
- hadoop-2.7.1 which works properly
- spark-2.4.7-bin-hadoop2.7
- kafka_2.13-3.0.0
I have set the environment variables properly, but I'm getting the following exception at runtime after executing the submit command:
spark-submit --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 d:\task1\tweet_kafka_streaming_app.py
The exception occurred while processing the stream:
-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------
-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------
Traceback (most recent call last):
File "d:/task1/tweet_kafka_streaming_app.py", line 95, in <module>
main()
File "d:/task1/tweet_kafka_streaming_app.py", line 91, in main
ssc.awaitTermination()
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\context.py", line 192, in awaitTermination
varName = k[len("spark.executorEnv."):]
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o32.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 68, in call
r = self.func(t, *rdds)
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 297, in <lambda>
func = lambda t, rdd: oldfunc(rdd)
File "d:/task1/tweet_kafka_streaming_app.py", line 79, in <lambda>
sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 699, in sortBy
return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 667, in sortByKey
rddSize = self.count()
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
for k, v in iterator:
ValueError: too many values to unpack
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
for k, v in iterator:
ValueError: too many values to unpack
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Exception in thread Thread-4 (most likely raised during interpreter shutdown):
C:\WINDOWS\system32>
CodePudding user response:
I have solved this thanks to the hint given by @OneCricketeer. I upgraded Python to 3.8 but ran into other errors. After downgrading to Python 3.7, which works with Spark 2.4.8 or Spark 2.4.7 with Hadoop 2.7, my world is shining again.
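For anyone double-checking the runtime after switching interpreters, a tiny script like this (nothing specific to my setup) confirms which Python and PySpark versions spark-submit actually picks up:
# Quick sanity check of the interpreter and PySpark version in use.
import sys
import pyspark

print(sys.version)           # should report Python 3.7.x after the downgrade
print(pyspark.__version__)   # should report 2.4.7 or 2.4.8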