Python: Slow generator streaming into fast consumer depletes buffer and terminates early

Time:08-30

Say I have a generator that slowly creates my stream of data:

import threading
import time

class SlowStreamSource():
  def __init__(self):
    self.buffer = ""
    self.gen_data = threading.Thread(target=self.generate_stream)
    self.gen_data.start()

  def generate_stream(self):
    i = 0
    while i < 10:
      self.buffer += str(i)
      # Other processing happens
      time.sleep(0.1)
      i += 1
    self.buffer += "-Stream Finished-"

  def read(self, hint = -1):
    if hint is None or hint < 0:
      result = self.buffer
      self.buffer = ""
    else:
      result = self.buffer[:hint]
      # Keep the unread remainder in the buffer for the next call
      self.buffer = self.buffer[hint:]
    return result

This data is sent to a consumer that is much faster than the generator and follows the standard practice of calling read() until it returns no more data, then exiting:

import time
class FastStreamDestination():
  def __init__(self, source):
    self.source = source

  def process_stream(self):
    while True:
      data = self.source.read()
      if not data:
        break
      print(f'read "{data}"')
      # Other processing happens
      time.sleep(0.05)

(I have no control over the consumer. It's Amazon's boto3 upload_fileobj, but I have reviewed their code to determine that this is essentially how it functions.)

When I feed my generator into my consumer, it very quickly depletes the buffer, concludes that the stream is finished and exits prematurely.

src = SlowStreamSource()
dst = FastStreamDestination(src)
dst.process_stream()

yields read "0", but I ultimately need something like

read "0"
read "1"
read "2"
read "3"
read "4"
read "5"
read "6"
read "7"
read "8"
read "9"
read "-Stream Finished-"

Is there any way to ensure my consumer reads the entire stream from my generator, keeping in mind that I cannot meaningfully speed up the generator, nor can I modify the consumer in any way?

CodePudding user response:

Ok, with some help from a co-worker, I think I have the solution.

My generator can know whether there is more data to come, even if that data isn't ready yet. Since it's a file-like object, it has a notion of being closed, which I can set once I'm sure all the data has been generated.

With that awareness, I can make read() block for as long as it needs to, so that it only returns an empty result once the stream is really finished.

import threading
import time

class SlowStreamSource():
  def __init__(self):
    self.buffer = ""
    # Initialise the flag before the producer thread starts using the object
    self.closed = False
    self.gen_data = threading.Thread(target=self.generate_stream)
    self.gen_data.start()

  def generate_stream(self):
    i = 0
    while i < 10:
      self.buffer += str(i)
      # Other processing happens
      time.sleep(0.1)
      i += 1
    self.buffer += "-Stream Finished-"
    self.closed = True

  def read(self, hint = -1):
    while not self.closed and len(self.buffer) == 0:
      time.sleep(0.1)
    if hint is None or hint < 0:
      result = self.buffer
      self.buffer = ""
    else:
      result = self.buffer[:hint]
      # Keep the unread remainder in the buffer for the next call
      self.buffer = self.buffer[hint:]
    return result
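
One caveat with the version above: the buffer is shared between two threads without a lock, and read() polls with sleep. A possible variant, only a sketch, uses threading.Condition so that read() wakes as soon as data arrives or the stream ends. The names BlockingStreamSource, drain, chunks and delay are made up for illustration, and drain only imitates the read-until-empty loop of the consumer; it is not boto3's code.

```python
import threading
import time


class BlockingStreamSource:
    """File-like source whose read() blocks until data arrives or the stream ends."""

    def __init__(self, chunks, delay=0.01):
        self._buffer = ""
        self._closed = False
        self._cond = threading.Condition()
        self._producer = threading.Thread(
            target=self._generate, args=(list(chunks), delay), daemon=True
        )
        self._producer.start()

    def _generate(self, chunks, delay):
        for chunk in chunks:
            time.sleep(delay)  # stand-in for the slow processing
            with self._cond:
                self._buffer += chunk
                self._cond.notify_all()
        with self._cond:
            # No more data will ever arrive: let blocked readers return ""
            self._closed = True
            self._cond.notify_all()

    def read(self, hint=-1):
        with self._cond:
            # Wait until there is something to return or the producer is done
            self._cond.wait_for(lambda: bool(self._buffer) or self._closed)
            if hint is None or hint < 0:
                result, self._buffer = self._buffer, ""
            else:
                result, self._buffer = self._buffer[:hint], self._buffer[hint:]
            return result


def drain(source, hint=-1):
    """Imitate the consumer: call read() until it returns nothing."""
    parts = []
    while True:
        data = source.read(hint)
        if not data:
            break
        parts.append(data)
    return parts


if __name__ == "__main__":
    src = BlockingStreamSource([str(i) for i in range(10)] + ["-Stream Finished-"])
    for part in drain(src):
        print(f'read "{part}"')
```

How the data is split across individual reads depends on timing, but the concatenation of everything read is the full stream. Also note that boto3 documents that the object passed to upload_fileobj must return bytes from read(), so a real source would presumably buffer bytes rather than str.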