Say I have a generator that slowly creates my stream of data:
import threading
import time

class SlowStreamSource():
    def __init__(self):
        self.buffer = ""
        self.gen_data = threading.Thread(target=self.generate_stream)
        self.gen_data.start()

    def generate_stream(self):
        i = 0
        while i < 10:
            self.buffer += str(i)
            # Other processing happens
            time.sleep(0.1)
            i += 1
        self.buffer += "-Stream Finished-"

    def read(self, hint = -1):
        if hint is None or hint < 0:
            # Return (and remove) everything buffered so far
            result = self.buffer
            self.buffer = ""
        else:
            # Return at most `hint` characters and keep the rest buffered
            result = self.buffer[:hint]
            self.buffer = self.buffer[hint:]
        return result
This data is sent to a consumer that is much faster than the generator and follows the standard practice of calling read() until there is no more data, then exiting:
import time

class FastStreamDestination():
    def __init__(self, source):
        self.source = source

    def process_stream(self):
        while True:
            data = self.source.read()
            if not data:
                break
            print(f'read "{data}"')
            # Other processing happens
            time.sleep(0.05)
(I have no control over the consumer. It's Amazon's boto3 upload_fileobj, but I have reviewed their code to determine that this is essentially how it functions.)
When I feed my generator into my consumer, it very quickly depletes the buffer, concludes that the stream is finished and exits prematurely.
src = SlowStreamSource()
dst = FastStreamDestination(src)
dst.process_stream()
yields
read "0"
but I ultimately need something like
read "0"
read "1"
read "2"
read "3"
read "4"
read "5"
read "6"
read "7"
read "8"
read "9"
read "-Stream Finished-"
Is there any way to ensure my consumer reads the entire stream from my generator, keeping in mind that I can't meaningfully speed up the generator or modify the consumer in any way?
Ok, with some help from a co-worker, I think I have the solution.
My generator can know whether it still has more data to provide, even when that data isn't ready yet. Since it's a file-like object, it can have a close function that is invoked once I'm sure all the data has been generated.
With that awareness, I can make the read function block for as long as it needs to, so that it only returns an empty result once the stream is actually closed.
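import threading
import time

class SlowStreamSource():
    def __init__(self):
        self.buffer = ""
        # Guards self.buffer, which is shared by the producer thread and read()
        self.lock = threading.Lock()
        # Set before the producer thread starts so read() can always check it
        self.closed = False
        self.gen_data = threading.Thread(target=self.generate_stream)
        self.gen_data.start()

    def generate_stream(self):
        i = 0
        while i < 10:
            with self.lock:
                self.buffer += str(i)
            # Other processing happens
            time.sleep(0.1)
            i += 1
        with self.lock:
            self.buffer += "-Stream Finished-"
        self.close()

    def close(self):
        # Signal that no more data will be produced
        self.closed = True

    def read(self, hint = -1):
        # Block until there is data or the producer has closed the stream,
        # so an empty result only ever means "end of stream"
        while not self.closed and len(self.buffer) == 0:
            time.sleep(0.1)
        with self.lock:
            if hint is None or hint < 0:
                result = self.buffer
                self.buffer = ""
            else:
                result = self.buffer[:hint]
                self.buffer = self.buffer[hint:]
        return result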
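The sleep loop in read() wakes up every 0.1 s even when nothing has changed. A minimal sketch of the same blocking-read idea using threading.Condition instead of polling, so read() sleeps until the producer actually writes data or closes the stream. The class name BlockingStreamSource and the write() helper are hypothetical illustrations, not part of the original answer or of boto3:

```python
import threading
import time


class BlockingStreamSource:
    """File-like source whose read() blocks until data arrives or close() is called.

    Hypothetical sketch: uses threading.Condition instead of a sleep-poll loop.
    """

    def __init__(self):
        self._buffer = ""
        self.closed = False
        self._cond = threading.Condition()
        # All state exists before the producer thread starts
        self._producer = threading.Thread(target=self.generate_stream)
        self._producer.start()

    def write(self, data):
        # Append data and wake any reader blocked in read()
        with self._cond:
            self._buffer += data
            self._cond.notify_all()

    def close(self):
        # Mark end of stream; blocked readers wake up and drain what is left
        with self._cond:
            self.closed = True
            self._cond.notify_all()

    def generate_stream(self):
        for i in range(10):
            self.write(str(i))
            time.sleep(0.1)  # stand-in for slow processing
        self.write("-Stream Finished-")
        self.close()

    def read(self, hint=-1):
        with self._cond:
            # Sleep until there is data or the producer has closed the stream
            self._cond.wait_for(lambda: self._buffer or self.closed)
            if hint is None or hint < 0:
                result, self._buffer = self._buffer, ""
            else:
                result, self._buffer = self._buffer[:hint], self._buffer[hint:]
            return result


if __name__ == "__main__":
    # Same "read until empty" loop as the consumer in the question
    src = BlockingStreamSource()
    while True:
        data = src.read()
        if not data:
            break
        print(f'read "{data}"')
```

Because the buffer is only touched while holding the condition's lock, the producer and reader cannot interleave a partial update. If the reader falls behind, several chunks may be returned by a single read(), but nothing is lost. For a real upload_fileobj call, note that the boto3 documentation requires a file-like object in binary mode, so the chunks would presumably need to be bytes (for example str(i).encode()) rather than str.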