Is it possible to integrate a GCP Pub/Sub StreamingPullFuture with discord.py?

Time:11-16

I'd like to use a Pub/Sub StreamingPullFuture subscription with discord.py to receive instructions for removing users and sending updates to different servers.

Ideally, I would start this function when the discord.py bot starts:

@bot.event
async def on_ready():
    print(f'{bot.user} {bot.user.id}')
    await pub_sub_function()

I looked at discord.ext.tasks but I don't think this use case fits since I'd like to handle irregularly spaced events dynamically.

I wrote this pub_sub_function() (based on the Pub/Sub Python client docs), but it doesn't seem to listen to Pub/Sub or return anything:

def pub_sub_function():

    subscriber_client = pubsub_v1.SubscriberClient()

    # existing subscription
    subscription = subscriber_client.subscription_path(
        'my-project-id', 'my-subscription')

    def callback(message):
        print(f"pubsub_message: {message}")
        message.ack()
        return message

    future = subscriber_client.subscribe(subscription, callback)
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()  # Trigger the shutdown.
        future.result()  # Block until the shutdown is complete.

Has anyone done something like this? Is there a standard approach for sending data/messages from external services to a discord.py bot and listening for them asynchronously?

Update: I got rid of pub_sub_function() and changed the code to this:

subscriber_client = pubsub_v1.SubscriberClient()

# existing subscription
subscription = subscriber_client.subscription_path('my-project-id', 'my-subscription')

def callback(message):
    print(f"pubsub_message: {message}")
    message.ack()
    return message

@bot.event
async def on_ready():
    print(f'{bot.user} {bot.user.id}')
    await subscriber_client.subscribe(subscription, callback).result()

This sort of works, but now the await subscriber_client.subscribe(subscription, callback).result() call blocks the Discord bot and produces this warning:

WARNING  discord.gateway Shard ID None heartbeat blocked for more than 10 seconds.
Loop thread traceback (most recent call last):

Answer:

OK, so this GitHub PR was very helpful.

In it, the user says that modifications are needed to make it work with asyncio because of Google's pseudo-future implementation:

Google implemented a custom pseudo-future
need monkey patch for it to work with asyncio

Basically, to make the Pub/Sub future behave like a concurrent.futures.Future, the discord.py side should look something like this:

import asyncio

from google.cloud import pubsub_v1


async def pub_sub_function():
    subscriber_client = pubsub_v1.SubscriberClient()
    # existing subscription
    subscription = subscriber_client.subscription_path('my-project-id', 'my-subscription')

    def callback(message):
        print(f"pubsub_message: {message}")
        message.ack()
        return message

    # subscribe() starts pulling on background threads and returns immediately.
    future = subscriber_client.subscribe(subscription, callback)
    # Patch Google's pseudo-future so that asyncio accepts it like a concurrent Future:
    future._asyncio_future_blocking = True
    future.__class__._asyncio_future_blocking = True
    real_pubsub_future = asyncio.wrap_future(future)
    return real_pubsub_future
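
To see the underlying idea in isolation, here is a minimal sketch that uses only the standard library. It assumes a plain concurrent.futures.Future from a ThreadPoolExecutor as a stand-in for the Pub/Sub future (it is not Google's class), and heartbeat() is a hypothetical stand-in for the Discord gateway heartbeat. Awaiting the wrapped future lets the loop keep running, whereas calling .result() directly inside a coroutine would freeze it.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job() -> str:
    # Stand-in for work that runs on a background thread.
    time.sleep(0.3)
    return "done"


async def heartbeat(ticks: list) -> None:
    # Hypothetical stand-in for the gateway heartbeat:
    # it can only tick while the event loop is not blocked.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def demo_wrap() -> tuple:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    with ThreadPoolExecutor(max_workers=1) as pool:
        thread_future = pool.submit(slow_job)
        # wrap_future turns the thread-based future into an awaitable
        # asyncio future, so waiting for it does not block the loop.
        result = await asyncio.wrap_future(thread_future)
    hb.cancel()
    return result, len(ticks)


if __name__ == "__main__":
    result, tick_count = asyncio.run(demo_wrap())
    print(result, tick_count)
```

The heartbeat keeps ticking while slow_job() sleeps, which is the behavior the bot needs from the Pub/Sub future.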

Then await pub_sub_function() from on_ready like this:

@bot.event
async def on_ready():
    print(f'{bot.user} {bot.user.id}')
    await pub_sub_function()
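
One more point that may matter for the original goal (removing users, sending updates): the Pub/Sub client runs callback on background threads, not on the bot's event loop, so the callback cannot await discord.py coroutines directly. A common way to bridge this is asyncio.run_coroutine_threadsafe. The sketch below is standard library only and rests on stated assumptions: a plain threading.Thread plays the role of the subscriber's worker thread, messages are plain strings, and handle_instruction() is a hypothetical coroutine standing in for real bot actions.

```python
import asyncio
import threading


async def handle_instruction(payload: str, handled: list) -> None:
    # Hypothetical stand-in for bot work, e.g. sending a message to a channel.
    handled.append(payload)


def make_callback(loop: asyncio.AbstractEventLoop, handled: list):
    def callback(message: str) -> None:
        # Runs on a worker thread: schedule the coroutine on the event loop
        # instead of calling it from this thread.
        fut = asyncio.run_coroutine_threadsafe(
            handle_instruction(message, handled), loop
        )
        # Blocks only this worker thread until the loop has run the coroutine.
        fut.result(timeout=5)

    return callback


async def demo_handoff() -> list:
    handled: list = []
    loop = asyncio.get_running_loop()
    callback = make_callback(loop, handled)

    def deliver() -> None:
        # Stand-in for the subscriber thread delivering two messages.
        for msg in ("remove:123", "update:hello"):
            callback(msg)

    worker = threading.Thread(target=deliver)
    worker.start()
    # Join the worker in the default executor so the loop stays free
    # to run the coroutines the callback schedules.
    await loop.run_in_executor(None, worker.join)
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo_handoff()))
```

In the real callback, message.ack() could be called after the handoff succeeds, so a message is only acknowledged once the bot has acted on it.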