Currently running a windows service which creates 5 threads and running a while loop. The thing is to retrieve data from a queue and do something with it, if no data, make the thread "sleep" for X minutes. How can avoid blocking those thread? Most of the time they are idling because of no data. Pseudocode:
public static void Main()
{
//Create consumers
Consumer con = new Consumer();
Consumer con2 = new Consumer();
Thread thr = new Thread(con.doJob);
Thread thr2 = new Thread(con2.doJob);
thr.Start();
thr2.Start();
}
void doJob()
{
while(true)
{
//Retrieve data
if(data) {
//Consume it
} else {
//Right now using Thread.Sleep(5 minutes);
}
}
}
CodePudding user response:
Perhaps allowing whichever method adds data to the queue to also start the processing of the queue would work to your benefit.
In the example below, there is a list that can be added to by the AddDataToQueue
method. This method will effectively add the item to the data queue, and then if the data is not already being processed it would start that as well. This would drain the queue of data to be processed, while still allowing items to be added to the list. When it is finished with the queue it goes back to a resting state, waiting for data being added to the queue to trigger the processing again.
See this code as an example:
// this object is used to create a lock when checking the processing bool
private static readonly object classLock = new object();
// this flag is switched to true when processing starts
private static bool processing = false;
// the queue we are using
public static List<string> DataQueue = new List<string>();
public static async void AddDataToQueue(string data)
{
// add the data to the queue to be processed
DataQueue.Add(data);
// add a lock to the following section to ensure the bool is being checked without race conditions
lock (classLock)
{
// if it's already in the midst of processing, it doesn't need to be triggered again
if (processing)
return;
// otherwise begin to process queue
processing = true;
}
await Task.Run(() =>
{
// Process data here until queue is empty
while (DataQueue.Count > 0)
{
// deal with the first index (essentially a first in first out queue)
Console.WriteLine(DataQueue[0]);
DataQueue.RemoveAt(0);
}
// reset the flag
processing = false;
});
}
You'll notice (as @Charlieface had mentioned in the comments), that this is more of a signal to start processing rather than sleeping and periodically checking. You'll also notice that it is an async
method. This is necessary to allow it to process without interrupting the other members which may add to the list.
A benefit with this approach is that your data is processed as fast as it is received, so you aren't dealing with information that is 5 minutes old (this doesn't matter in all cases, but certainly matters in some).
CodePudding user response:
You simply need a blocking buffer. The following Ada program illustrates what you need.
with Ada.Text_IO; use Ada.Text_IO;
with Ada.Calendar; use Ada.Calendar;
with Ada.Calendar.Formatting; use Ada.Calendar.Formatting;
procedure Main is
type Index_T is mod 10;
type Circular_Buffer is array (Index_T) of Integer;
protected buffer is
entry write (Value : in Integer);
entry Read (Value : out Integer);
private
Nums : Circular_Buffer;
W_Index : Index_T := 0;
R_Index : Index_T := 0;
Count : Natural := 0;
end buffer;
protected body buffer is
entry write (Value : in Integer) when Count < Nums'Length is
begin
Nums (W_Index) := Value;
W_Index := W_Index 1;
Count := Count 1;
end write;
entry read (Value : out Integer) when Count > 0 is
begin
Value := Nums (R_Index);
R_Index := R_Index 1;
Count := Count - 1;
end read;
end buffer;
Start_Time : constant Time := Clock;
task type Consumer (Id : Positive);
task body Consumer is
Number : Integer;
elapsed : Duration;
begin
loop
buffer.Read (Number);
exit when Number = Integer'First;
elapsed := Clock - Start_Time;
Put_Line
("Consumer" & Id'Image & " read" & Number'Image & " after " &
Image (elapsed));
end loop;
Put_Line ("Consumer" & Id'Image & " terminating.");
end Consumer;
C1 : Consumer (1);
C2 : Consumer (2);
begin
for I in 1 .. 10 loop
buffer.write (I);
buffer.write (I);
Put_Line
("Producer wrote" & I'Image & " after " & Image (Clock - Start_Time));
delay 10.0;
end loop;
buffer.write (Integer'First);
buffer.write (Integer'First);
Put_Line ("Producer terminating.");
end Main;
The protected object named buffer has a condition associated with the read entry which causes a task calling the read entry to suspend when count = 0. In other words, the consumer tasks will suspend when the buffer is empty. In Ada suspended tasks are placed into an implicit entry queue and are given access to the object in FIFO order when the boundary condition evaluates to True, which in this case is "count > 0".
The output of this program, including elapsed time measurements is:
Producer wrote 1 after 00:00:00
Consumer 2 read 1 after 00:00:00
Consumer 1 read 1 after 00:00:00
Producer wrote 2 after 00:00:10
Consumer 1 read 2 after 00:00:10
Consumer 2 read 2 after 00:00:10
Producer wrote 3 after 00:00:20
Consumer 2 read 3 after 00:00:20
Consumer 1 read 3 after 00:00:20
Producer wrote 4 after 00:00:30
Consumer 1 read 4 after 00:00:30
Consumer 2 read 4 after 00:00:30
Consumer 1 read 5 after 00:00:40
Consumer 2 read 5 after 00:00:40
Producer wrote 5 after 00:00:40
Producer wrote 6 after 00:00:50
Consumer 1 read 6 after 00:00:50
Consumer 2 read 6 after 00:00:50
Consumer 1 read 7 after 00:01:00
Consumer 2 read 7 after 00:01:00
Producer wrote 7 after 00:01:00
Consumer 1 read 8 after 00:01:10
Producer wrote 8 after 00:01:10
Consumer 2 read 8 after 00:01:10
Producer wrote 9 after 00:01:20
Consumer 2 read 9 after 00:01:20
Consumer 1 read 9 after 00:01:20
Producer wrote 10 after 00:01:30
Consumer 1 read 10 after 00:01:30
Consumer 2 read 10 after 00:01:30
Producer terminating.
Consumer 1 terminating.
Consumer 2 terminating.
The main task acts as the producer. It iterates through a loop 10 times. Each time it writes a value to the buffer twice and then delays (sleeps) for 10 seconds. The consumer tasks terminate when they read the value Integer'First (which is the most negative value of type Integer) from the buffer.