StreamReader.readline() does not return upon reading a newline

Time:01-03

I have been experimenting with streams in Python and wrote the following code.

ServiceSubscription.py

import asyncio
from asyncio import StreamReader, StreamWriter

class ServiceSubscription():
    def __init__(self) -> None:
        self.subscriber_connections = []
        self.service_connections = []
        self.server_listener = None
        # Dictionary of service readers where key is the name of the service and the value is the reader for the service
        self.service_readers = {}
    
    """
    Create the listening server on port 7777
    """
    async def initiate_server(self):
        server = await asyncio.start_server(self.handle_incoming, '127.0.0.1', 7777)
        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')
        
        async with server:
            await server.serve_forever()
    
    """
    Handle the incoming connection based on whether the connection is from a service or a subscriber
    
    The first message sent should include either 'service:SERVICE_NAME' or 'suscriber: [SERVICE1, SERVICE2, ...]'
    """
    async def handle_incoming(self, reader: StreamReader, writer: StreamWriter):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")
        if ("Service:" in f"{message!r}"):
            message = message[0:7]
            self.service_connections.append(Connections(reader, writer, message))
            service_reader = ServiceReader(reader=reader, writer=writer)
            self.service_readers[message] = (service_reader)
            await service_reader.broadcast()
        
        elif ("Suscriber:" in f"{message!r}"):
            message = message[0:9]
            self.subscriber_connections.append(Connections(reader, writer, message))
            self.service_readers[message].add_suscribers(writer)
        
        else:
            pass

class ServiceReader():
    def __init__(self, reader:  StreamReader, writer: StreamWriter):
        self.reader = reader
        self.writer = writer
        self.suscribers: Writer = []
        self.loop = asyncio.get_event_loop()

    def stop(self):
        self._stop.set()
        
    """
    Add new subscriber's StreamWriter here
    """
    def add_suscribers(self, writer: StreamWriter):
        # Not sure if this will work
        self.suscribers.append(writer)
        
    """
    Read data and broadcast it to subscribed clients
    """
    async def broadcast(self):
        while not self.reader.at_eof():
            data = await self.reader.readline()
            if b'\n' in data:
                print(True)
            data = data.decode()
            print(data)

WriterTest.py

import asyncio
from os import linesep

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 7777)

    print(f'Send: {message!r}\n')
    writer.write(message.encode())
    await writer.drain()
    
    while not writer.is_closing():
        data = input("Type a message\n")
        data = (data + "\n").encode()
        writer.write(data)
        await writer.drain()
        
    writer.close()
    

asyncio.run(tcp_echo_client('Service: TEST'))

I ran python ServiceSubscription.py and python WriterTest.py at the same time to simulate a server and a client.

When ServiceSubscription.py starts, it prints "Serving on ('127.0.0.1', 7777)". When WriterTest.py is then executed, ServiceSubscription.py prints "Received 'Service: TEST' from ('127.0.0.1', 39923)". However, nothing typed after that is printed until WriterTest.py's connection is closed. At that point ServiceSubscription.py prints the remaining bytes in the buffer and confirms that the data contains newlines, so readline() apparently did not return when it encountered them.

CodePudding user response:

The problem is here, in your WriterTest:

data = input("Type a message\n")
   

The input function is blocking, so in an asyncio program it blocks the event loop. All tasks are stopped until you enter something. With asyncio streams, the actual transmission of the bytes occurs in another task. Your call to the input function blocks that task, which prevents the transmission. Your server doesn't respond because nothing is actually sent.
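
To see the effect of a blocking call on other tasks, here is a minimal sketch. It uses time.sleep as a stand-in for input (an assumption, so that it runs unattended), and the names ticker and demo are hypothetical. A background task bumps a counter every 10 ms; the counter stalls while the blocking call runs directly in the coroutine, and keeps advancing when the same call is handed to a worker thread with run_in_executor.

```python
import asyncio
import time


async def ticker(counter: list) -> None:
    """Background task: bump counter[0] every 10 ms while the loop is free."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def demo() -> tuple:
    counter = [0]
    task = asyncio.create_task(ticker(counter))
    await asyncio.sleep(0)  # yield once so the ticker gets started

    # Blocking call made directly in the coroutine: the event loop is frozen.
    before = counter[0]
    time.sleep(0.3)  # stand-in for input() (assumption for this sketch)
    ticks_blocked = counter[0] - before

    # Same call in a worker thread: the event loop keeps running.
    before = counter[0]
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.3)
    ticks_threaded = counter[0] - before

    task.cancel()
    return ticks_blocked, ticks_threaded


if __name__ == "__main__":
    blocked, threaded = asyncio.run(demo())
    print(f"ticks while blocked: {blocked}, ticks with executor: {threaded}")
```

The first number should be zero, because no other task can run while the coroutine is stuck in the blocking call; the second depends on timing but should be well above zero.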

Since this is just a test program, you have a couple of quick solutions. You could put this line:

await asyncio.sleep(1.0)

after the line await writer.drain(). This will keep the other tasks running for one second, plenty of time for the data to get transmitted.

You could, of course, replace the call to input with some hard-coded string.
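
As a rough illustration of the hard-coded variant, the sketch below runs a small line-reading server and a client in the same process. Port 0 (which lets the OS pick a free port), the handler name handle, and the message list are assumptions made for the example and are not part of the original programs. Each message ends in "\n", so readline() on the server side returns once per message.

```python
import asyncio


async def main() -> list:
    received = []
    done = asyncio.Event()

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        # readline() returns one line per call and b"" once the peer closes.
        while True:
            line = await reader.readline()
            if not line:
                break
            received.append(line.decode().rstrip("\n"))
        writer.close()
        done.set()

    # Port 0 asks the OS for any free port (assumption for this sketch).
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        # Hard-coded messages replace the calls to input().
        for message in ["Service: TEST", "first", "second"]:
            writer.write((message + "\n").encode())
            await writer.drain()
        writer.close()
        await writer.wait_closed()
        # Wait until the handler has seen EOF, with a safety timeout.
        await asyncio.wait_for(done.wait(), timeout=5.0)

    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because nothing in the client blocks the event loop, every line reaches the handler as soon as it is written.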

Better solutions are discussed in the question "Listen to keypress with asyncio".

As a general rule, input and asyncio do not play well together.

CodePudding user response:

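The data is encoded together with a line break, and the result of open_connection() is unpacked into reader and writer objects. Meanwhile, the loop condition is_closing() only becomes true once the connection is being closed, which should happen through close(), but close() is never reached inside the loop. You could instead add a condition that leaves the loop when the message has no characters.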

import asyncio
import sys

from os import linesep

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 7777)

    print(f'Send: {message!r}\n')
    writer.write(message.encode())
    await writer.drain()
    
    while not writer.is_closing():
        await asyncio.get_event_loop().run_in_executor(None, lambda s="Type a message\n": sys.stdout.write(s + ' '))
        data = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline) 

        if len(data)==0: # empty data
            break

        data = (data + "\n").encode()
        writer.write(data)
        await writer.drain()

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

Here I changed the while condition to check whether the data is empty instead, and used a different coroutine from asyncio:

async def broadcast(self):
    while True:
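        # wait_for returns a coroutine, so it must be awaited; on timeout it raises asyncio.TimeoutError
        data = await asyncio.wait_for(self.reader.readline(), timeout=10.0)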
        if data is None or len(data.decode()) == 0:
            print("Expected message, received None")
            break
        data = data.decode().rstrip().upper()
        print(data)
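
The behaviour of readline() combined with wait_for can be checked without a network connection. The sketch below is only an illustration under a few assumptions: it feeds bytes into a bare StreamReader with feed_data() and feed_eof(), which are methods normally called by asyncio's own protocol code, and the function name read_lines, the marker strings, and the 0.1 second timeout are made up for the example. Unlike the loop above, it catches the timeout explicitly instead of testing for None.

```python
import asyncio


async def read_lines(reader: asyncio.StreamReader, timeout: float) -> list:
    """Collect lines until EOF or until no complete line arrives in time."""
    lines = []
    while True:
        try:
            data = await asyncio.wait_for(reader.readline(), timeout=timeout)
        except asyncio.TimeoutError:
            lines.append("<timeout>")
            break
        if not data:  # b"" means the stream reached EOF
            lines.append("<eof>")
            break
        lines.append(data.decode().rstrip())
    return lines


async def demo() -> tuple:
    # Case 1: a complete line, then a partial line that is returned at EOF.
    r1 = asyncio.StreamReader()
    r1.feed_data(b"hello\nwor")
    r1.feed_eof()

    # Case 2: bytes without a newline and no EOF, so readline() keeps waiting.
    r2 = asyncio.StreamReader()
    r2.feed_data(b"no newline yet")

    return await read_lines(r1, 0.1), await read_lines(r2, 0.1)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first case mirrors what the question describes at disconnect: bytes that never received a terminating newline are only handed back once the stream reaches EOF.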