I want to get every 10 Minutes the last 1000 Tweets with the Hashtags "Bitcoin, BTC, Cardano and ADA". For this I would use the Twitter API and the Package "Tweepy" in an AWS Lambda Function.
Here you can see my code:
import pandas as pd
import sqlalchemy
import tweepy
import psycopg2
import os
# Credentials for Database connection
ENDPOINT = os.environ['ENDPOINT']
DB_NAME = os.environ['DBNAME']
USERNAME = os.environ['USERNAME']
PASSWORD = os.environ['PASSWORD']
# Credentials for Twitter-API
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCESS_TOKEN_SECRET = os.environ['ACCESS_TOKEN_SECRET']
CONSUMER_KEY = os.environ['CONSUMER_KEY']
CONSUMER_SECRET = os.environ['CONSUMER_KEY_SECRET']
BEARER_TOKEN = os.environ['BEARER_TOKEN']
def lambda_handler(event, context):
# Build Twitter API
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN,ACCESS_TOKEN_SECRET)
api = tweepy.API(auth, wait_on_rate_limit=True)
# Get Twitter Data and prepare for Upload
lst_time = []
lst_text = []
lst_like = []
lst_retweet = []
for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(10):
lst_text.append(i.text)
lst_like.append(i.favorite_count)
lst_retweet.append(i.retweet_count)
lst_time.append(i.created_at)
# Prepare DataFrame for Upload to DataBase
df = pd.DataFrame(list(zip(lst_time, lst_text, lst_like,lst_retweet)), columns = ['time', 'text', 'like', 'retweet'])
df['date_new'] = pd.to_datetime(df['time']).dt.date
df['time_new'] = pd.to_datetime(df['time']).dt.time
df = df.drop(['time'], axis=1)
# Build DB connection
try:
conn = psycopg2.connect("host={} dbname={} user={} password={}".format(ENDPOINT,DBNAME,USERNAME,PASSWORD))
except psycopg2.Error as e:
print("Error: Could not get Connection to DB")
print(e)
# Create DB Cursor
try:
cur = conn.cursor()
except psycopg2.Error as e:
print("Error: Could not get a cursor")
print(e)
# Set Autocommit
conn.set_session(autocommit=True)
# Create sqlalchemy engine
engine = sqlalchemy.create_engine("postgresql://{}:{}@{}:5432/{}".format(USERNAME,PASSWORD,ENDPOINT,DBNAME))
# Create Table in DB
df.to_sql('Twitter_Keyword_search', engine, if_exists='append', index = False)
# Close DB Connection
cur.close()
conn.close()
print("success")
But everytime I want to test, the execution result is
Test Event Name
test
Response
{
"errorMessage": "Failed to send request: HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/search/tweets.json?q=Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f0e0fa0da90>, 'Connection to api.twitter.com timed out. (connect timeout=60)'))",
"errorType": "TweepyException",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 34, in lambda_handler\n for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(1000):\n",
" File \"/opt/python/tweepy/cursor.py\", line 86, in __next__\n return self.next()\n",
" File \"/opt/python/tweepy/cursor.py\", line 286, in next\n self.current_page = next(self.page_iterator)\n",
" File \"/opt/python/tweepy/cursor.py\", line 86, in __next__\n return self.next()\n",
" File \"/opt/python/tweepy/cursor.py\", line 167, in next\n data = self.method(max_id=self.max_id, parser=RawParser(), *self.args, **self.kwargs)\n",
" File \"/opt/python/tweepy/api.py\", line 33, in wrapper\n return method(*args, **kwargs)\n",
" File \"/opt/python/tweepy/api.py\", line 46, in wrapper\n return method(*args, **kwargs)\n",
" File \"/opt/python/tweepy/api.py\", line 1269, in search_tweets\n ), q=q, **kwargs\n",
" File \"/opt/python/tweepy/api.py\", line 222, in request\n raise TweepyException(f'Failed to send request: {e}').with_traceback(sys.exc_info()[2])\n",
" File \"/opt/python/tweepy/api.py\", line 219, in request\n timeout=self.timeout, auth=auth, proxies=self.proxy\n",
" File \"/opt/python/requests/sessions.py\", line 542, in request\n resp = self.send(prep, **send_kwargs)\n",
" File \"/opt/python/requests/sessions.py\", line 655, in send\n r = adapter.send(request, **kwargs)\n",
" File \"/opt/python/requests/adapters.py\", line 504, in send\n raise ConnectTimeout(e, request=request)\n"
]
}
Function Logs
START RequestId: e434987f-5204-42bf-8d54-8fd217aff1e0 Version: $LATEST
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
/opt/python/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
""")
[ERROR] TweepyException: Failed to send request: HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/search/tweets.json?q=Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f0e0fa0da90>, 'Connection to api.twitter.com timed out. (connect timeout=60)'))
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 34, in lambda_handler
for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(1000):
File "/opt/python/tweepy/cursor.py", line 86, in __next__
return self.next()
File "/opt/python/tweepy/cursor.py", line 286, in next
self.current_page = next(self.page_iterator)
File "/opt/python/tweepy/cursor.py", line 86, in __next__
return self.next()
File "/opt/python/tweepy/cursor.py", line 167, in next
data = self.method(max_id=self.max_id, parser=RawParser(), *self.args, **self.kwargs)
File "/opt/python/tweepy/api.py", line 33, in wrapper
return method(*args, **kwargs)
File "/opt/python/tweepy/api.py", line 46, in wrapper
return method(*args, **kwargs)
File "/opt/python/tweepy/api.py", line 1269, in search_tweets
), q=q, **kwargs
File "/opt/python/tweepy/api.py", line 222, in request
raise TweepyException(f'Failed to send request: {e}').with_traceback(sys.exc_info()[2])
File "/opt/python/tweepy/api.py", line 219, in request
timeout=self.timeout, auth=auth, proxies=self.proxy
File "/opt/python/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/opt/python/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/opt/python/requests/adapters.py", line 504, in send
raise ConnectTimeout(e, request=request)
END RequestId: e434987f-5204-42bf-8d54-8fd217aff1e0
REPORT RequestId: e434987f-5204-42bf-8d54-8fd217aff1e0 Duration: 240254.49 ms Billed Duration: 240255 ms Memory Size: 1024 MB Max Memory Used: 160 MB Init Duration: 2818.23 ms
Request ID
e434987f-5204-42bf-8d54-8fd217aff1e0
After searching for hours in the Internet I hope you can help. PS: For those who haven't thought it by reading my code => I'm a bloody beginner
Thanks
CodePudding user response:
File "/var/task/lambda_function.py", line 34, in lambda_handler for i in tweepy.Cursor(api.search_tweets, q='Bitcoin -filter:retweets OR Cardano -filter:retweets OR ADA -filter:retweets OR BTC -filter:retweets' ).items(1000):
.... raise ConnectTimeout(e, request=request)
So, the only thing that matters is why the script cannot connect to the API endpoint. Assuming Twitter is not experiencing a secret outage, the first place I would look at is whether the script can reach anything on the public internet. So, I would start by instead deploying a simple script without all the other details at all which just tries to make a GET
request to example.com
. If that also gets a timeout error (as I suspect it will), that means you have not set up internet connectivity for your lambdas in a way that allow them to reach out.
In that case, this ceases to be a question about Python. AWS documentation might help: "How do I give internet access to a Lambda function that's connected to an Amazon VPC?", but the exact solution will depend on your organization's structure.