I am attempting to stream tweets in a PostgreSQL database with PostGIS extension using a Python code written in Jupyter Notebook with no success. I have used a lot of tutorials as reference and in the first attempts the code seemed to work and there were no mistakes. I even had printed the message that I was connected to Twitter API. However, no tweets were uploaded into the PostgreSQL database. I thought the problem could be the filters (since maybe I was using filters which had no tweets for the moment), but after some runs removing filters or using other filters I found that wasn't the problem. I think the connection to PostgreSQL isn't the problem neither since I tried printing the tweets directly into Jupyter Notebook and there was no mistake and no error.
After doing some changes basing myself into guides and checking the format of PostgreSQL tables, I see the code connects to the Twitter API but I get this message all the time: 'str' object is not callable
The PostgreSQL table is created using the following code, with the goal that the coordinates of the tweets are stored with point geometry:
CREATE TABLE tweets (tweet_id VARCHAR PRIMARY KEY, user_id VARCHAR, username TEXT, tweet TEXT, hashtags TEXT, lang TEXT, created_at TIMESTAMP, coordinates GEOMETRY);
The used Python code is the next one:
#!/usr/bin/env python
# coding: utf-8
#Import libraries
import tweepy
import pandas as pd
import json
import psycopg2
import time
from html.parser import HTMLParser
#Insert Twitter keys
ckey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
csecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
atoken = "xxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
asecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
#Authorize the Twitter API
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)
#Call the Twitter API
api = tweepy.API(auth)
#Define Listener block
class MyStreamListener(tweepy.StreamListener):
def __init__(self, time_limit=300):
self.start_time = time.time()
self.limit = time_limit
super(MyStreamListener, self).__init__()
def on_connect(self):
print("Connected to Twitter API.")
def on_status(self, status):
print(status.text)
def on_data(self, raw_data):
try:
datos = json.loads(raw_data)
#Filter only tweets with coordinates
if datos["coordinates"] != None:
#Obtain all the variables to store in each column
tweet_id = datos['id_str']
user_id = datos['user']['id']
user_name = datos['user']['name']
tweet = datos['text']
hashtags = datos["entities"]["hashtags"]
lang = datos['user']['lang']
created_at = datos['created_at']
coordinates = datos["coordinates"]["coordinates"]
# Connect to database
dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates)
if (time.time() - self.start_time) > self.limit:
print(time.time(), self.start_time, self.limit)
return False
except Exception as e:
print(e)
def on_error(self, status_code):
if status_code == 420:
# Returning False in on_data disconnects the stream
return False
def dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates):
#Connect to Twitter database created in PostgreSQL
conn = psycopg2.connect(host="localhost",database="datos_twitter",port=5433,user="xxxxxxx",password="xxxxxxx")
#Create a cursor to perform database operations
cur = conn.cursor()
#With the cursor, insert tweets into a PostgreSQL table
command = "INSERT INTO tweets (tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
cur.execute(command(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates))
#Commit changes
conn.commit()
#Close cursor and the connection
cur.close()
conn.close()
#Streaming of tweets
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener, tweet_mode="extended")
#Filtering of tweets by spatial box and keywords
myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Madrid', 'madrid'])
CodePudding user response:
in on_data
the argument is a raw data (string json that received from twitter api endpoint)
def on_data(self, raw_data):
"""This is called when raw data is received from the stream.
This method handles sending the data to other methods based on the
message type.
Parameters
----------
raw_data : JSON
The raw data from the stream
"""
use the on_status
like this, in this function the argumant is Status
object and you can access to its fields, like status.text
def on_status(self, status):
"""This is called when a status is received.
Parameters
----------
status : Status
The Status received
"""
these function names maybe change in different version of tweepy
, read this article to install specified version of python package
CodePudding user response:
I have edited the code, fixing the error pointed in the comment. I have def on_status(self, status)
based on an example which worked for me before (and that I had mistakenly replaced by myself by data in that case).
The connection is created and I receive the message for that, but then 10 seconds later I get the following error TypeError: 'str' object is not callable
The complete error trace is:
TypeError Traceback (most recent call last)
<ipython-input-14-249caad94bfb> in <module>
4 tweet_mode="extended")
5 #Filtering by spatial box and keywords
----> 6 myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Covid','covid-19'])
~\Anaconda3\lib\site-packages\tweepy\streaming.py in filter(self, follow, track, is_async, locations, stall_warnings, languages, encoding, filter_level)
472 self.body['filter_level'] = filter_level.encode(encoding)
473 self.session.params = {'delimited': 'length'}
--> 474 self._start(is_async)
475
476 def sitestream(self, follow, stall_warnings=False,
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _start(self, is_async)
387 self._thread.start()
388 else:
--> 389 self._run()
390
391 def on_closed(self, resp):
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
318 # call a handler first so that the exception can be logged.
319 self.listener.on_exception(exc_info[1])
--> 320 six.reraise(*exc_info)
321
322 def _data(self, data):
~\Anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
287 self.snooze_time = self.snooze_time_step
288 self.listener.on_connect()
--> 289 self._read_loop(resp)
290 except (Timeout, ssl.SSLError) as exc:
291 # This is still necessary, as a SSLError can actually be
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _read_loop(self, resp)
349 next_status_obj = buf.read_len(length)
350 if self.running and next_status_obj:
--> 351 self._data(next_status_obj)
352
353 # # Note: keep-alive newlines might be inserted before each length value.
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _data(self, data)
321
322 def _data(self, data):
--> 323 if self.listener.on_data(data) is False:
324 self.running = False
325
~\Anaconda3\lib\site-packages\tweepy\streaming.py in on_data(self, raw_data)
52 if 'in_reply_to_status_id' in data:
53 status = Status.parse(self.api, data)
---> 54 if self.on_status(status) is False:
55 return False
56 elif 'delete' in data:
<ipython-input-12-3460245af936> in on_status(self, status)
34 if not hasattr(status, "retweeted_status") and coordinates!= None:
35 # Connect to database
---> 36 dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
37
38 if (time.time() - self.start_time) > self.limit:
<ipython-input-13-d7acfb1cce67> in dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
6
7 command = "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
----> 8 cur.execute(command(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates))
9
10 conn.commit()
TypeError: 'str' object is not callable
I am not sure, but it seems the error comes now from the line in which I insert the tweets into PostgreSQL.
I have now edited the code adding the function def on_data(self, raw_data)
and then put def on_status(self, status)
like it has been mentioned in the comments. I continue getting the error TypeError: 'str' object is not callable