I'm writing a simple producer/consumer application, but I'm seeing some really strange behaviour. This is the code:
private static Thread _timelineThread = null;
private static BufferBlock<RtpPacket> _queue = new BufferBlock<RtpPacket>();
private static AutoResetEvent _stopping = new AutoResetEvent(false);
static void Main(string[] args)
{
    // Start consumer thread
    Consume();
    // Produce
    var t = new Thread(() =>
    {
        while (true)
        {
            var packet = RtpPacket.GetNext();
            _queue.Post(packet);
            Thread.Sleep(70);
        }
    });
    t.Start();
    t.Join();
}
static void Consume()
{
    _timelineThread = new Thread(async () =>
    {
        while (_stopping.WaitOne(0) == false)
        {
            // Start consuming...
            while (await _queue.OutputAvailableAsync())
            {
                var packet = await _queue.ReceiveAsync();
                // Some processing...
            }
        }
    });
    _timelineThread.Start();
}
This is intended to be an infinite loop (until I raise the _stopping signal). But when _timelineThread hits the first await _queue.OutputAvailableAsync(), the thread's state changes to 'Stopped'. Is there something I'm not considering?
If I change the Consume() function to this:
static void Consume()
{
    _timelineThread = new Thread(() =>
    {
        while (_stopping.WaitOne(0) == false)
        {
            // Start consuming...
            while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
            {
                var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
                // Some processing...
            }
        }
    });
    _timelineThread.Start();
}
the thread runs without any problem, even though the code is almost identical to the previous version.
EDIT: after one hour, this 'hack' doesn't seem to work either. The thread is 'Running', but I don't receive any data from the queue.
CodePudding user response:
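The Thread constructor does not understand async delegates. It accepts only void-returning delegates, so the async lambda is compiled as async void: the delegate returns to the thread at the first await that does not complete synchronously, the thread terminates, and the rest of the loop continues on thread-pool threads.
My suggestion is to use a synchronous BlockingCollection<RtpPacket> instead of the BufferBlock<RtpPacket>, and consume it by enumerating the result of its GetConsumingEnumerable method: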
using System.Collections.Concurrent;
using System.Threading;

var _queue = new BlockingCollection<RtpPacket>();
var producer = new Thread(() =>
{
    while (true)
    {
        var packet = RtpPacket.GetNext();
        // A null packet is treated as the end of the stream
        if (packet == null) { _queue.CompleteAdding(); break; }
        _queue.Add(packet);
        Thread.Sleep(70);
    }
});
var consumer = new Thread(() =>
{
    // Blocks while the collection is empty; the loop ends after CompleteAdding
    foreach (var packet in _queue.GetConsumingEnumerable())
    {
        // Some processing...
    }
});
producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
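If you would rather keep the BufferBlock<RtpPacket> and the async/await style, one option is to run the consumer with Task.Run instead of a Thread, because Task.Run does understand async delegates and returns a Task that completes only when the whole loop has finished. The following is only a minimal sketch of that idea, not a drop-in replacement: the Packet class is a hypothetical stand-in for RtpPacket, the producer posts just five items, and calling Complete() takes the place of the _stopping signal from the question.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    // Hypothetical stand-in for the RtpPacket type from the question.
    sealed class Packet
    {
        public int Sequence { get; }
        public Packet(int sequence) { Sequence = sequence; }
    }

    static async Task Main()
    {
        var queue = new BufferBlock<Packet>();

        // Consumer: Task.Run has an overload for async delegates (Func<Task>),
        // so the returned task represents the entire loop, not just the part
        // before the first await.
        Task consumer = Task.Run(async () =>
        {
            // OutputAvailableAsync returns false once the block has been
            // completed and drained, which ends the loop.
            while (await queue.OutputAvailableAsync())
            {
                Packet packet = await queue.ReceiveAsync();
                Console.WriteLine($"Received packet {packet.Sequence}");
            }
        });

        // Producer: post a few packets (assumed count), then signal completion.
        for (int i = 0; i < 5; i++)
        {
            queue.Post(new Packet(i));
            await Task.Delay(70);
        }
        queue.Complete();

        // Wait for the consumer to drain the queue and exit its loop.
        await consumer;
    }
}
```

With this approach no thread is blocked while the queue is empty, but the processing runs on thread-pool threads rather than on a dedicated thread, so it may not suit code that depends on thread affinity.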