I will say in advance that very high throughput is needed and calling ExecutePackets
is very expensive. I need the ExecutePackets
function to process many packets at once, submitted from different threads.
#include <functional>
#include <list>
#include <mutex>
#include <Windows.h>   // ZeroMemory

struct Packet {
    bool responseStatus;
    char data[1024];
};

struct PacketPool {
    int packet_count;
    Packet* packets[10];
} packet_pool;

std::mutex queue_mtx;
std::mutex request_mtx;

bool ParallelExecutePacket(Packet* p_packet) {
    p_packet->responseStatus = false;

    struct QueuePacket {
        bool executed;
        Packet* p_packet;
    } queue_packet{ false, p_packet };

    static std::list<std::reference_wrapper<QueuePacket>> queue;

    // add this packet to the shared queue
    queue_mtx.lock();
    queue.push_back(queue_packet);
    queue_mtx.unlock();

    request_mtx.lock();
    if (!queue_packet.executed)
    {
        ZeroMemory(&packet_pool, sizeof(packet_pool));

        // move the queue into packet_pool and clear the queue
        queue_mtx.lock();
        auto iter = queue.begin();
        while (iter != queue.end())
            if (!(*iter).get().executed)
            {
                int current_count = packet_pool.packet_count++;
                packet_pool.packets[current_count] = (*iter).get().p_packet;
                (*iter).get().executed = true;
                queue.erase(iter++);
            }
            else ++iter;
        queue_mtx.unlock();

        // execute packets
        ExecutePackets(&packet_pool);
    }
    request_mtx.unlock();

    return p_packet->responseStatus;
}
The ParallelExecutePacket
function can be called from multiple loops at the same time. I want packets to be processed in batches of several at once; more precisely, each thread should process the entire queue. That way the number of ExecutePackets
calls is reduced without losing any processed packets.
However, with multiple threads the total number of packets processed in my code is only equal to what a single thread processes on its own, and I don't understand why this is happening.
In my test I created several threads and called ParallelExecutePacket in a loop in each of them (a simplified sketch of that loop is shown after the results). The results below are the number of processed requests per second.
Multithread:
Summ:91902 Thread 0 : 20826 Thread 1 : 40031 Thread 2 : 6057 Thread 3 : 12769 Thread 4 : 12219
Singlethread:
Summ:104902 Thread 0 : 104902
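The test loop is essentially the following (a simplified sketch; the per-thread counters and the one-second window stand in for the real benchmark's bookkeeping):

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

// Simplified sketch of the benchmark: N threads each call
// ParallelExecutePacket in a tight loop for one second and count
// their calls. The exact counters/timing of the real test differ.
void RunBenchmark(int thread_count) {
    std::vector<std::thread> threads;
    std::vector<long long> counts(thread_count, 0);
    std::atomic<bool> stop{ false };

    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back([&, i] {
            Packet packet{};
            while (!stop.load(std::memory_order_relaxed)) {
                ParallelExecutePacket(&packet);
                ++counts[i];
            }
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));
    stop = true;
    for (auto& t : threads) t.join();

    long long sum = 0;
    for (int i = 0; i < thread_count; ++i) {
        sum += counts[i];
        std::printf("Thread %d : %lld\n", i, counts[i]);
    }
    std::printf("Summ:%lld\n", sum);
}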
And if my version isn't workable, how do I implement what I need?
CodePudding user response:
queue_mtx.lock();
auto iter = queue.begin();
while (iter != queue.end())
    queue.erase(iter++);
queue_mtx.unlock();
Only one execution thread locks the queue at a time, drains all messages from it, and then unlocks it. Even if a thousand execution threads are available, only one of them will be able to do any work here; all the others get blocked.
The length of time the queue_mtx
is held must be minimized as much as possible: it should be no more than the absolute minimum it takes to pluck one message out of the queue, removing it completely, then unlock the queue while all the real work is done.
int current_count = packet_pool.packet_count++;
packet_pool.packets[current_count] = (*iter).get().p_packet;
This appears to be the extent of the work that's done here. Currently the shown code enjoys the benefit of being protected by the queue_mtx
. If it is no longer protected by it, then thread safety must be implemented here in some other way, if that's needed (it's unclear what any of this is, or whether there's a thread synchronization issue here at all).
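In outline the pattern looks like this (only a sketch: it assumes QueuePacket and the queue are hoisted out of ParallelExecutePacket to namespace scope, and ProcessOne is a hypothetical stand-in for whatever the per-packet work actually is):

#include <functional>
#include <list>
#include <mutex>

struct Packet;                     // as defined in the question

// Assumption: QueuePacket and the queue are moved out of the function.
struct QueuePacket {
    bool executed;
    Packet* p_packet;
};

std::mutex queue_mtx;
std::list<std::reference_wrapper<QueuePacket>> queue;

void ProcessOne(Packet* p);        // hypothetical stand-in for the real work

bool TakeAndProcessOne() {
    QueuePacket* entry = nullptr;
    {
        std::lock_guard<std::mutex> lock(queue_mtx);   // held only long enough...
        if (!queue.empty()) {
            entry = &queue.front().get();
            queue.pop_front();                         // ...to detach one entry
        }
    }                                                  // lock dropped before real work

    if (!entry)
        return false;                                  // queue was empty

    ProcessOne(entry->p_packet);   // expensive work, done with no lock held
    entry->executed = true;        // as noted above, writes done outside the lock
                                   // may need their own synchronization
    return true;
}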
CodePudding user response:
You never drop request_mtx
while you drain the queue and call ExecutePackets
, so your thread blocks all of the others until it finishes executing all the tasks it finds.
Also note that you won't actually see any speedups from this style of parallelism. To have n
threads of parallelism with this code, you need n
callers calling into ParallelExecutePacket
. That is exactly the same parallelism you would get if you just let each caller do its own work. Indeed, statistically speaking you will find that almost always every thread just runs its own task. Every now and then you'll get contention, which causes one thread to execute another's task; when this happens, both threads slow down to the speed of the slower of the two.
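That said, if you still want the batching behaviour you describe, one way to structure it is roughly the following. This is only a sketch built on the declarations from your question (BatchEntry, the condition variable, and the leader flag are names introduced here, not part of your code), and per the above it still cannot give you more parallelism than the number of callers:

#include <condition_variable>
#include <mutex>
#include <vector>

// Whichever caller arrives while no batch is being executed becomes the
// "leader": it drains everything queued so far, calls ExecutePackets once
// per pool-full, and then wakes the waiting callers.
struct BatchEntry {
    Packet* packet;
    bool done = false;
};

std::mutex batch_mtx;
std::condition_variable batch_cv;
std::vector<BatchEntry*> pending;
bool leader_running = false;

bool ParallelExecutePacket(Packet* p_packet) {
    p_packet->responseStatus = false;
    BatchEntry entry{ p_packet };

    std::unique_lock<std::mutex> lock(batch_mtx);
    pending.push_back(&entry);

    if (leader_running) {
        // Another thread is executing batches; just wait for our packet.
        batch_cv.wait(lock, [&] { return entry.done; });
        return p_packet->responseStatus;
    }

    // This thread becomes the leader and drains the queue.
    leader_running = true;
    while (!pending.empty()) {
        std::vector<BatchEntry*> batch;
        while (!pending.empty() && batch.size() < 10) {   // 10 = PacketPool capacity
            batch.push_back(pending.back());
            pending.pop_back();
        }

        PacketPool pool{};
        for (BatchEntry* e : batch)
            pool.packets[pool.packet_count++] = e->packet;

        lock.unlock();               // the expensive call runs with no lock held
        ExecutePackets(&pool);
        lock.lock();

        for (BatchEntry* e : batch)
            e->done = true;
        batch_cv.notify_all();       // wake callers whose packets were in this batch
    }
    leader_running = false;
    return p_packet->responseStatus;
}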