Asynchronous Threading Python OCPP

Time:12-17

I'm trying to use the ocpp library in Python. Two of its functions run continuously in a while loop: cp.start() for logging and cp.heartbeat as the protocol's internal heartbeat. If I call them normally in my routine, their while loops block the event loop, so I want to run them as threads. But the library seems to have a problem with that.

async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_3',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_3', ws)

        def start_logging(loop):
            asyncio.set_event_loop(loop)
            loop.create_task(cp.start())
            loop.run_forever()

        loop = asyncio.get_event_loop()
        t = threading.Thread(target=start_logging, args=(loop,))
        t.start()
   
        await asyncio.gather(cp.send_heartbeat())


if __name__ == '__main__':
    asyncio.run(main())

The errors:

ConnectionResetError: [WinError 995] The I/O operation has been aborted because of either a thread exit or an application request

AssertionError
ERROR:asyncio:Error on reading from the event loop self pipe
loop: <ProactorEventLoop running=True closed=False debug=False>

AssertionError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<ChargePoint.start() done, defined at C:\Users\sasko\AppData\Local\Programs\Python\Python39\lib\site-packages\ocpp\charge_point.py:121> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>

Even when I set the thread as a daemon, the heartbeat works, but I can no longer close the program. The final goal is to run cp.start() and the heartbeat in a thread, so that I can control the charging process of an electric vehicle from separate logic.

CodePudding user response:

Looking at the code base on GitHub, all the functions you want to call are coroutines. They may contain infinite loops, but those loops have await statements in them, and each await yields control back to the event loop. So, as far as I can tell, there is no need to use threads for anything. Taken from the examples for version 1.6:

async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_1',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_1', ws)

        await asyncio.gather(cp.start(), cp.send_boot_notification())

I guess this should get you started.
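
To illustrate why no threads are needed, here is a minimal sketch that does not use the ocpp library at all. receive_loop and heartbeat_loop are hypothetical stand-ins for cp.start() and a heartbeat coroutine, and the loops are bounded only so that the demo terminates. Each loop awaits on every iteration, so the event loop can interleave the two:

```python
import asyncio


async def receive_loop(events, rounds=3):
    # hypothetical stand-in for cp.start(): loops, but awaits on every iteration
    for i in range(rounds):
        await asyncio.sleep(0.01)  # pretend to wait for a websocket message
        events.append(f"recv {i}")


async def heartbeat_loop(events, rounds=3):
    # hypothetical stand-in for a periodic heartbeat coroutine
    for i in range(rounds):
        await asyncio.sleep(0.01)  # pretend to wait for the heartbeat interval
        events.append(f"beat {i}")


async def demo():
    events = []
    # both loops run concurrently on one event loop, no threads involved
    await asyncio.gather(receive_loop(events), heartbeat_loop(events))
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The printed list contains entries from both loops mixed together, which shows that neither loop blocks the other.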

EDIT:

OK, the above still stands. I answered what you asked, but what you really need is to understand how this API is supposed to work. I'll grant you that their example is somewhat confusing, and I don't think you will get around reading their documentation. The gist of what I understood from the code is that you need to subclass the central class ChargePoint. That wasn't clear from the example, because they gave their subclass the same name as the base class. I'll try to make their example clearer. I hope I understood it correctly:

# simplified and commented version of the v1.6 example
import asyncio
import logging
import websockets

from ocpp.routing import on
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp # this is the baseclass renamed to cp
from ocpp.v16.enums import Action, RegistrationStatus

logging.basicConfig(level=logging.INFO)


class ChargePoint(cp): # inheriting from cp, now called ChargePoint (again)

    @on(Action.SomeMessage) # SomeMessage is a placeholder; this decorator adds your function to a mapping of hooks for that message/event
    def on_some_message(self, *args, **kwargs): # needs self, since some_coro() is called on it below
        # do something, which probably has to do with charging
        asyncio.create_task(self.some_coro()) # create async task from sync code

    # add more decorated functions to implement your logic

    async def some_coro(self):
        pass # do something with I/O

    async def send_boot_notification(self):
        request = call.BootNotificationPayload(
            charge_point_model="Optimus",
            charge_point_vendor="The Mobility House"
        )

        response = await self.call(request)

        if response.status == RegistrationStatus.accepted:
            print("Connected to central system.")


async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_1',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_1', ws) # going full circle, naming the instance the same as the rebound baseclass :-/

        # cp.start() is the infinite loop which receives messages and relays
        # them to their respective hooks; self.call() waits for a reply that
        # arrives through that loop, so the boot notification has to run
        # concurrently with it (as in the library's own example)
        # (you get the concurrency you wanted out of threads by registering
        # your own hooks, i.e. pieces of code)
        # main() stays here until you somehow shut it down
        await asyncio.gather(cp.start(), cp.send_boot_notification())

if __name__ == '__main__':
    asyncio.run(main())
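
To make the idea of "a mapping of hooks" more concrete, here is a simplified sketch of how such a decorator could work. It is not the library's actual implementation: on, BaseChargePoint, dispatch and MyChargePoint are all hypothetical names written for this illustration, and the action is a plain string instead of an enum member.

```python
import asyncio


def on(action):
    # hypothetical decorator: tag the function with the action it handles
    def decorator(func):
        func._on_action = action
        return func
    return decorator


class BaseChargePoint:
    def __init__(self):
        # collect every method tagged by @on into an action -> handler map
        self.hooks = {}
        for name in dir(self):
            attr = getattr(self, name)
            action = getattr(attr, "_on_action", None)
            if action is not None:
                self.hooks[action] = attr

    async def dispatch(self, action, **payload):
        # look up the hook registered for this action and call it
        handler = self.hooks[action]
        return handler(**payload)


class MyChargePoint(BaseChargePoint):
    @on("RemoteStartTransaction")
    def on_remote_start(self, id_tag):
        return f"starting transaction for {id_tag}"


if __name__ == "__main__":
    cp = MyChargePoint()
    print(asyncio.run(cp.dispatch("RemoteStartTransaction", id_tag="abc")))
```

The point of the pattern is that the receive loop only needs the mapping: your own logic is plugged in by decorating methods, without touching the loop itself.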

So obviously I couldn't test this and can't promise you this is how they intended, but I hope it helps.
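
As for not being able to close the program: one common asyncio approach is to run the endless coroutine as a task and cancel it when your own logic is finished. The sketch below assumes nothing about the ocpp library; receive_loop and control_logic are hypothetical stand-ins for cp.start() and for your charging logic.

```python
import asyncio


async def receive_loop():
    # hypothetical stand-in for cp.start(): runs until it is cancelled
    while True:
        await asyncio.sleep(0.01)


async def control_logic(log):
    # hypothetical stand-in for your own charging logic; it just finishes quickly
    await asyncio.sleep(0.03)
    log.append("control logic done")


async def demo():
    log = []
    # run the endless loop in the background as a task
    receiver = asyncio.create_task(receive_loop())
    await control_logic(log)
    # stop the endless loop once our own logic is done
    receiver.cancel()
    try:
        await receiver
    except asyncio.CancelledError:
        log.append("receiver cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether cancelling cp.start() this way closes the websocket cleanly is something I haven't checked, so treat it as a starting point only.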
