I’m trying to parse the ts stream data coming from sockets with 4 threads. I’ve decided to use boost shared mutex to manage connections and data receiving. But I’m totally newbie in c and I’m not sure if I’ll do it right with tread safety. I’m using boost unordered_map<int, TsStreams>, when a new user is connecting, I’m locking the mutex with a unique lock and adding a user to the map, when this user is disconnecting, I’m locking the mutex with a unique lock and remove him from the map. TsStreams structure contains the vector and some additional variables while the user sending the data, I’m using the shared lock to get the user’s TsStreams reference from the map, add new data to the vector and modify additional variables. Is modifying TsStreams in that way thread-safe or not?
class Demuxer {
public:
Demuxer();
typedef signal<void (int, TsStream)> PacketSignal;
void onUserConnected(User);
void onUserDisconnected(int);
void onUserData(Data);
void addPacketSignal(const PacketSignal::slot_type& slot);
private:
mutable PacketSignal packetSignal;
void onPacketReady(int, TsStream);
TsDemuxer tsDemuxer;
boost::unordered_map<int, TsStreams> usersData;
boost::shared_mutex mtx_;
};
#include "Demuxer.h"
Demuxer::Demuxer() {
tsDemuxer.addPacketSignal(boost::bind(&Demuxer::onPacketReady, this, _1, _2));
}
void Demuxer::onUserConnected(User user){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(user.socket)){
usersData.erase(user.socket);
}
TsStreams streams;
streams.video.isVideo = true;
usersData.insert(std::make_pair(user.socket, streams));
}
void Demuxer::onUserDisconnected(int socket){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(socket)){
usersData.erase(socket);
}
}
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> lock(mtx_);
if(!usersData.count(data.socket)){
return;
}
tsDemuxer.parsePacket(data.socket, std::ref(usersData.at(data.socket)), (uint8_t *) data.buffer, data.length);
}
void Demuxer::onPacketReady(int socket, TsStream data) {
packetSignal(socket, data);
}
void Demuxer::addPacketSignal(const PacketSignal::slot_type& slot){
packetSignal.connect(slot);
}
struct TsStreams{
TsStreams() = default;
TsStreams(const TsStreams &p1) {}
TsStream video;
TsStream audio;
};
struct TsStream
{
TsStream() = default;
TsStream(const TsStream &p1) {}
boost::recursive_mutex mtx_; // to make sure to have the queue, it may not be necessary
uint64_t PTS = 0;
uint64_t DTS = 0;
std::vector<char> buffer;
uint32_t bytesDataLength = 0;
bool isVideo = false;
};
class TsDemuxer {
public:
typedef signal<void (int, TsStream)> PacketSignal;
void parsePacket(int socket, TsStreams &streams, uint8_t *data, int size);
connection addPacketSignal(const PacketSignal::slot_type& slot);
private:
PacketSignal packetSignal;
void parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size);
void parseAdaptationField(BitReader &bitReader);
void parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator);
void parsePES(TsStream &stream, BitReader &bitReader);
int64_t parseTSTimestamp(BitReader &bitReader);
};
void TsDemuxer::parsePacket(int socket, TsStreams &streams, uint8_t *data, int size) {
//some parsing
if(video){
streams.video.mtx_.lock();
parseTSPacket(socket, streams.video, (uint8_t *)buf, 188);
}else{
streams.audio.mtx_.lock();
parseTSPacket(socket, streams.audio, (uint8_t *)buf, 188);
}
}
void TsDemuxer::parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size)
{
//some more parsing
parseStream(socket, stream, bitReader, payload_unit_start_indicator);
}
void TsDemuxer::parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator) {
if(payload_unit_start_indicator)
{
if(!stream.buffer.empty()){
packetSignal(socket, stream);
stream.buffer = vector<char>();
stream.bytesDataLength = 0;
}
parsePES(stream, bitReader);
}
size_t payloadSizeBytes = bitReader.numBitsLeft() / 8;
copy(bitReader.getBitReaderData(), bitReader.getBitReaderData() payloadSizeBytes,back_inserter(stream.buffer));
stream.mtx_.unlock();
}
CodePudding user response:
The demuxer looks correct to me. There are a few inefficiencies though:
You don't need to
count
before youerase
. Just erase. If an element is not present, this will do nothing. That saves you one lookup. Likewise, don't usecount
followed byat
. Usefind
(see below for the use).You may want to move as much work as possible out of the critical section. Foe example in
onUserConnected
you could create the TsStreams object before acquiring the lock.Note that changing an unordered map will never invalidate pointers or references to elements in the map unless they are erased. That means in
onUserData
you don't have to hold the lock on the map while parsing the packet.
That is, assuming you don't call onUserData for the same user from two different threads. You could prevent this by introducing a second lock the TsStream object. Likewise, you should guard against erasing the element while another thread may still parse the last packet. I would use a shared_ptr
for this. Something like this:
class Demuxer {
...
boost::unordered_map<int, boost::shared_ptr<TsStreams> > usersData;
boost::shared_mutex mtx_;
};
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> maplock(mtx_);
auto found = usersData.find(data.socket);
if(found == usersData.end())
return;
boost::shared_ptr<TsStreams> stream = found->second;
boost::unique_lock<boost::recursive_mutex> datalock(stream->mtx_);
maplock.unlock();
tsDemuxer.parsePacket(data.socket, *stream, (uint8_t *) data.buffer, data.length);
}
If you reduce the time the Demuxer lock is taken with this approach, you should probably replace that shared mutex with a normal one. shared mutexes have much higher overhead and are not worth it for such short critical sections.
The TsDemuxer looks a bit wonky:
In TsDemuxer::parsePacket
you never unlock the mutex. Shouldn't that be a unique_lock
? Likewise, in parseStream
the unlock seems unpaired. In general, using a unique_lock
object is always the way to go compared to manual locking and unlocking. If anything, lock and unlock the unique_lock
, not the mutex.
Remarks unrelated to multithreading
stream.buffer.clear()
is more efficient thanstream.buffer = vector<char>()
because this will reuse the buffer memory instead of deallocating it completely.As others have noted, these parts of boost are now part of the standard library. Replace
boost::
withstd::
and enable a recent C standard like C 14 or 17 and you are fine. At worst you have to replaceshared_mutex
withshared_timed_mutex
.In Demuxer, you pass the User and Data objects by value. Are you sure those shouldn't be const references?