C++ multithreading to process multiple customer carts simultaneously

Time:12-07

I need to process many shopping carts at the front of many queues using multithreading. I created a container for the database from a given file; it holds strings for the barcode, item name, and price. I also created a container for the customer carts from another given file; it holds strings for the cart number and the barcodes of the items. The requirement is to lock only while scanning a single item, NOT for all the items of a cart, so that other queues can process their carts, and then to print the receipts. I put a mutex lock before and an unlock after the scanning statement, but I failed to keep each cart's items separate from those of other carts. Everything else seems to work, but I have an issue with the process function, because the receipts I get are jumbled or truncated. I am new to multithreading concepts and have spent a long time on this code, but I am stuck and need advice on what is wrong with my function. Thanks in advance.

     #include <fstream>
     #include <cstdlib>
     #include <string>
     #include <iostream>
     #include <vector>
     #include <regex>
     #include <algorithm>
     #include <tuple>
     #include <bitset>
     #include <math.h>
     #include <sstream>
     #include <unordered_map>
     #include <condition_variable>
     #include <thread>
     #include <mutex>
     #include <queue>
     #include <iomanip>

      using namespace std;

     std::mutex dataBaseMut;
    //****************cartFileReader****************
    class cartFileReader
     {
       private:
       string datafile;
       vector<pair<string, vector<string>>> carts;
       public:
       cartFileReader()
       {
            datafile = "None";
       }

       void setDatafile(string fName)
       {
            datafile = fName;
      }

     //reading customer carts file that hold cart number and barcode items // works fine
        void readCartsFile()
        {
         ifstream infile(datafile);
         string name = "";
         string buffer = "";

         while (getline(infile, name))
         {
            getline(infile, buffer);
            vector<string> items = Parse(buffer, "[^,]*");
            carts.push_back(make_pair(name, items));
         }
         infile.close();
      }

    //******************************************
    vector<string> Parse(string text, string split)
    {
        vector<string> words;
        sregex_token_iterator end;
        regex pattern(split);
        string result = "";
        //needs modification: the loop iterates an extra time when the numbers end
        for (sregex_token_iterator iter(text.begin(), text.end(), pattern);
            iter != end; ++iter)
        {
            if ((*iter).length() > 0)
            {
                result = "";
                if ((static_cast<string>(*iter))[0] != 0x20)
                {
                    string hexdec = static_cast<string>(*iter);
                    //Converting the hexadecimal item code to binary for searching
                    for (size_t k = 0; k < hexdec.size(); k += 3)
                    {
                        unsigned long long hv;
                        char bit[4] = { hexdec[k + 0], hexdec[k + 1], hexdec[k + 2], '\0' };
                        istringstream ost(bit);
                        ost >> hex >> hv;
                        result += bitset<9>(hv).to_string();
                    }
                    words.push_back(result);
                }
            }
        }
        return words;
    }

    vector<pair<string, vector<string>>> getCarts()
    {
        return carts;
    }
};


//*******************************************************************
//****************reading barcode file to create a database //map***********  
   class barcodeFilereader
   {
    private:
    vector<string> errorLogProducts;
    string dataBaseFile;
     public:
    barcodeFilereader()
    {
        dataBaseFile = "None";
    }

    void setDatafile(string fName)
    {
        dataBaseFile = fName;
    }

    vector<string> getErrorLog()
    {
        return errorLogProducts;
    }

      unordered_map<string, pair<string, string>> readBarcodeFile()
      {
        bool duplicatedItem;
        unordered_map<string, pair<string, string>> dataBase;
        ifstream infile(dataBaseFile);
        string line = "";
        ofstream errorFile("errorLog.txt");

        regex exp_name("<Name>(.+?)<\\/Name>");
        regex exp_barcode("<Barcode>(.+?)<\\/Barcode>");
        regex exp_price("<Price>(.+?)<\\/Price>");
        if (infile.is_open())
        {
            int lineNo = 0;
            int d = 0;
            while (!infile.eof())
            {
                duplicatedItem = false;
                string itemName, barcode, price;
                smatch cartNo_match, barcode_match, price_match;
                getline(infile, line); // <Product>
                if (line.empty()) { continue; }
                getline(infile, line); // <Name>
                regex_search(line, cartNo_match, exp_name);
                itemName = cartNo_match[1];
                getline(infile, line); // <Barcode>
                regex_search(line, barcode_match, exp_barcode);
                barcode = barcode_match[1];
                getline(infile, line); // <Price>
                regex_search(line, price_match, exp_price);
                price = price_match[1];
                getline(infile, line); // </Product>

                std::vector<string>::iterator it;
                if (dataBase.find(barcode) != dataBase.end())
                {
                    bool duplicatedErrorBarcode = false;
                    it = std::find(errorLogProducts.begin(), 
                       errorLogProducts.end(), barcode);
                    {
                        if (it != errorLogProducts.end())
                            dataBase.erase(barcode);
                        duplicatedItem = true;
                    }
                    if (duplicatedErrorBarcode)
                        continue;
                    errorLogProducts.push_back(barcode);
                    dataBase.erase(barcode);
                    duplicatedItem = true;
                }
                if (duplicatedItem)
                    continue;
                dataBase[barcode] = make_pair(itemName, price);
            }
        }
        return dataBase;
    }

  };

    //********************************************
    class cartManager
    {
      vector<string> errorLogProducts;
       public:
      cartManager() {};
      void setErrorLog(vector<string> errorLog)
      {
        errorLogProducts = errorLog;
      }

        //Building 5-15 queues and each queue has customer carts 
        //and each cart has cartNo string and vector of barcode items 
        //strings 
     void buildQue(vector< queue<pair<string, vector<string>>>>& 
        all_queues, vector<pair<string,
        vector<string>>>& carts, int  num_queues)
      {
        queue<pair<string, vector<string>>> singleQueCarts;
        for (int i = 0; i < num_queues; i++)
        {
            static int j = 0;
            all_queues.push_back(singleQueCarts);
            int count = 0;
            int queLength = carts.size() / num_queues;
            while (count < queLength)
            {
                all_queues[i].push(carts[j++]);
                //cout << all_queues[i].size();
                count++;
            }
            if (i == num_queues - 1 && (carts.size() % num_queues != 0))
            {
                for (int i = 0; i < carts.size() % num_queues; i++)
                {
                    int count = 0;
                    all_queues[i].push(carts[j++]);
                }
            }
        }
      }

     //Processing the front customer carts of 5-15 queues at a time
     //this function's parameters are the database and a SINGLE queue
     void processCarts(const unordered_map<string, pair<string, string>>&  
       dataBase , queue<pair<string, vector<string>>>& singleQue)
     {
         // deque to hold cartNo, itemName, and price
        deque <tuple<string, pair<string, string>>> cartNo_Item_Price; 
        string totalPrice = "";
        string cartNo;
        string cartItem;
        string itemPrice;
        string barcode;
        bool errorFlag = false;

        // file to write the receipts of all carts 
         dataBaseMut.lock();
        ofstream outFile("receipt.txt"); 
        stringstream receipts;
         dataBaseMut.unlock();

        //if the single queue is not empty, then process the carts
        while (!singleQue.empty())
        {
            int frontCartSize;
            //determine the queue size
            int singleQueSzie = singleQue.size();
            //get cartNo of the cart at the front of the queue
            cartNo = singleQue.front().first;
            int j = 0;

            //get the size of the cart at the front of the queue
            frontCartSize = singleQue.front().second.size();
            // process the front cart's items till the end of the items
            for (; j < frontCartSize; j++)
            {
                  //lock data base
                 dataBaseMut.lock();

                // search for the barcode of an item in the database
                auto it = dataBase.find(singleQue.front().second[j]);

                // if not found, then search for it in the error barcodes log
                // don't care about this for now; it doesn't affect the function

                if (it == dataBase.end())
                {
                for (int i = 0; i < errorLogProducts.size(); i++)
                {
                  if (errorLogProducts[i] == singleQue.front().second[j])
                        {
                         string cartItem = singleQue.front().second[j];
                         cartNo_Item_Price.push_back(
                            make_tuple(cartNo, make_pair("**Error Item**",
                            "**Error reading**")));
                            errorFlag = true;
                        }
                    }
                }
             // if found, then get information of the item from data base 
                else 
                {
                    cartNo = singleQue.front().first; // get cart number
                    cartItem = it->second.first;   // get item name
                    itemPrice = it->second.second; //get item price

                    //push all above into a vector 
                    cartNo_Item_Price.push_back(make_tuple(cartNo, 
                     make_pair(cartItem, itemPrice)));
                }
                //unlock data base
                dataBaseMut.unlock();       
                //if you reach the end of items of a cart, then remove it 
                //from deque
                        
                if (j == singleQue.front().second.size() - 1)
                {
                    
                    singleQue.pop(); // remove processed cart
                
                    //start with printing the receipt of procces cart
                    outFile << "**Receipt*" << endl;

                    float cart_total = 0.0;

                    if (!cartNo_Item_Price.empty())
                        outFile << get<0>(cartNo_Item_Price[0]) << endl;

                    std::deque<tuple<string, pair<string, 
                     string>>>::iterator ptr;
                    for (ptr = cartNo_Item_Price.begin(); ptr != 
                       cartNo_Item_Price.end(); ptr++)
                    {
                        outFile << " "  << get<1> 
                             (*ptr).first <<
                            setw(5) << right << get<1>(*ptr).second << 
                            "\n";
                        if (isdigit(get<1>(*ptr).second[0]))
                            cart_total += stof(get<1>(*ptr).second);
                    }
                    if (errorFlag)
                    {
                        outFile << " " <<  "Total: " 
                           <<  right <<
                            cart_total << " $ (Error Items not included)\n";
                        outFile << "\n****End of Receipt*\n\n" << endl;
                        cartNo_Item_Price.clear();
                    }
                    else
                    {
                        outFile << " " << "Total: "  <<
                            cart_total;
                        outFile << "\n**End of Receipt**\n\n" << endl;
                        cartNo_Item_Price.clear();
                    }
                    errorFlag = false;
                    
                }
            
            }
         }
      }
   };


    int main()
    {
       srand(time(0));
       int min_queues = 5, max_queues = 15;

       //Creating random number of queues with range 5-15
       int num_queues = min_queues + rand() % ((max_queues + 1) - 
              min_queues);

    //Class object to manage the queues
    cartManager manager;

    //Creating data base from given file
    unordered_map<string, pair<string, string>> dataBase;

    //Class object to read a file that contains data base information
    barcodeFilereader barcodeFile;

    //set barcode file name
    barcodeFile.setDatafile("ProductPrice.xml");

    //creating data base container
    dataBase = barcodeFile.readBarcodeFile();

    //specify the errors in data base given file
    manager.setErrorLog(barcodeFile.getErrorLog());

    //container that will hold 1000 carts data
    vector<pair<string, vector<string>>> carts;

    //Class object to read carts information from a given file
    cartFileReader cartsObj;

    //set carts information file name
    cartsObj.setDatafile("Carts.csv");

    //reading "Carts.csv" file and parse it
    cartsObj.readCartsFile();

    //assign cartsobj into carts container
    carts = cartsObj.getCarts();

    //creating vector to hold all queues
    vector<queue<pair<string, vector<string>>>> all_queues;

    //vector of threads
    std::vector<thread> threads;

    //Build the queues of carts
    manager.buildQue(all_queues, carts, num_queues);

        
    //Creating a vector of threads to process 15 queues
    //each thread is responsible for a single queue
      for (int q = 0; q < num_queues; q++)
     {
        threads.emplace_back
        (
            &cartManager::processCarts, cartManager(),        
            ref(dataBase), ref(all_queues[q])
        );
     }
      for (auto& t : threads)
          t.join();
       threads.clear();
       return 0;
   }

Incorrect output

Correct output

CodePudding user response:

It's hard to say without the whole source code, but one problem in your code is that the opening of your file outFile is not protected by your mutex. That is likely one reason why your file ends up a mess.
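A minimal sketch of one way to protect the file, under assumptions that are not in the question: the stream is opened once before any thread starts, every worker receives it by reference, and each complete receipt is written under a std::lock_guard. The names outMut, writeReceipt, worker, and runWorkers are hypothetical, the receipt text is a placeholder, and a std::ostream& stands in for the receipt.txt stream so the same code accepts a std::ofstream or a std::ostringstream.

```cpp
#include <functional>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

std::mutex outMut;  // hypothetical: guards every write to the shared stream

// Hypothetical helper: writes one complete receipt while holding the lock,
// so receipts from different threads cannot interleave.
void writeReceipt(std::ostream& out, const std::string& receipt)
{
    std::lock_guard<std::mutex> lock(outMut);  // unlocked when lock goes out of scope
    out << receipt;
}

// Hypothetical worker: builds each receipt in a local buffer (no lock needed),
// then hands the finished text to writeReceipt.
void worker(std::ostream& out, int queueId, int cartCount)
{
    for (int c = 0; c < cartCount; ++c)
    {
        std::ostringstream receipt;
        receipt << "**Receipt** queue " << queueId << " cart " << c << "\n"
                << "**End of Receipt**\n";
        writeReceipt(out, receipt.str());
    }
}

// Starts one thread per queue; all threads share a stream the caller opened once.
void runWorkers(std::ostream& out, int queueCount, int cartsPerQueue)
{
    std::vector<std::thread> threads;
    for (int q = 0; q < queueCount; ++q)
        threads.emplace_back(worker, std::ref(out), q, cartsPerQueue);
    for (auto& t : threads)
        t.join();
}
```

In main this could be called as runWorkers(outFile, num_queues, ...) after a single std::ofstream outFile("receipt.txt"). Opening the file once matters because constructing a std::ofstream on an existing file truncates it by default, so a stream opened inside every thread can wipe what another thread has already written.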

CodePudding user response:

  1. In your main you create a new cartManager for each thread. I don't believe you did that on purpose.

    threads.emplace_back ( &cartManager::processCarts, cartManager(), ref(dataBase), ref(all_queues[q]) );

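Instead of passing cartManager(), pass &manager. Why is this important? Because you create a brand new cartManager for each thread, your function runs with a different this context in each one. Any state that lives in the object is therefore not shared: if dataBaseMut were a member, every thread would have its own mutex and nothing would be synchronized between your threads. In the code as posted dataBaseMut is a global, so the lock itself is shared, but each temporary cartManager still starts with an empty errorLogProducts, so the error log you set on manager is never seen by the threads.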

  2. You don't need to modify dataBase in processCarts, so pass it by const reference. That makes it obvious to other people (and to you) that processCarts may not modify it.

  3. Now opening the file is synchronized (once you fix the first issue described in this answer), but only the opening! Writing to the file is not synchronized, so your threads may write to the same file simultaneously. What happens then? A mess! So not only opening the file but also writing to it should be synchronized. I would suggest creating a class that is in charge of writing to the file and is thread safe (its methods are safe to call from different threads). That class would take care of synchronizing access to the file.
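The points above can be sketched together. This is a hedged illustration, not the asker's code: ReceiptWriter, CartProcessor, and processAll are hypothetical names, CartProcessor is only a stand-in for cartManager, and the receipt content is a placeholder. It shows a writer class that owns its mutex, and a const member function run on one shared object by passing that object's address to std::thread.

```cpp
#include <functional>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical thread-safe writer: it owns the mutex, so callers cannot forget to lock.
class ReceiptWriter
{
public:
    explicit ReceiptWriter(std::ostream& out) : out_(out) {}

    // Writes one whole receipt; other callers wait until it is finished.
    void write(const std::string& receipt)
    {
        std::lock_guard<std::mutex> lock(mut_);
        out_ << receipt;
        ++written_;
    }

    // Number of receipts written so far.
    int written() const
    {
        std::lock_guard<std::mutex> lock(mut_);
        return written_;
    }

private:
    std::ostream& out_;
    mutable std::mutex mut_;
    int written_ = 0;
};

// Hypothetical stand-in for cartManager: state is set once, then read by all threads.
class CartProcessor
{
public:
    explicit CartProcessor(std::string label) : label_(std::move(label)) {}

    // const: worker threads only read the shared object.
    void processQueue(ReceiptWriter& writer, int queueId, int cartCount) const
    {
        for (int c = 0; c < cartCount; ++c)
        {
            std::ostringstream receipt;  // local buffer, no lock needed
            receipt << label_ << " q" << queueId << " c" << c << "\n";
            writer.write(receipt.str());
        }
    }

private:
    std::string label_;
};

// Runs processQueue on ONE shared processor by passing its address to each thread.
void processAll(const CartProcessor& processor, ReceiptWriter& writer,
                int queueCount, int cartsPerQueue)
{
    std::vector<std::thread> threads;
    for (int q = 0; q < queueCount; ++q)
        threads.emplace_back(&CartProcessor::processQueue, &processor,
                             std::ref(writer), q, cartsPerQueue);
    for (auto& t : threads)
        t.join();
}
```

Because every thread receives the same &processor, whatever was configured on that object before the threads started (the error log, in the question's case) is visible to all of them. Keeping the mutex inside ReceiptWriter means the locking rule lives in one place instead of being repeated at every write.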
