I am fetching tweets from the Twitter API and forwarding them over a TCP connection to a socket that Spark reads from. My code is below.
For reference, each value of line looks something like this:

    {
        data : {
            text: "some tweet",
            id: some number
        },
        matching_rules: [{tag: "some string", id: same number}, {tag:...}]
    }

The sending side code:

    import json

    def ingest_into_spark(tcp_conn, stream):
        for line in stream.iter_lines():
            if line is not None:
                try:
                    # print(line)
                    tweet = json.loads(line)['matching_rules'][0]['tag']
                    # tweet = json.loads(line)['data']['text']
                    print(tweet, type(tweet), len(tweet))
                    tcp_conn.sendall(tweet.encode('utf-8'))
                except Exception as e:
                    print("Exception in ingesting data: ", e)

The Spark side code:

    print(f"Connecting to {SPARK_IP}:{SPARK_PORT}...")
    input_stream = streaming_context.socketTextStream(SPARK_IP, int(SPARK_PORT))
    print(f"Connected to {SPARK_IP}:{SPARK_PORT}")
    # Split each received line into whitespace-separated tags
    tags = input_stream.flatMap(lambda tags: tags.strip().split())
    mapped_hashtags = tags.map(lambda hashtag: (hashtag, 1))
    # Sum the counts per tag within each batch
    counts = mapped_hashtags.reduceByKey(lambda a, b: a + b)
    counts.pprint()

Spark does not read the data sent over the stream no matter what I do. But when I replace the line tweet = json.loads(line)['matching_rules'][0]['tag'] with the line tweet = json.loads(line)['data']['text'], it suddenly works as expected. I have printed the content and type of tweet in both cases, and it is a string in both. The only difference is that the text field holds the full tweet while the tag is a single word.
I have tried many different inputs, including hard-coded ones, but I cannot see why reading a different field of the JSON would make my code stop working.
Replacing either the client or the server with netcat shows that the data is being sent over the socket as expected in both cases.
If there is no solution to this, I would also be open to alternative ways of ingesting data into Spark that would work in this scenario.
Answer:
As described in the documentation, records (lines) in text streams are delimited by newlines (\n). Unlike print(), sendall() is a byte-oriented function and does not automatically add a newline. No matter how many tags you send with it, Spark will keep reading everything as a single record, waiting for the delimiter to appear. When you send the tweet text instead, it works because some tweets contain line breaks.
Try the following and see if it works:

    tcp_conn.sendall((tweet + '\n').encode('utf-8'))
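
If you also want to send the tweet text later, embedded line breaks would split one tweet into several records. A minimal sketch of one way to handle that is below. The names frame_record and demo are hypothetical, not part of your code or of Spark, and a local socketpair() with a line-oriented reader stands in for Spark's receiver, so this only illustrates the framing, not Spark itself.

```python
import socket


def frame_record(text: str) -> bytes:
    """Return text as one newline-terminated UTF-8 record.

    Embedded line breaks are replaced with spaces so that one tag (or one
    tweet) maps to exactly one line on the receiving side.
    """
    single_line = " ".join(text.splitlines())
    return (single_line + "\n").encode("utf-8")


def demo() -> list:
    # socketpair() returns two connected sockets; they stand in for the
    # sender and for a line-oriented receiver (an assumption, not Spark).
    sender, receiver = socket.socketpair()
    with sender, receiver:
        for tag in ["python", "spark", "multi\nline"]:
            sender.sendall(frame_record(tag))
        sender.shutdown(socket.SHUT_WR)  # signal end of stream
        # Read back one record per line, as a text-stream receiver would.
        with receiver.makefile("r", encoding="utf-8", newline="\n") as lines:
            return [line.rstrip("\n") for line in lines]


if __name__ == "__main__":
    print(demo())
```

In your sender this would amount to calling tcp_conn.sendall(frame_record(tweet)) in place of the current sendall() call. Whether to replace embedded line breaks with spaces or keep them as separate records depends on what you want to count.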