How to dynamically add items to a PowerShell ArrayList and process them recursively using Runspace p

Time:06-24

I have a for loop that iterates through an ArrayList and, while doing so, adds more items to the list and processes those as well (iteratively). I am trying to convert this function to run concurrently using a RunspacePool.

Here is the normal code without runspace:

$array = [System.Collections.ArrayList]@(1, 2, 3, 4, 5)
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
    Write-Host "Counter: $i`tArray: $array"
    if ($array[$i] -in @(1, 2, 3, 4, 5)) {
        $array.Add($array[$i] + 3) | Out-Null
    }
}
Write-Host "Array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"

Output is:

Number of items in array before loop: 5
Counter: 0      Array: 1 2 3 4 5
Counter: 1      Array: 1 2 3 4 5 4
Counter: 2      Array: 1 2 3 4 5 4 5
Counter: 3      Array: 1 2 3 4 5 4 5 6
Counter: 4      Array: 1 2 3 4 5 4 5 6 7
Counter: 5      Array: 1 2 3 4 5 4 5 6 7 8
Counter: 6      Array: 1 2 3 4 5 4 5 6 7 8 7
Counter: 7      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 8      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 9      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 10     Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 11     Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12

Here is the Runspace function that I am trying to implement:

$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()
$runspaces = @()

$scriptblock = {
    Param ($i, $array)
    # Start-Sleep 1 # <------ Output varies significantly if this is enabled
    Write-Output "$i value: $array"
    if ($i -in @(1, 2, 3, 4, 5)) {
        $array.Add($i + 3) | Out-Null
    }
}

$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
    $runspace = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$i]).AddArgument($array)
    $runspace.RunspacePool = $pool
    $runspaces += [PSCustomObject]@{ Pipe = $runspace; Status = $runspace.BeginInvoke() }
}

while ($runspaces.Status -ne $null) {
    $completed = $runspaces | Where-Object { $_.Status.IsCompleted -eq $true }
    foreach ($runspace in $completed) {
        $runspace.Pipe.EndInvoke($runspace.Status)
        $runspace.Status = $null
    }
}
Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()

Output without sleep function is as expected:

Number of items in array before loop: 5
Current value: 1        Array: 1 2 3 4 5
Current value: 2        Array: 1 2 3 4 5 4
Current value: 3        Array: 1 2 3 4 5 4 5
Current value: 4        Array: 1 2 3 4 5 4 5 6
Current value: 5        Array: 1 2 3 4 5 4 5 6 7
Current value: 4        Array: 1 2 3 4 5 4 5 6 7 8
Current value: 5        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 6        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 8        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7        Array: 1 2 3 4 5 4 5 6 7 8 7 8
Current value: 8        Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12

Output with Sleep:

Number of items in array before loop: 5
Current value: 1        Array: 1 2 3 4 5
Current value: 2        Array: 1 2 3 4 5 4
Current value: 3        Array: 1 2 3 4 5 4 5
Current value: 4        Array: 1 2 3 4 5 4 5 6
Current value: 5        Array: 1 2 3 4 5 4 5 6 7
Array: 1 2 3 4 5 4 5 6 7 8
Number of items in array after loop: 10

I understand that this happens because the for loop exits before the sleep has finished, so only the first 5 items are ever submitted to the runspace pool.

Is there a way to add more items to the ArrayList dynamically and still process them concurrently using runspaces?

CodePudding user response:

The core of your "working" behaviour is that PowerShell was running your "non-sleep" scriptblocks faster than it could create them in the for loop, so the loop was seeing the new items being added by previous iterations before it reached the end of the array. As a result it had to process all of the items before it exited and moved on to the while loop.

When you added a Start-Sleep it shifted the balance, and it took longer to run the scriptblocks than it did to create them, so the for loop reached the end of the array before the new items were added by the earliest iterations.
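The effect can be reproduced without any threads at all. The following is a minimal single-threaded sketch, not part of the original script: the `$dispatched` list is a hypothetical stand-in for the scriptblocks that are still sleeping when the for loop checks `$array.Count`.

```powershell
$array = [System.Collections.ArrayList]@(1, 2, 3, 4, 5)
# Hypothetical stand-in for work handed to the pool but not yet run.
$dispatched = [System.Collections.Generic.List[int]]::new()

# Dispatch phase: nothing is added to $array yet, just as when every
# scriptblock is still sleeping, so the loop stops after 5 iterations.
for ($i = 0; $i -lt $array.Count; $i++) {
    $dispatched.Add($array[$i])
}

# The "workers" only finish now, after the loop has already exited.
foreach ($item in $dispatched) {
    if ($item -in 1..5) { [void]$array.Add($item + 3) }
}

"Dispatched: $($dispatched.Count), items in array: $($array.Count)"
```

This prints `Dispatched: 5, items in array: 10`, which matches the "Output with Sleep" in the question: five new items exist, but nothing ever dispatches them.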

The following script fixes this by combining your for and while loops to repeatedly alternate between (i) creating new threads and (ii) checking if they've finished, and only exiting when all the work is done.

However multi-threading is hard so it's best to assume I've made mistakes somewhere, and test properly before you release it to your live workflow...

$scriptblock = {
    Param ($i, $array)
    # random sleep to simulate variable-length workloads. this is
    # more likely to flush out error conditions than a fixed sleep 
    # period as threads will finish out-of-turn more often
    Start-Sleep (Get-Random -Minimum 1 -Maximum 10)
    Write-Output "$i value: $array"
    if ($i -in @(1, 2, 3, 4, 5)) {
        $array.Add($i + 3) | Out-Null
    }
}

$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()

# note - your "$runspaces" variable is misleading as you're creating 
# "PowerShell" objects, and a "Runspace" is a different thing entirely,
# so I've called it $instances instead
# see https://docs.microsoft.com/en-us/dotnet/api/system.management.automation.powershell?view=powershellsdk-7.0.0
#  vs https://docs.microsoft.com/en-us/dotnet/api/system.management.automation.runspaces.runspace?view=powershellsdk-7.0.0
$instances = @()

$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"

while( $true )
{

    # start PowerShell instances for any items in $array that don't already have one.
    # on the first iteration this will seed the initial instances, and in
    # subsequent iterations it will create new instances for items added to
    # $array since the last iteration.
    while( $instances.Length -lt $array.Count )
    {
        $instance = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$instances.Length]).AddArgument($array);
        $instance.RunspacePool = $pool
        $instances += [PSCustomObject]@{ Value = $instance; Status = $instance.BeginInvoke() }
    }

    # watch out because there's a race condition here. it'll need very unlucky 
    # timing, *but* an instance might have added an item to $array just after
    # the while loop finished, but before the next line runs, so there *could* 
    # be an item in $array that hasn't had an instance created for it even
    # if all the current instances have completed

    # is there any more work to do? (try to mitigate the race condition
    # by checking again for any items in $array that don't have an instance
    # created for them)
    $active = @( $instances | Where-Object { -not $_.Status.IsCompleted } )
    if( ($active.Length -eq 0) -and ($instances.Length -eq $array.Count) )
    {
        # instances have been created for every item in $array,
        # *and* they've run to completion, so there's no more work to do
        break;
    }

    # if there are incomplete instances, wait for a short time to let them run
    # (this is to avoid a "busy wait" - https://en.wikipedia.org/wiki/Busy_waiting)
    Start-Sleep -Milliseconds 250;

}

# all the instances have completed, so end them
foreach ($instance in $instances)
{
    $instance.Value.EndInvoke($instance.Status);
}

Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()

Example output:

Number of items in array before loop: 5
1 value: 1 2 3 4 5 6 5 7
2 value: 1 2 3 4 5 6
3 value: 1 2 3 4 5
4 value: 1 2 3 4 5 6 5
5 value: 1 2 3 4 5 6 5 7 4
6 value: 1 2 3 4 5 6 5 7
5 value: 1 2 3 4 5 6 5 7 4 8
7 value: 1 2 3 4 5 6 5 7
4 value: 1 2 3 4 5 6 5 7 4 8 8
8 value: 1 2 3 4 5 6 5 7 4 8 8
8 value: 1 2 3 4 5 6 5 7 4 8 8
7 value: 1 2 3 4 5 6 5 7 4 8 8 7

Note the order of items in the array will vary depending on the length of the random sleeps in the $scriptblock.

There are probably additional improvements that could be made, but this at least seems to work...
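One possible improvement, sketched here as an assumption rather than something the script above does, is to capture what EndInvoke returns, surface anything written to the error stream, and dispose each PowerShell instance once it has finished. The scriptblock and the value 21 are placeholders for illustration only.

```powershell
$pool = [RunspaceFactory]::CreateRunspacePool(1, 2)
$pool.Open()

# Placeholder workload: doubles its argument.
$ps = [PowerShell]::Create().AddScript({ param($x) $x * 2 }).AddArgument(21)
$ps.RunspacePool = $pool
$handle = $ps.BeginInvoke()

try {
    # EndInvoke blocks until the instance finishes and returns its output.
    $result = $ps.EndInvoke($handle)
    # Non-terminating errors land in the error stream, not in $result.
    if ($ps.HadErrors) {
        $ps.Streams.Error | ForEach-Object { Write-Warning "$_" }
    }
}
finally {
    # Release the instance whether or not EndInvoke threw.
    $ps.Dispose()
}

$pool.Close()
$pool.Dispose()
$result
```

In the loop-based script the same try/finally would wrap each EndInvoke call inside the final foreach.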

CodePudding user response:

This is a different approach from mclayton's helpful answer; hopefully both answers can help you solve your problem. This example uses a ConcurrentQueue<T> with multiple threads that all perform the same action. In this case we start only 5 threads, each of which keeps trying to dequeue items. After each item, a thread draws a random number between 0 and 10 and, if that number is greater than 5, enqueues it as new work. (This tries to simulate, badly, what you posted in the comments: "The actual problem involves Invoke-RestMethod (irm) towards multiple endpoints, based on the results of which, I may have to query more similar endpoints".)

Note that for this example I'm using $threads = $queue.Count, but that should not always be the case. Don't start too many threads or you might kill your session! Also be aware that you might overload your network if you query multiple endpoints at the same time. I would say, keep the number of threads no higher than $queue.Count.
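As a small sketch of that advice, the thread count can be clamped before the pool is created. `$maxThreads` and its value of 4 are hypothetical, and `$queueCount` stands in for `$queue.Count` at startup.

```powershell
# Hypothetical cap; tune it for your machine and the endpoints you query.
$maxThreads = 4
# Stand-in for $queue.Count before any worker has started.
$queueCount = 5

# Never more threads than queued items or than the cap, and at least one.
$threads = [Math]::Max(1, [Math]::Min($queueCount, $maxThreads))
"Starting $threads thread(s)"
```

With these values the result is 4, so the pool would be opened with at most 4 runspaces.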

The output of the following script will vary from run to run.

using namespace System.Management.Automation.Runspaces
using namespace System.Collections.Concurrent

try {
    $queue = [ConcurrentQueue[int]]::new()
    foreach($i in 1, 2, 3, 4, 5) {
        $queue.Enqueue($i)
    }
    $threads = $queue.Count

    $scriptblock = {
        [ref] $target = $null
        while($queue.TryDequeue($target)) {
            [pscustomobject]@{
                'Target Value'      = $target.Value
                'Elements in Queue' = $queue.Count
            }

            # get a random number between 0 and 10
            $ran = Get-Random -Maximum 11
            # if the number is above five, enqueue it
            if ($ran -gt 5) {
                $queue.Enqueue($ran)
            }
        }
    }

    $iss    = [initialsessionstate]::CreateDefault2()
    $rspool = [runspacefactory]::CreateRunspacePool(1, $threads, $iss, $Host)
    $rspool.InitialSessionState.Variables.Add([SessionStateVariableEntry]::new(
        'queue', $queue, ''
    ))
    $rspool.Open()

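    # use $threads rather than $queue.Count here: the queue shrinks as soon
    # as the first workers start dequeuing, which would cut the loop short
    $rs = for($i = 0; $i -lt $threads; $i++) {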
        $ps = [powershell]::Create().AddScript($scriptblock)
        $ps.RunspacePool = $rspool

        @{
            Instance    = $ps
            AsyncResult = $ps.BeginInvoke()
        }
    }

    foreach($r in $rs) {
        try {
            $r['Instance'].EndInvoke($r['AsyncResult'])
            $r['Instance'].Dispose()
        }
        catch {
            Write-Error $_
        }
    }
}
finally {
    $rspool.ForEach('Dispose')
}
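To see the dequeue-and-maybe-enqueue loop in isolation, here is a single-threaded sketch. It is an illustration under assumptions, not the code above: it swaps the random number for the deterministic rule from the question (add `$item + 3` for items 1 to 5), and it passes `[ref] $item` directly to TryDequeue.

```powershell
$queue = [System.Collections.Concurrent.ConcurrentQueue[int]]::new()
foreach ($i in 1, 2, 3, 4, 5) { $queue.Enqueue($i) }

$processed = [System.Collections.Generic.List[int]]::new()
$item = 0

# TryDequeue returns $false once the queue is empty, which ends the loop.
while ($queue.TryDequeue([ref] $item)) {
    $processed.Add($item)
    # Same rule as the question: items 1..5 generate follow-up work.
    if ($item -in 1..5) { $queue.Enqueue($item + 3) }
}

"Processed $($processed.Count) items: $processed"
```

This prints `Processed 12 items: 1 2 3 4 5 4 5 6 7 8 7 8`, the same 12 items as the sequential loop in the question. With several workers, one that finds the queue momentarily empty will exit early, but any item enqueued afterwards is still picked up by the worker that enqueued it, because that worker's loop is still running.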