Flask Socket-IO Server Client Not Communicating (with tweepy and twilio)

Time:10-29

I am trying to have a Flask Server that allows me to launch a tweepy stream and on every message received in the stream listener, it sends that message to a socketio client. The Flask server is at the same time supposed to allow Twilio to post to it, and route that message to the client—so that the client is receiving messages from both Twilio and twitter.

I have been trying to get the server to send the incoming Twitter data to the client. The Twilio code works just fine: it sends data to the client on message receipt. The main loop in tweepy is also not locking up the program, since I can add print statements and see both the tweets and the incoming SMS messages being printed in the handle_message(msg) function asynchronously. I feel like I must be missing something really simple, because the SMS messages are emitted to the client but the incoming tweets are not, even though they propagate through to the handle_message(msg) function. What gives?

server.py

from flask import Flask, request
from twilio.twiml.messaging_response import MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json

PATH = '/path/to/credentials/'
with open(PATH, "r") as file:
    credentials = json.load(file)

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, cors_allowed_origins="*")

auth = tweepy.OAuthHandler(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'])
auth.set_access_token(credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
api = tweepy.API(auth)

class MyListener(tweepy.StreamListener):
    def on_status(self, status):
        print('status')

    def on_data(self, data):
        handle_message(data)

    def on_error(self, status):
        print('error')
        print(status)

stream_listener = MyListener()

# twilio sms route
@app.route('/sms', methods=['POST'])
def sms():
    number = request.form['From']
    message_body = request.form['Body']
    message_data = {"number": number, "msg": message_body}
    resp = MessagingResponse()
    resp.message('Hello {}, you said: {}'.format(number, message_body))
    handle_message(message_data)
    return str(resp)

# flask-socketio stuff
@sio.on('connect')
def connect():
    print('connected')
    sio.emit('client_connected', "you connected")
    search_term = "#mysearchterm"
    stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
    stream.filter(track=[search_term], is_async=True)
    sio.emit('client_connected', "the search term is {}".format(search_term))

@sio.on('disconnect')
def disconnect():
    print('Client disconnected')

@sio.event
def handle_message(message):
    print("This is the message received: ", message)
    sio.emit('handle_message', message)

if __name__ == '__main__':
    sio.run(app)

client.py

import socketio

client = socketio.Client()

@client.on('client_connected')
def on_connect(message):
    print(message)

@client.on('handle_message')
def message(data):
    print(data)

client.connect('http://localhost:5000/')

CodePudding user response:

Twilio developer evangelist here.

You've decorated the handle_message function with @sio.event, but as far as I can see in the docs, you should only do that if you want handle_message to respond to events named "handle_message" that arrive on the socket.

I'd start by removing the @sio.event decorator.
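To illustrate the point about name-based registration, here is a toy model in plain Python. It is not Flask-SocketIO's actual implementation: ToyServer and dispatch are made-up names, used only to show that an event-style decorator registers a function under its own name for incoming events, while a direct call to the function never goes through the decorator.

```python
# Toy model only: this is NOT how Flask-SocketIO is implemented.
class ToyServer:
    def __init__(self):
        self.handlers = {}  # event name -> handler function

    def event(self, func):
        # Register the function under its own name and return it unchanged.
        self.handlers[func.__name__] = func
        return func

    def dispatch(self, event_name, payload):
        # Simulates a client sending an event with the given name.
        handler = self.handlers.get(event_name)
        if handler is None:
            return None
        return handler(payload)


toy = ToyServer()


@toy.event
def handle_message(message):
    return "got: " + message


# Calling the function directly does not involve the registry at all.
direct = handle_message("hi")
# Only an incoming event named "handle_message" uses the registration.
dispatched = toy.dispatch("handle_message", "hi")
```

In this model the decorator only matters for the dispatch path, which is why removing it should not change what happens when the server calls handle_message itself.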

I'm not a Python expert, but I also wonder whether there is a scope issue here. You define your MyListener class and create an instance of it before you define the handle_message method. Just to test, can you try emitting to the socket directly within the on_data method:

    def on_data(self, data):
        sio.emit('handle_message', data)

If that works, consider moving the definition of handle_message above the definition of MyListener.

CodePudding user response:

I solved my problem! As I noted in this comment, the issue was with multithreading and passing information between the threads. With tweepy, the parameter is_async=True (which in 4.1.0 is threaded=True) makes the stream run in a new thread.
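For readers unfamiliar with the problem, passing information between threads by hand usually looks something like the sketch below. It is a generic standard-library illustration, not code from the original server: stream_worker and drain are hypothetical names, the list of strings stands in for incoming tweets, and a plain list's append stands in for the socket emit.

```python
import queue
import threading

messages = queue.Queue()  # thread-safe hand-off point between the two threads


def stream_worker(items):
    # Stands in for the streaming thread: it only enqueues, it never emits.
    for item in items:
        messages.put(item)
    messages.put(None)  # sentinel meaning "no more items"


def drain(emit):
    # Runs in the thread that owns the socket; emit is any callable.
    while True:
        item = messages.get()  # blocks until the worker provides something
        if item is None:
            break
        emit(item)


received = []
worker = threading.Thread(target=stream_worker, args=(["tweet 1", "tweet 2"],))
worker.start()
drain(received.append)
worker.join()
```

The consuming side has to keep a loop like drain running for as long as the stream is alive, which is the bookkeeping that a message queue handled by the library avoids.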

Instead of trying to deal with passing information around, I exploited the extant flask-socketio functionality by using a local redis server as a message queue (start from the section "Using Multiple Workers" if you are setting this up for the first time, also be sure to install redis).

Here is the updated server.py code. The client.py code remained essentially unchanged:

import eventlet
eventlet.monkey_patch()
from flask import Flask, request
from twilio.twiml.messaging_response import MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json

PATH = '/PATH/TO/CREDENTIALS'
with open(PATH, "r") as file:
    credentials = json.load(file)

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, message_queue='redis://', cors_allowed_origins="*")

class MyStream(tweepy.Stream):
    def __init__(self, consumer_key, consumer_secret, access_token, access_secret):
        super(MyStream, self).__init__(consumer_key, consumer_secret, access_token, access_secret)
        self.stream_sio = SocketIO(message_queue='redis://')

    def on_status(self, status):
        print('status')

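    def on_data(self, data):
        json_data = json.loads(data)
        # Not every payload is a tweet (e.g. limit or delete notices), so guard the lookup
        if 'text' in json_data:
            self.stream_sio.emit('handle_message', json_data['text'])
        # TODO: Send along all necessary information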

@app.route('/sms', methods=['POST'])
def sms():
    number = request.form['From']
    message_body = request.form['Body']
    message_data = {"number": number, "msg": message_body}
    resp = MessagingResponse()
    resp.message('Hello {}, you said: {}'.format(number, message_body))
    handle_message(message_data)
    return str(resp)
 
@sio.on('connect')
def connect():
    print('connected')
    sio.emit('client_connected', "you connected")
    search_term = "#testingtesting123"
    stream = MyStream(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'],
                           credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
    stream.filter(track=[search_term], threaded=True)
    sio.emit('client_connected', "the search term is {}".format(search_term))

@sio.on('disconnect')
def disconnect():
    print('Client disconnected')

def handle_message(message):
    sio.emit('handle_message', message)

if __name__ == '__main__':
    sio.run(app)