I am tring to copy the upcoming datas to another queue.Queue() to do other stuffs in another thread.
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
and the config lines here
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
# connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
rgb_channel.stop_consuming()
sys.exit(0)
and finally the queue.Queue().get() function that i failed at:
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
rgb_color_bytes = None
while True:
print("POINTER 1")
try:
rgb_frame = READ_QUEUE.get(block=True)
except queue.Empty:
rgb_frame = None
if not rgb_frame:
continue
print("POINTER 2")
it is stucking there. I am new at threading and queue architecture. I am triying add_callbak_threadsafe()
and I know that get() blocks the thread. But i created 2 different thread as here
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
So if i created 2 threads why queue.Queue().get() blocks the other one. Thanks for your helps. I can share whole code it is really simple and almost 170 lines.
CodePudding user response:
Here I solved the issue and i want to publish for the ones who are trying to put data to rabbitmq queue and the read by consumer and then put it to python queue and do some stuffs on other threads. I hope it will help someones.
# RGB CONSUME #
import numpy as np
import pika
import sys
import cv2
import queue
import threading
# MACRO DEFINITIONS #
RGB_QUEUE = 'RGBStream0'
WINDOW_TITLE = 'RGB Stream Consumer1'
WINDOW_TITLE2 = 'From Python Queue'
HOST_NAME = 'localhost'
READ_QUEUE = queue.Queue(200)
CONSUMER_THREAD_NAME = 'ConsumerThread'
THREAD_STOP_FLAG = False
TEST_FLAG = False
# PARAMETER CHECK #
# Status degiskeni ile, sadece dagitim yapilmasi yada dagitim ve displayin aynı anda yapilmasi durumu saklanmakta.
STATUS = None
STATUS2 = None
if len(sys.argv) > 1:
if sys.argv[1] == '-display':
STATUS = False
STATUS2 = True
else:
print("Gecersiz parametre")
exit(1)
else:
pass
# LOCAL FUNCTIONS #
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
print(rgb_color_bytes)
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
if STATUS:
cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_AUTOSIZE)
if STATUS or STATUS2:
print(' *** Mesajlar bekleniyor *** Goruntuleme acik *** Cikmak icin CTRL C ***')
else:
print(' *** Mesajlar bekleniyor *** Goruntuleme icin -display *** Cikmak icin CTRL C ***')
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
print('Interrupted ^^ Channel Kapatildi')
rgb_channel.stop_consuming()
sys.exit(0)
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
while True:
rgb_frame = READ_QUEUE.get(block=True)
# 640 * 480
if rgb_frame.size == 921600:
rgb_data_reshaped = np.reshape(rgb_frame, [480, 640, 3])
# 1280 * 720
elif rgb_frame.size == 2764800:
rgb_data_reshaped = np.reshape(rgb_frame, [720, 1280, 3])
# 1920 * 1080
elif rgb_frame.size == 6220800:
rgb_data_reshaped = np.reshape(rgb_frame, [1080, 1920, 3])
else:
print("Something wrong i can feel it")
exit(1)
if STATUS2:
cv2.imshow(WINDOW_TITLE2, rgb_data_reshaped)
cv2.waitKey(1)
try:
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
except KeyboardInterrupt:
print('Interrupted')
cv2.destroyAllWindows()
sys.exit(0)