Avoid Thread.Sleep when no data in queue?

Time:08-26

I'm currently running a Windows service that creates 5 threads, each running a while loop. Each thread retrieves data from a queue and does something with it; if there is no data, the thread sleeps for X minutes. How can I avoid blocking those threads? Most of the time they are idle because there is no data. Pseudocode:

public static void Main()
{
    //Create consumers
    Consumer con = new Consumer();
    Consumer con2 = new Consumer();
    Thread thr = new Thread(con.doJob);
    Thread thr2 = new Thread(con2.doJob);
    thr.Start();
    thr2.Start();
}

void doJob()
{
    while(true)
    {
        //Retrieve data
        if(data) {
            //Consume it
        } else {
            //Right now using Thread.Sleep(5 minutes);
        }
    }
}

CodePudding user response:

One option is to let whichever method adds data to the queue also start the processing of the queue.

In the example below, the AddDataToQueue method adds an item to a list that serves as the data queue and, if the queue is not already being processed, starts processing it. Processing drains the queue while still allowing new items to be added to the list. When the queue is empty, the code goes back to a resting state until the next added item triggers processing again.

See this code as an example:

        // requires: using System; using System.Collections.Generic; using System.Threading.Tasks;

        // this object guards both the queue and the processing flag
        private static readonly object classLock = new object();

        // this flag is switched to true when processing starts
        private static bool processing = false;

        // the queue we are using (List<T> is not thread-safe, so every access is made under classLock)
        public static List<string> DataQueue = new List<string>();

        public static async void AddDataToQueue(string data)
        {
            // lock this section so the queue and the flag are updated without race conditions
            lock (classLock)
            {
                // add the data to the queue to be processed
                DataQueue.Add(data);

                // if it's already in the midst of processing, it doesn't need to be triggered again
                if (processing)
                    return;

                // otherwise begin to process queue
                processing = true;
            }

            await Task.Run(() =>
            {
                // Process data here until queue is empty
                while (true)
                {
                    string item;
                    lock (classLock)
                    {
                        // test for empty and reset the flag under the same lock,
                        // so an item added at this moment is never left unprocessed
                        if (DataQueue.Count == 0)
                        {
                            processing = false;
                            return;
                        }

                        // take the first index (essentially a first in first out queue)
                        item = DataQueue[0];
                        DataQueue.RemoveAt(0);
                    }

                    // do the actual work outside the lock
                    Console.WriteLine(item);
                }
            });
        }

As @Charlieface mentioned in the comments, this is a signal to start processing rather than sleeping and checking periodically. The method is also async and hands the work to Task.Run, so the queue is processed without holding up other callers that may add to the list.

A benefit of this approach is that your data is processed as soon as it is received, so you aren't dealing with information that is up to 5 minutes old (this doesn't matter in all cases, but it certainly matters in some).
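The same "signal instead of sleep" idea can also be applied to the dedicated consumer threads from the question. The following is only a minimal sketch, assuming a SemaphoreSlim is used as the signal; the SignalledQueue and SignalDemo names are made up for illustration and are not part of the answer above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: consumers wait on a signal instead of sleeping and polling.
public sealed class SignalledQueue
{
    private readonly ConcurrentQueue<string> _items = new ConcurrentQueue<string>();

    // Counts the items that are available; starts at zero so consumers block at once.
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0);

    public void Add(string item)
    {
        _items.Enqueue(item);
        _available.Release(); // one permit per item wakes one waiting consumer
    }

    // Blocks until an item arrives or the token is cancelled.
    public string Take(CancellationToken token)
    {
        _available.Wait(token); // throws OperationCanceledException on cancel
        _items.TryDequeue(out string item); // succeeds: each permit matches one enqueued item
        return item;
    }
}

public static class SignalDemo
{
    // Starts two consumers, feeds them three items and returns what was processed, sorted.
    public static List<string> Run()
    {
        var queue = new SignalledQueue();
        var processed = new ConcurrentBag<string>();
        var done = new CountdownEvent(3);
        var cts = new CancellationTokenSource();

        var threads = new List<Thread>();
        for (int i = 0; i < 2; i++)
        {
            var t = new Thread(() =>
            {
                try
                {
                    while (true)
                    {
                        string item = queue.Take(cts.Token); // idle here, no Thread.Sleep
                        processed.Add(item);
                        done.Signal();
                    }
                }
                catch (OperationCanceledException) { /* the service is stopping */ }
            });
            t.Start();
            threads.Add(t);
        }

        queue.Add("a");
        queue.Add("b");
        queue.Add("c");

        done.Wait();   // all three items have been consumed
        cts.Cancel();  // release the consumers that are waiting for more
        threads.ForEach(t => t.Join());

        var result = new List<string>(processed);
        result.Sort();
        return result;
    }
}
```

A waiting consumer uses no CPU and wakes as soon as Add is called, and cancelling the token gives the service a clean way to stop its threads.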

CodePudding user response:

You simply need a blocking buffer. The following Ada program illustrates what you need.

with Ada.Text_IO;             use Ada.Text_IO;
with Ada.Calendar;            use Ada.Calendar;
with Ada.Calendar.Formatting; use Ada.Calendar.Formatting;

procedure Main is
   type Index_T is mod 10;
   type Circular_Buffer is array (Index_T) of Integer;

   protected buffer is
      entry write (Value : in Integer);
      entry Read (Value : out Integer);
   private
      Nums    : Circular_Buffer;
      W_Index : Index_T := 0;
      R_Index : Index_T := 0;
      Count   : Natural := 0;
   end buffer;

   protected body buffer is
      entry write (Value : in Integer) when Count < Nums'Length is
      begin
         Nums (W_Index) := Value;
         W_Index        := W_Index + 1;
         Count          := Count + 1;
      end write;

      entry read (Value : out Integer) when Count > 0 is
      begin
         Value   := Nums (R_Index);
         R_Index := R_Index + 1;
         Count   := Count - 1;
      end read;
   end buffer;

   Start_Time : constant Time := Clock;

   task type Consumer (Id : Positive);

   task body Consumer is
      Number  : Integer;
      elapsed : Duration;
   begin
      loop
         buffer.Read (Number);
         exit when Number = Integer'First;
         elapsed := Clock - Start_Time;
         Put_Line
           ("Consumer" & Id'Image & " read" & Number'Image & " after " &
            Image (elapsed));
      end loop;
      Put_Line ("Consumer" & Id'Image & " terminating.");
   end Consumer;

   C1 : Consumer (1);
   C2 : Consumer (2);

begin
   for I in 1 .. 10 loop
      buffer.write (I);
      buffer.write (I);
      Put_Line
        ("Producer wrote" & I'Image & " after " & Image (Clock - Start_Time));
      delay 10.0;
   end loop;
   buffer.write (Integer'First);
   buffer.write (Integer'First);
   Put_Line ("Producer terminating.");
end Main;

The protected object named buffer has a condition associated with the read entry, which causes any task calling the read entry to suspend while Count = 0. In other words, the consumer tasks suspend when the buffer is empty. In Ada, suspended tasks are placed into an implicit entry queue and are given access to the object in FIFO order when the boundary condition evaluates to True, which in this case is "Count > 0".
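Since the question is about a .NET service, here is a rough C# counterpart of the protected object, offered as a sketch rather than a translation. It assumes Monitor.Wait and Monitor.PulseAll stand in for the entry conditions. The BoundedBuffer class and its member names are invented for illustration, and unlike the Ada entry queue this sketch makes no promise about FIFO ordering among waiting threads.

```csharp
using System;
using System.Threading;

// Hypothetical C# analogue of the Ada protected object "buffer".
public sealed class BoundedBuffer
{
    private readonly int[] _nums = new int[10];
    private readonly object _gate = new object();
    private int _writeIndex, _readIndex, _count;

    public void Write(int value)
    {
        lock (_gate)
        {
            // Plays the role of the condition "when Count < Nums'Length".
            while (_count == _nums.Length)
                Monitor.Wait(_gate);

            _nums[_writeIndex] = value;
            _writeIndex = (_writeIndex + 1) % _nums.Length;
            _count++;
            Monitor.PulseAll(_gate); // let waiting readers re-check their condition
        }
    }

    public int Read()
    {
        lock (_gate)
        {
            // Plays the role of the condition "when Count > 0": the caller is
            // suspended while the buffer is empty, not sleeping on a timer.
            while (_count == 0)
                Monitor.Wait(_gate);

            int value = _nums[_readIndex];
            _readIndex = (_readIndex + 1) % _nums.Length;
            _count--;
            Monitor.PulseAll(_gate); // let waiting writers re-check their condition
            return value;
        }
    }
}
```

The while loops matter: a woken thread must re-test its condition, because another thread may have taken the item (or the free slot) first.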

The output of this program, including elapsed time measurements, is:

Producer wrote 1 after 00:00:00
Consumer 2 read 1 after 00:00:00
Consumer 1 read 1 after 00:00:00
Producer wrote 2 after 00:00:10
Consumer 1 read 2 after 00:00:10
Consumer 2 read 2 after 00:00:10
Producer wrote 3 after 00:00:20
Consumer 2 read 3 after 00:00:20
Consumer 1 read 3 after 00:00:20
Producer wrote 4 after 00:00:30
Consumer 1 read 4 after 00:00:30
Consumer 2 read 4 after 00:00:30
Consumer 1 read 5 after 00:00:40
Consumer 2 read 5 after 00:00:40
Producer wrote 5 after 00:00:40
Producer wrote 6 after 00:00:50
Consumer 1 read 6 after 00:00:50
Consumer 2 read 6 after 00:00:50
Consumer 1 read 7 after 00:01:00
Consumer 2 read 7 after 00:01:00
Producer wrote 7 after 00:01:00
Consumer 1 read 8 after 00:01:10
Producer wrote 8 after 00:01:10
Consumer 2 read 8 after 00:01:10
Producer wrote 9 after 00:01:20
Consumer 2 read 9 after 00:01:20
Consumer 1 read 9 after 00:01:20
Producer wrote 10 after 00:01:30
Consumer 1 read 10 after 00:01:30
Consumer 2 read 10 after 00:01:30
Producer terminating.
Consumer 1 terminating.
Consumer 2 terminating.

The main task acts as the producer. It iterates through a loop 10 times. Each time it writes a value to the buffer twice and then delays (sleeps) for 10 seconds. The consumer tasks terminate when they read the value Integer'First (which is the most negative value of type Integer) from the buffer.
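In .NET you would probably not write the buffer by hand: BlockingCollection<T> from System.Collections.Concurrent is a ready-made blocking buffer. The sketch below mirrors the producer/consumer layout described above under a few assumptions of mine: the BlockingCollectionDemo name is hypothetical, the 10 second delay is left out, and CompleteAdding is used in place of the Integer'First sentinel to tell the consumers to finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Runs one producer and two consumers; returns how many items each consumer read.
    public static int[] Run()
    {
        // Bounded to 10 items, like the Ada circular buffer.
        using (var buffer = new BlockingCollection<int>(boundedCapacity: 10))
        {
            var counts = new int[2];
            var consumers = new Task[2];

            for (int id = 0; id < 2; id++)
            {
                int me = id; // copy the loop variable for the lambda
                consumers[me] = Task.Factory.StartNew(() =>
                {
                    // Blocks while the buffer is empty; the loop ends once
                    // CompleteAdding has been called and the buffer is drained.
                    foreach (int number in buffer.GetConsumingEnumerable())
                    {
                        counts[me]++;
                    }
                }, TaskCreationOptions.LongRunning);
            }

            // The calling thread acts as the producer and writes each value twice.
            for (int i = 1; i <= 10; i++)
            {
                buffer.Add(i);
                buffer.Add(i);
            }

            buffer.CompleteAdding(); // no more data: lets both consumers finish

            Task.WaitAll(consumers);
            return counts;
        }
    }
}
```

How the 20 items are split between the two consumers is not deterministic, so only the total is predictable.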
