I have a producer-consumer scenario¹ based on a bounded Channel<T>.
Channel<Item> channel = Channel.CreateBounded<Item>(10);
The items are coming from an RDBMS, to which the producer connects and fetches them one by one. The peculiarity is that I don't have the luxury of keeping the DB connection alive for the whole lifetime of the producer. I have to close the connection when the channel is full, and reopen the connection when the channel has again space available for a new item. So I implemented the producer like this:
// Producer
while (true)
{
    await channel.Writer.WaitToWriteAsync();
    connection.Open();
    Item item;
    while (true)
    {
        item = connection.GetNextItem(); // This is fast
        if (!channel.Writer.TryWrite(item)) break;
    }
    connection.Close();
    await channel.Writer.WriteAsync(item);
}
The producer waits until the channel.Writer.WaitToWriteAsync() task completes, then opens the DB connection, writes to the channel as many items as it can until one is rejected, closes the DB connection, asynchronously writes the rejected item, and loops back to waiting.
The consumer is pretty standard:
// Consumer
await foreach (Item item in channel.Reader.ReadAllAsync())
{
    // Process the item (this is slow)
}
My problem with this design is that the DB connection is opened and closed too often. Both opening and closing the connection have non-negligible overhead, so I would like to minimize how frequently this happens. Although the capacity of the channel is 10, I would prefer the WaitToWriteAsync task to complete when the channel is half-full (5 items), not immediately when the stored items drop from 10 to 9.
My question is: How can I modify my producer, so that it connects to the database when there are 5 or less items in the channel, and closes the connection when the channel is full with 10 items?
Below is the output from a minimal example that I wrote, that reproduces the undesirable behavior:
19:20:55.811 [4] > Opening connection -->
19:20:55.933 [4] > Produced #1
19:20:55.934 [4] > Produced #2
19:20:55.934 [4] > Produced #3
19:20:55.934 [4] > Produced #4
19:20:55.934 [4] > Produced #5
19:20:55.934 [4] > Produced #6
19:20:55.935 [4] > Produced #7
19:20:55.935 [4] > Produced #8
19:20:55.935 [4] > Produced #9
19:20:55.935 [4] > Produced #10
19:20:55.935 [4] > Produced #11
19:20:55.935 [4] > Closing connection <--
19:20:55.936 [6] > Consuming: 1
19:20:56.037 [4] > Consuming: 2
19:20:56.037 [6] > Opening connection -->
19:20:56.137 [6] > Produced #12
19:20:56.137 [6] > Produced #13
19:20:56.137 [6] > Closing connection <--
19:20:56.137 [4] > Consuming: 3
19:20:56.238 [6] > Consuming: 4
19:20:56.238 [4] > Opening connection -->
19:20:56.338 [4] > Produced #14
19:20:56.338 [4] > Produced #15
19:20:56.338 [4] > Closing connection <--
19:20:56.338 [6] > Consuming: 5
19:20:56.439 [4] > Consuming: 6
19:20:56.439 [6] > Opening connection -->
19:20:56.539 [6] > Produced #16
19:20:56.539 [6] > Produced #17
19:20:56.539 [6] > Closing connection <--
19:20:56.539 [4] > Consuming: 7
19:20:56.644 [6] > Consuming: 8
19:20:56.644 [4] > Opening connection -->
19:20:56.744 [4] > Produced #18
19:20:56.745 [7] > Consuming: 9
19:20:56.745 [4] > Produced #19
19:20:56.745 [4] > Produced #20
19:20:56.745 [4] > Closing connection <--
19:20:56.846 [7] > Consuming: 10
19:20:56.847 [4] > Producer completed
19:20:56.946 [4] > Consuming: 11
19:20:57.046 [4] > Consuming: 12
19:20:57.147 [4] > Consuming: 13
19:20:57.247 [4] > Consuming: 14
19:20:57.347 [4] > Consuming: 15
19:20:57.452 [4] > Consuming: 16
19:20:57.552 [4] > Consuming: 17
19:20:57.653 [4] > Consuming: 18
19:20:57.753 [4] > Consuming: 19
19:20:57.854 [4] > Consuming: 20
19:20:57.955 [1] > Finished
As you can see there is a lot of "Opening/Closing connection" going on.
My question has similarities with this older question:
Given an external producer API that can be stopped and started, efficiently stop the producer when local buffer is full.
The difference is that in my case the producer is a loop, and not the event handler of a service, as in the other question.
¹ This scenario is contrived. It was inspired by a relatively recent GitHub API proposal.
Clarification: The channel should not be drained completely before reconnecting to the DB. That's because opening the connection takes some time, and I don't want the consumer to be idle during this time. So the producer should reconnect when the channel has dropped to 5 items or less, not when it is completely empty.
CodePudding user response:
This problem is analogous to I/O buffering. Try simply enlarging your bounded channel, and have the producer start writing when the channel is empty and stop when it is full or there is no more data to write. That way the producer's start-stop cycles are bounded by the channel's empty/full conditions; the producer should not be trying to pace itself against every couple of consumer actions on the channel.
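To see why this batching pays off, here is a minimal, deterministic C# sketch of the empty/full hysteresis (a single-threaded simulation with a plain Queue standing in for the channel, and a counter standing in for the simulated DB connection; it is an illustration of the buffering idea, not the author's actual setup):

```csharp
using System;
using System.Collections.Generic;

class HysteresisDemo
{
    static void Main()
    {
        const int Capacity = 10, Total = 20;
        var buffer = new Queue<int>();
        int next = 1, opens = 0;

        // The "producer" refills the buffer to capacity only when it has
        // been drained, so one connection cycle serves a whole batch
        // instead of a single item.
        while (next <= Total || buffer.Count > 0)
        {
            if (buffer.Count == 0 && next <= Total)
            {
                opens++; // open the (simulated) DB connection
                while (buffer.Count < Capacity && next <= Total)
                    buffer.Enqueue(next++);
                // close the (simulated) DB connection
            }
            buffer.Dequeue(); // the "consumer" side
        }

        Console.WriteLine($"opens = {opens}"); // 2 cycles for 20 items
    }
}
```

With a per-item producer the connection would be cycled 20 times; with empty/full hysteresis it is cycled only twice.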
The following example is written using the Ada programming language in response to the author's request for an example in the language of my choice.
This example creates a producer task and a consumer task which communicate via a shared buffer. The producer starts producing when the shared buffer is empty and stops when it is full. The producer is much faster than the consumer.
The Ada package specification for this example, which exposes the producer and consumer tasks to the main procedure is:
package prod_con is
   task fast_producer;
   task slow_consumer;
end prod_con;
The implementation of these tasks and the shared buffer is located in the package body.
with Ada.Text_IO; use Ada.Text_IO;

package body prod_con is

   type Buf_Index is range 0 .. 10;
   type Buf_Arr is array (Buf_Index) of Integer;

   protected Buffer is
      entry Write (Value : in Integer);
      entry Read (Value : out Integer);
   private
      Write_Allowed : Boolean := True;
      Buff          : Buf_Arr;
      Write_Index   : Buf_Index := 0;
      Read_Index    : Buf_Index := 0;
      Count         : Natural := 0;
   end Buffer;

   protected body Buffer is
      entry Write (Value : in Integer) when Write_Allowed is
      begin
         Buff (Write_Index) := Value;
         Count := Count + 1;
         if Write_Index < Buf_Index'Last then
            Write_Index := Write_Index + 1;
         else
            Write_Index := 0;
            Write_Allowed := False;
         end if;
      end Write;

      entry Read (Value : out Integer) when Count > 0 is
      begin
         Value := Buff (Read_Index);
         Count := Count - 1;
         if Read_Index < Buf_Index'Last then
            Read_Index := Read_Index + 1;
         else
            Write_Allowed := True;
            Read_Index := 0;
         end if;
      end Read;
   end Buffer;

   -------------------
   -- fast_producer --
   -------------------

   task body fast_producer is
   begin
      for I in 1 .. 22 loop
         Buffer.Write (I);
         Put_Line ("Producer produced value:" & I'Image);
      end loop;
      Put_Line ("Producer done producing.");
   end fast_producer;

   -------------------
   -- slow_consumer --
   -------------------

   task body slow_consumer is
      Next_Value : Integer;
   begin
      loop
         select
            Buffer.Read (Next_Value);
            Put_Line ("Consumer read value:" & Next_Value'Image);
            delay 1.0; -- sleep for one second
         or
            delay 0.1;
            exit;
         end select;
      end loop;
      Put_Line ("Consumer done consuming.");
   end slow_consumer;

end prod_con;
The Ada protected object named Buffer is protected against race conditions. This example uses two protected entries. Protected entries allow read and write access to the protected object under the control of a boundary condition. The Write entry is allowed to execute when Write_Allowed is True. Each execution of the Write entry assigns the Value parameter to the internal array Buff at the index contained in Write_Index and increments Count, the number of values currently stored in the Buff array.
When Write_Index is less than the maximum index value, Write_Index is incremented. Otherwise Write_Index is reset to 0 and Write_Allowed is set to False: the buffer is full, and the producer will be suspended in the Write entry queue on its next call to Write until Write_Allowed evaluates to True again.
The Read entry is used by the consumer to read the next value from Buffer whenever the buffer is not empty.
Each call to the Read entry assigns the value of Buff at index Read_Index to the parameter Value, which is passed out to the consumer task, and decrements Count. Read_Index is incremented if its current value is less than the maximum array index; otherwise Read_Index is reset to 0 and Write_Allowed is set to True.
The producer task simply writes the values 1 through 22 to Buffer as fast as it can. It is suspended while the Write entry's boundary condition is False, and automatically resumes execution when Write_Allowed becomes True.
The consumer loops, reading from Buffer and then sleeping one second to simulate a slow consumer. The call to Buffer.Read is placed in a timed select statement, which performs a conditional call to the Buffer.Read entry: if the call is not accepted within 0.1 seconds it is canceled, and the consumer exits its loop and completes.
The output of this example is:
Producer produced value: 1
Producer produced value: 2
Producer produced value: 3
Producer produced value: 4
Producer produced value: 5
Producer produced value: 6
Producer produced value: 7
Producer produced value: 8
Producer produced value: 9
Producer produced value: 10
Producer produced value: 11
Consumer read value: 1
Consumer read value: 2
Consumer read value: 3
Consumer read value: 4
Consumer read value: 5
Consumer read value: 6
Consumer read value: 7
Consumer read value: 8
Consumer read value: 9
Consumer read value: 10
Consumer read value: 11
Producer produced value: 12
Producer produced value: 13
Producer produced value: 14
Producer produced value: 15
Producer produced value: 16
Producer produced value: 17
Producer produced value: 18
Producer produced value: 19
Producer produced value: 20
Producer produced value: 21
Producer produced value: 22
Producer done producing.
Consumer read value: 12
Consumer read value: 13
Consumer read value: 14
Consumer read value: 15
Consumer read value: 16
Consumer read value: 17
Consumer read value: 18
Consumer read value: 19
Consumer read value: 20
Consumer read value: 21
Consumer read value: 22
Consumer done consuming.
CodePudding user response:
One way to solve this problem is to use two channels: a bounded Channel<T> with the desired capacity, and a second bounded Channel<int> with capacity 1 that is used only for the WaitToWriteAsync functionality. Synchronizing the two channels is not trivial, so I wrote a custom Channel<T> implementation that wraps these two channels and does the synchronization internally:
public sealed class DoubleCapacityChannel<T> : Channel<T>
{
    private readonly Channel<T> _channel;
    private readonly Channel<int> _channelLow;
    private readonly int _lowCapacity;

    public DoubleCapacityChannel(int highCapacity, int lowCapacity)
    {
        if (highCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(highCapacity));
        if (lowCapacity < 1 || lowCapacity > highCapacity)
            throw new ArgumentOutOfRangeException(nameof(lowCapacity));
        _lowCapacity = lowCapacity;
        _channel = Channel.CreateBounded<T>(highCapacity);
        Debug.Assert(_channel.Reader.CanCount);
        _channelLow = Channel.CreateBounded<int>(1);
        this.Writer = new ChannelWriter(this);
        this.Reader = new ChannelReader(this);
    }

    private class ChannelWriter : ChannelWriter<T>
    {
        private readonly DoubleCapacityChannel<T> _parent;

        public ChannelWriter(DoubleCapacityChannel<T> parent) => _parent = parent;

        public override bool TryComplete(Exception error = null)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Writer.TryComplete(error);
                if (success) _parent._channelLow.Writer.TryComplete(error);
                return success;
            }
        }

        public override bool TryWrite(T item)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Writer.TryWrite(item);
                if (!success || _parent._channel.Reader.Count >= _parent._lowCapacity)
                    _parent._channelLow.Writer.TryWrite(0);
                return success;
            }
        }

        public override async ValueTask WriteAsync(T item,
            CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            while (true)
            {
                if (this.TryWrite(item)) break;
                if (!await _parent._channel.Writer
                    .WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
                    throw new ChannelClosedException();
            }
        }

        public override ValueTask<bool> WaitToWriteAsync(
            CancellationToken cancellationToken = default)
            => _parent._channelLow.Writer.WaitToWriteAsync(cancellationToken);
    }

    private class ChannelReader : ChannelReader<T>
    {
        private readonly DoubleCapacityChannel<T> _parent;

        public ChannelReader(DoubleCapacityChannel<T> parent) => _parent = parent;

        public override Task Completion => _parent._channel.Reader.Completion;
        public override bool CanCount => _parent._channel.Reader.CanCount;
        public override int Count => _parent._channel.Reader.Count;

        public override bool TryRead(out T item)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Reader.TryRead(out item);
                if (!success || _parent._channel.Reader.Count < _parent._lowCapacity)
                    _parent._channelLow.Reader.TryRead(out _);
                return success;
            }
        }

        public override bool CanPeek => _parent._channel.Reader.CanPeek;

        public override bool TryPeek(out T item)
            => _parent._channel.Reader.TryPeek(out item);

        public override ValueTask<bool> WaitToReadAsync(
            CancellationToken cancellationToken = default)
            => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
    }
}
The DoubleCapacityChannel<T> class can solve the problem in the question by changing the line:
Channel<Item> channel = Channel.CreateBounded<Item>(10);
to
Channel<Item> channel = new DoubleCapacityChannel<Item>(10, 5);
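The wrapper's low-capacity channel works like a latch: while it holds a token, the outer WaitToWriteAsync does not complete. A small self-contained sketch of just that gating trick (using only a bounded Channel<int> of capacity 1, independent of the class above):

```csharp
using System;
using System.Threading.Channels;

class GateDemo
{
    static void Main()
    {
        // A bounded Channel<int> with capacity 1 acts as a latch:
        // while it holds a token, further writes fail (and a writer's
        // WaitToWriteAsync would stay pending); consuming the token
        // re-opens the latch.
        var gate = Channel.CreateBounded<int>(1);

        Console.WriteLine(gate.Writer.TryWrite(0)); // True  - latch closes
        Console.WriteLine(gate.Writer.TryWrite(0)); // False - already full
        gate.Reader.TryRead(out _);                 // latch opens again
        Console.WriteLine(gate.Writer.TryWrite(0)); // True
    }
}
```

In DoubleCapacityChannel, TryWrite and TryRead add or remove that token based on the inner channel's Count, so the producer's WaitToWriteAsync only completes once the count has dropped to below lowCapacity.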
Below is the output from the original minimal example, modified to use the DoubleCapacityChannel<T>:
19:26:19.119 [4] > Opening connection -->
19:26:19.241 [4] > Produced #1
19:26:19.243 [4] > Produced #2
19:26:19.243 [4] > Produced #3
19:26:19.243 [4] > Produced #4
19:26:19.243 [4] > Produced #5
19:26:19.243 [4] > Produced #6
19:26:19.243 [4] > Produced #7
19:26:19.243 [4] > Produced #8
19:26:19.243 [4] > Produced #9
19:26:19.243 [4] > Produced #10
19:26:19.243 [4] > Produced #11
19:26:19.243 [4] > Closing connection <--
19:26:19.244 [6] > Consuming: 1
19:26:19.345 [6] > Consuming: 2
19:26:19.446 [6] > Consuming: 3
19:26:19.547 [6] > Consuming: 4
19:26:19.651 [6] > Consuming: 5
19:26:19.752 [6] > Consuming: 6
19:26:19.852 [6] > Consuming: 7
19:26:19.853 [4] > Opening connection -->
19:26:19.953 [6] > Consuming: 8
19:26:19.953 [4] > Produced #12
19:26:19.953 [4] > Produced #13
19:26:19.953 [4] > Produced #14
19:26:19.953 [4] > Produced #15
19:26:19.953 [4] > Produced #16
19:26:19.953 [4] > Produced #17
19:26:19.953 [4] > Produced #18
19:26:19.953 [4] > Produced #19
19:26:19.953 [4] > Closing connection <--
19:26:20.053 [6] > Consuming: 9
19:26:20.154 [4] > Consuming: 10
19:26:20.254 [4] > Consuming: 11
19:26:20.355 [4] > Consuming: 12
19:26:20.455 [4] > Consuming: 13
19:26:20.556 [4] > Consuming: 14
19:26:20.656 [4] > Consuming: 15
19:26:20.656 [6] > Opening connection -->
19:26:20.757 [6] > Produced #20
19:26:20.757 [6] > Produced #21
19:26:20.757 [6] > Produced #22
19:26:20.757 [6] > Produced #23
19:26:20.757 [6] > Produced #24
19:26:20.757 [6] > Produced #25
19:26:20.757 [6] > Produced #26
19:26:20.757 [6] > Produced #27
19:26:20.757 [6] > Closing connection <--
19:26:20.757 [4] > Consuming: 16
19:26:20.858 [4] > Consuming: 17
19:26:20.859 [6] > Producer completed
19:26:20.959 [6] > Consuming: 18
19:26:21.059 [6] > Consuming: 19
19:26:21.160 [6] > Consuming: 20
19:26:21.260 [6] > Consuming: 21
19:26:21.361 [6] > Consuming: 22
19:26:21.461 [6] > Consuming: 23
19:26:21.562 [6] > Consuming: 24
19:26:21.662 [6] > Consuming: 25
19:26:21.763 [6] > Consuming: 26
19:26:21.863 [7] > Consuming: 27
19:26:21.964 [1] > Finished