I am required to process many shopping carts in front of many queues using multithreading. I created a container for database from a given file holds strings barocdes, item name, and price. Also created a container for customer carts from given file that holds strigns cart number and barcode of items. The requirement is to lock the process when scanning a single item only NOT the whole items of a cart to let other queues process their cart then printing the receipts. I put mutex lock before and after scanning statement but failed to store each cart items separated on other carts. I set everything well but the process function I have an issue with, becasue the receipts I got has a mess or truncated. I am new with multithreading concepts and spent long time with is code but I stuck and I need an advise of what is wrong on my function. Thanks in advance.
#include <fstream>
#include <cstdlib>
#include <string>
#include <iostream>
#include <vector>
#include <regex>
#include <algorithm>
#include <tuple>
#include <bitset>
#include <math.h>
#include <sstream>
#include <unordered_map>
#include <condition_variable>
#include <thread>
#include <mutex>
#include <queue>
#include <iomanip>
using namespace std;
std::mutex dataBaseMut;
//****************carfilereader****************
class cartFileReader
{
private:
string datafile;
vector<pair<string, vector<string>>> carts;
public:
cartFileReader()
{
datafile = "None";
}
void setDatafile(string fName)
{
datafile = fName;
}
//reading customer carts file that hold cart number and barcode items // works fine
void readCartsFile()
{
ifstream infile(datafile);
string name = "";
string buffer = "";
while (getline(infile, name))
{
getline(infile, buffer);
vector<string> items = Parse(buffer, "[^,]*");
carts.push_back(make_pair(name, items));
}
infile.close();
}
//******************************************
vector<string> Parse(string text, string split)
{
vector<string> words;
sregex_token_iterator end;
regex pattern(split);
string result = "";
for (sregex_token_iterator iter(text.begin(), text.end(), pattern); iter != //needs modification the loop iterate extra when numbers end
end; iter)
{
if ((*iter).length() > 0)
{
result = "";
if ((static_cast<string>(*iter))[0] != 0x20)
{
string hexdec = static_cast<string>(*iter);
//Converting the Hexadecimal item code to binary for seraching
for (size_t k = 0; k < hexdec.size(); k = 3)
{
unsigned long long hv;
char bit[4] = { hexdec[k 0], hexdec[k 1], hexdec[k 2], '\0' };
istringstream ost(bit);
ost >> hex >> hv;
result = bitset<9>(hv).to_string();
}
words.push_back(result);
}
}
}
return words;
}
vector<pair<string, vector<string>>> getCarts()
{
return carts;
}
};
//*******************************************************************
//****************reading barcode file to create a database //map***********
class barcodeFilereader
{
private:
vector<string> errorLogProducts;
string dataBaseFile;
public:
barcodeFilereader()
{
dataBaseFile = "None";
}
void setDatafile(string fName)
{
dataBaseFile = fName;
}
vector<string> getErrorLog()
{
return errorLogProducts;
}
unordered_map<string, pair<string, string>> readBarcodeFile()
{
bool duplicatedItem;
unordered_map<string, pair<string, string>> dataBase;
ifstream infile(dataBaseFile);
string line = "";
ofstream errorFile("errorLog.txt");
regex exp_name("<Name>(. ?)<\\/Name>");
regex exp_barcode("<Barcode>(. ?)<\\/Barcode>");
regex exp_price("<Price>(. ?)<\\/Price>");
if (infile.is_open())
{
int lineNo = 0;
int d = 0;
while (!infile.eof())
{
duplicatedItem = false;
string itemName, barcode, price;
smatch cartNo_match, barcode_match, price_match;
getline(infile, line); // <Product>
if (line.empty()) { continue; }
getline(infile, line); // <Name>
regex_search(line, cartNo_match, exp_name);
itemName = cartNo_match[1];
getline(infile, line); // <Barcode>
regex_search(line, barcode_match, exp_barcode);
barcode = barcode_match[1];
getline(infile, line); // <Price>
regex_search(line, price_match, exp_price);
price = price_match[1];
getline(infile, line); // </Product>
std::vector<string>::iterator it;
if (dataBase.find(barcode) != dataBase.end())
{
bool duplicatedErrorBarcode = false;
it = std::find(errorLogProducts.begin(),
errorLogProducts.end(), barcode);
{
if (it != errorLogProducts.end())
dataBase.erase(barcode);
duplicatedItem = true;
}
if (duplicatedErrorBarcode)
continue;
errorLogProducts.push_back(barcode);
dataBase.erase(barcode);
duplicatedItem = true;
}
if (duplicatedItem)
continue;
dataBase[barcode] = make_pair(itemName, price);
}
}
return dataBase;
}
};
//********************************************
class cartManager
{
vector<string> errorLogProducts;
public:
cartManager() {};
void setErrorLog(vector<string> errorLog)
{
errorLogProducts = errorLog;
}
//Building 5-15 queues and each queue has customer carts
//and each cart has cartNo string and vector of barcode items
//strings
void buildQue(vector< queue<pair<string, vector<string>>>>&
all_queues, vector<pair<string,
vector<string>>>& carts, int num_queues)
{
queue<pair<string, vector<string>>> singleQueCarts;
for (int i = 0; i < num_queues; i )
{
static int j = 0;
all_queues.push_back(singleQueCarts);
int count = 0;
int queLength = carts.size() / num_queues;
while (count < queLength)
{
all_queues[i].push(carts[j ]);
//cout << all_queues[i].size();
count ;
}
if (i == num_queues - 1 && (carts.size() % num_queues != 0))
{
for (int i = 0; i < carts.size() % num_queues; i )
{
int count = 0;
all_queues[i].push(carts[j ]);
}
}
}
}
//Processing 5-15 queues in fornt customer carts at time
//this function parametes are database and SINGLE queue
void processCarts(const unordered_map<string, pair<string, string>>&
dataBase , queue<pair<string, vector<string>>>& singleQue)
{
// deque to hold cartNo, itemName, and price
deque <tuple<string, pair<string, string>>> cartNo_Item_Price;
string totalPrice = "";
string cartNo;
string cartItem;
string itemPrice;
string barcode;
bool errorFlag = false;
// file to write the receipts of all carts
dataBaseMut.lock();
ofstream outFile("receipt.txt");
stringstream receipts;
dataBaseMut.unlock();
//if signle queue is not empty, then process the carts
while (!singleQue.empty())
{
int frontCartSize;
//detemine the queue size
int singleQueSzie = singleQue.size();
//get cartNo of cart that is in front of deque
cartNo = singleQue.front().first;
int j = 0;
//get cartSize of cart that is in front of deque
frontCartSize = singleQue.front().second.size();
// process in front cart items till end of items
for (; j < frontCartSize; j )
{
//lock data base
dataBaseMut.lock();
// search barode of an item in data base
auto it = dataBase.find(singleQue.front().second[j]);
// if not found then search it in error barcodes log
//dont care about this for now it doesnt affect the function
if (it == dataBase.end())
{
for (int i = 0; i < errorLogProducts.size(); i )
{
if (errorLogProducts[i] == singleQue.front().second[j])
{
string cartItem = singleQue.front().second[j];
cartNo_Item_Price.push_back(
make_tuple(cartNo, make_pair("**Error
Item**", "**Error reading**")));
errorFlag = true;
}
}
}
// if found, then get information of the item from data base
else
{
cartNo = singleQue.front().first; // get cart number
cartItem = it->second.first; // get item name
itemPrice = it->second.second; //get item price
//push all above into a vector
cartNo_Item_Price.push_back(make_tuple(cartNo,
make_pair(cartItem, itemPrice)));
}
//unlock data base
dataBaseMut.unlock();
//if you reach the end of items of a cart, then remove it
//from deque
if (j == singleQue.front().second.size() - 1)
{
singleQue.pop(); // remove processed cart
//start with printing the receipt of procces cart
outFile << "**Receipt*" << endl;
float cart_total = 0.0;
if (!cartNo_Item_Price.empty())
outFile << get<0>(cartNo_Item_Price[0]) << endl;
std::deque<tuple<string, pair<string,
string>>>::iterator ptr;
for (ptr = cartNo_Item_Price.begin(); ptr !=
cartNo_Item_Price.end(); ptr )
{
outFile << " " << get<1>
(*ptr).first <<
setw(5) << right << get<1>(*ptr).second <<
"\n";
if (isdigit(get<1>(*ptr).second[0]))
cart_total = stof(get<1>(*ptr).second);
}
if (errorFlag)
{
outFile << " " << "Total: "
<< right <<
cart_total << " $ (Error Items not
included)\n";
outFile << "\n****End of Receipt*\n\n" << endl;
cartNo_Item_Price.clear();
}
else
{
outFile << " " << "Total: " <<
cart_total;
outFile << "\n**End of Receipt**\n\n" << endl;
cartNo_Item_Price.clear();
}
errorFlag = false;
}
}
}
}
};
int main()
{
srand(time(0));
int min_queues = 5, max_queues = 15;
//Creating random number of queues with range 5-15
int num_queues = min_queues rand() % ((max_queues 1) -
min_queues);
//Class object to manage the queues
cartManager manager;
//Creating data base from given file
unordered_map<string, pair<string, string>> dataBase;
//Class object to read a file that contains data base information
barcodeFilereader barcodeFile;
//set barcode file name
barcodeFile.setDatafile("ProductPrice.xml");
//creating data base container
dataBase = barcodeFile.readBarcodeFile();
//specify the errors in data base given file
manager.setErrorLog(barcodeFile.getErrorLog());
//container that will hold 1000 carts data
vector<pair<string, vector<string>>> carts;
//Class object to read carts information from a given file
cartFileReader cartsObj;
//set carts information file name
cartsObj.setDatafile("Carts.csv");
//reading "Carts.csv" file and parse it
cartsObj.readCartsFile();
//assign cartsobj into carts container
carts = cartsObj.getCarts();
//creating vector to hold all queues
vector<queue<pair<string, vector<string>>>> all_queues;
//vector of threads
std::vector<thread> threads;
//Build the queues of carts
manager.buildQue(all_queues, carts, num_queues);
//Creating a vecotr of threads to process 15 queues
//each single thread responsible of single queue
for (int q = 0; q < num_queues; q )
{
threads.emplace_back
(
&cartManager::processCarts, cartManager(),
ref(dataBase), ref(all_queues[q])
);
}
for (auto& t : threads)
t.join();
threads.clear();
return 0;
}
CodePudding user response:
It's hard to say without the whole source code, but what is wrong in your code is that opening of your file outFile
is not protected by your mutex. This should be a reason why you have a mess in your file.
CodePudding user response:
In your main you create a new cartManager for each thread. I believe it is not what you did on purpose.
threads.emplace_back ( &cartManager::processCarts, cartManager(), ref(dataBase), ref(all_queues[q]) );
Instead of passing cartManager()
pass &manager
. Why it is important? Because of creating a brand new cartManager
for each thread, your function has a different this
context. That means, all your threads has its own dataBaseMut - so nothing is synchronized between your threads because of that!
You don't need to modify
dataBase
inprocessCarts
- in that case, pass it byconst
reference: in that case, it's obvious to other people (and to you) that it is not something that may be modified inprocessCarts
.Now opening the file is synchronized (in case you fix the very first issue I described in this answer), but only opening! Writing to the file is not synchronized, so there is a chance that your threads write to the same file simultaneously. What may happen in that case? A mess! Thus, not only opening the file should be synchronized, but writing as well. I would suggest creating a class that is in charge of writing to a file and is thread safe (it's safe to call its methods from different threads). And that class would take care of synchronizing the access to a file.