Home > Mobile >  C# ThreadPool Race-Condition, Closure, locking or something else?
C# ThreadPool Race-Condition, Closure, locking or something else?

Time:10-31

Still pretty new to threads so I'm sure it is one of those little gotchas and a repeat question, but I have been unable to find the answer browsing the threads.

I have a port scanner app in C#. I'm using threadpools to spin up a new TcpClient for each port and probe if it's open. After suffering through the concepts of closures and thread synchronization, I am having an issue where when multiple threads try to save their results to different indexes in the Orchestrator.hosts (List).

I have multiple threads trying to update a single List results object. My understanding is this is fine as long as I lock the object on write, however I'm finding that on some updates, multiple entries are getting the same update. IE, Thread #1 supposed to update Hosts[0].Ports[0].Status to "Open", What happens: Thread #1 updates multiple host with the port result despite passing a specific index for Hosts. Hosts[0].Ports[0].Status to "Open", Hosts[1].Ports[0].Status to "Open", Hosts[2].Ports[0].Status to "Open",

Not sure where my problem is. The Static method I'm calling to perform a probe of a given port

    public static void ScanTCPPorts()
    {
        // Create a list of portsToScan objects to send to thread workers
        //List<ScanPortRequest> portsToScan = new List<ScanPortRequest>();
    
        using (ManualResetEvent resetEvent = new ManualResetEvent(false))
        {
            int toProcess = 0;
            for (var i = 0; i < hostCount; i  ) // Starting at Begining
            {
                int currentHostId = i;
                // To hold our current hosts ID (Assign outside of threaded function to avoid race-condition)
                if (hosts[i].IsAlive || scanDefinition.isForced())
                {
                    int portCount = hosts[i].Ports.Count;
                    for (int p = 0; p < portCount; p  )
                    {
                        // Thread-safe Increment our workQueue counter
                        Interlocked.Increment(ref toProcess);
    
                        int currentPortPosition = p;
    
                        // We need to send the arrayIndex in to the thread function
                        PortScanRequestResponse portRequestResponse = new PortScanRequestResponse(hosts[currentHostId], currentHostId, hosts[currentHostId].Ports[currentPortPosition], currentPortPosition);
    
                        ThreadPool.QueueUserWorkItem(
                            new WaitCallback(threadedRequestResponseInstance => {
                                PortScanRequestResponse portToScan = threadedRequestResponseInstance as PortScanRequestResponse;
                                PortScanRequestResponse threadResult = PortScanner.scanTCPPort(portToScan);
                                // Lock so Thread-safe update to result
                                lock (Orchestrator.hosts[portToScan.hostResultIndex])
                                {
                                    if (threadResult.port.status == PortStatus.Open)
                                    {
                                        // Update result
Orchestrator.hosts[portToScan.hostResultIndex].Ports[portToScan.portResultIndex].status = PortStatus.Open;
                                        //Logger.Log(hosts[currentHostId].IPAddress   " "   hosts[currentHostId].Ports[currentPortPosition].type   " "   hosts[currentHostId].Ports[currentPortPosition].portNumber   " is open");
                                    }
                                    else
                                    {
                                        Orchestrator.hosts[portToScan.hostResultIndex].Ports[portToScan.portResultIndex].status = PortStatus.Closed;
                                    }
                                    // Check if this was the last scan for the given host
                                    if (Orchestrator.hosts[portToScan.hostResultIndex].PortScanComplete != true)
                                    {
                                        if (Orchestrator.hosts[portToScan.hostResultIndex].isCompleted())
                                        {
                                            Orchestrator.hosts[portToScan.hostResultIndex].PortScanComplete = true;
                                            // Logger.Log(hosts[currentHostId].IPAddress   " has completed a port scan");
                                            Orchestrator.hosts[portToScan.hostResultIndex].PrintPortSummery();
                                        }
                                    }
                                }
                                // Safely decrement the counter
                                if (Interlocked.Decrement(ref toProcess) == 0)
                                    resetEvent.Set();
                            }), portRequestResponse);   // Pass in our Port to scan
                    }
                }
            }
            resetEvent.WaitOne();
        }
    }

Here is the worker process in a separate public static class.

    public static PortScanRequestResponse scanTCPPort(object portScanRequest) {
        PortScanRequestResponse portScanResponse = portScanRequest as PortScanRequestResponse;
        HostDefinition host = portScanResponse.host;
        ScanPort port = portScanResponse.port;
        try
        {
            using (TcpClient threadedClient = new TcpClient())
            {
                try
                {
                    IAsyncResult result = threadedClient.BeginConnect(host.IPAddress, port.portNumber, null, null);
                    Boolean success = result.AsyncWaitHandle.WaitOne(Orchestrator.scanDefinition.GetPortTimeout(), false);
    
                    if (threadedClient.Client != null)
                    {
                        if (success)
                        {
                            threadedClient.EndConnect(result);
                            threadedClient.Close();
                            portScanResponse.port.status = PortStatus.Open;
                            return portScanResponse;
                        }
                    }
                } catch { }
            }
        }
        catch
        { }
        portScanResponse.port.status = PortStatus.Closed;
        return portScanResponse;
    }

Originally I was pulling the host index from a free variable, thinking this was the problem moved it to inside the delegate. I tried locking the Hosts object everywhere there was a write. I have tried different thread sync techniques (Countdownevent and Manual reset event).

I think there is just some fundamental threading principal I have not been introduced to yet, or I have made a very simple logic mistake.

CodePudding user response:

I have multiple threads trying to update a single List results object. My understanding is this is fine as long as I lock the object on write.

I haven't studied your code, but the above statement alone is incorrect. When a List<T>, or any other non-thread-safe object , is used in a multithreaded environment, all interactions with the object must be synchronized. Only one thread at a time should be allowed to interact with the object. Both writes and reads must be enclosed in lock statements, using the same locker object. Even reading the Count must be synchronized. Otherwise the usage is erroneous, and the behavior of the program is undefined.

CodePudding user response:

I was hyper-focused on it being a thread issue because this was my firsts threaded project. Turned out to be that I didn't realize copies of a List<> objects are references to their original object (reference type). I assumed my threads were accessing my save structure in an unpredictable way, but my arrays of ports were all referencing the same object.

This was a "reference type" vs "value type" issue on my List<> of ports.

  • Related