What is the best way to structure an advanced multi-threaded PowerShell function to work both on the pipeline and off it?


I have a function that flattens directories in parallel for multiple folders. It works great when I call it in a non-pipeline fashion:

$Files = Get-Content $FileList
Merge-FlattenDirectory -InputPath $Files

But now I want to update my function to work both on the pipeline as well as when called off the pipeline. Someone on Discord recommended that the best way to do this is to defer all processing to the end block, and to use the begin and process blocks to add pipeline input to a list. Basically this:

function Merge-FlattenDirectory {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory,Position = 0,ValueFromPipeline)]
        [string[]]
        $InputPath
    )

    begin {
        $List = [System.Collections.Generic.List[PSObject]]@()
    }

    process {
        if(($InputPath.GetType().BaseType.Name) -eq "Array"){
            Write-Host "Array detected"
            $List = $InputPath
        } else {
            $List.Add($InputPath)
        }
    }

    end {

        $List | ForEach-Object -Parallel {

            # Code here...
            
        } -ThrottleLimit 16
    }
}

However, this is still not working on the pipeline for me. When I do this:

$Files | Merge-FlattenDirectory

It actually passes individual one-element arrays to the function, one per pipeline item. So testing for ($InputPath.GetType().BaseType.Name) -eq "Array" isn't really the way forward: the Array branch is hit for every single item, the list gets overwritten on each process call, and only one pipeline value survives.
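A quick way to see this binding behavior is a throwaway probe (Test-Binding is just an illustrative name):

function Test-Binding {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory, Position = 0, ValueFromPipeline)]
        [string[]]
        $InputPath
    )
    process {
        # Each pipeline item is coerced to a [string[]] of length 1
        Write-Host $InputPath.GetType().Name ':' $InputPath.Length
    }
}
'a', 'b' | Test-Binding   # prints "String[] : 1" twice
Test-Binding 'a', 'b'     # prints "String[] : 2" once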

My million dollar question is the following:

What is the most robust way in the process block to differentiate between pipeline input and non-pipeline input? The function should add all pipeline input to a generic list; in the case of non-pipeline input, it should skip this step and process the collection as-is, moving directly to the end block.

The only thing I could think of is the following:

if((($InputPath.GetType().BaseType.Name) -eq "Array") -and ($InputPath.Length -gt 1)){
    $List = $InputPath
} else {
    $List.Add($InputPath)
}

But this just doesn't feel right. Any help would be extremely appreciated.

CodePudding user response:

You might just do

function Merge-FlattenDirectory {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory,Position = 0,ValueFromPipeline)]
        [string[]]
        $InputPath
    )
    begin {
        $List = [System.Collections.Generic.List[String]]::new()
    }
    process {
        $InputPath.ForEach{ $List.Add($_) }
    }
    end {
        $List | ForEach-Object -Parallel {
            # Code here...
        } -ThrottleLimit 16
    }
}

This will process the input values whether they come from the pipeline or are passed directly to the parameter.
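For example, both of these invocations end up with the same three items in $List (the paths are made up for illustration):

$Files = 'C:\Temp\A', 'C:\Temp\B', 'C:\Temp\C'

$Files | Merge-FlattenDirectory          # process runs once per item
Merge-FlattenDirectory -InputPath $Files # process runs once over the whole array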

But that doesn't comply with the Strongly Encouraged Development Guidelines to Support Well Defined Pipeline Input (SC02), especially the advice to Implement for the Middle of a Pipeline.

This means that if you want to implement the PowerShell pipeline correctly, you should process your items (in parallel) directly in the process block and immediately output any results from there:

function Merge-FlattenDirectory {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory,Position = 0,ValueFromPipeline)]
        [string[]]
        $InputPath
    )
    process {
        # NB: there is no built-in way to share one thread pool across
        # process-block invocations; each call below starts its own
        # parallel run (see the caveat further down).
        $InputPath | ForEach-Object -Parallel {
            # Process the current item ($_) here and output any results...
        } -ThrottleLimit 16
    }
}
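Written this way, the function streams: it can sit in the middle of a pipeline and emit results downstream as they become available, e.g. (the file list name is hypothetical):

Get-Content .\FileList.txt | Merge-FlattenDirectory | Sort-Object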

In general, script authors are advised to use idiomatic PowerShell, which often comes down to fewer object manipulations and usually results in a correct pipeline implementation with lower memory usage.

Please let me know if you intend to collect (and e.g. order) the output based on this suggestion.

Caveat

Invoking the full ForEach-Object -Parallel cmdlet from the process block is somewhat inefficient, as you open and close a new pipeline for each input item. Resolving this undercuts my general point about idiomatic PowerShell a bit, but it can be done with a steppable pipeline.

To implement this, you might use the ForEach-Object cmdlet as a template:

[System.Management.Automation.ProxyCommand]::Create([System.Management.Automation.CommandMetadata]::new((Get-Command ForEach-Object)))
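Create returns the proxy source as one big string, so it is easiest to write it to a file and use that as a starting point (the file name is arbitrary):

$meta = [System.Management.Automation.CommandMetadata]::new((Get-Command ForEach-Object))
[System.Management.Automation.ProxyCommand]::Create($meta) | Set-Content .\ForEachObjectProxy.ps1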

Then set the ThrottleLimit of the thread pool in the begin block:

function Merge-FlattenDirectory {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory,Position = 0,ValueFromPipeline)]
        [string[]]
        $InputPath
    )
    begin {
        # Parameters to splat onto the wrapped ForEach-Object call; using a
        # dedicated variable instead of overwriting the automatic
        # $PSBoundParameters makes the intent clearer.
        $ForEachParams = @{
            ThrottleLimit = 4
            Parallel = {
                Write-Host (Get-Date).ToString('HH:mm:ss.s') 'Started' $_
                Start-Sleep -Seconds 3
                Write-Host (Get-Date).ToString('HH:mm:ss.s') 'finished' $_
            }
        }
        $wrappedCmd = $ExecutionContext.InvokeCommand.GetCommand('ForEach-Object', [System.Management.Automation.CommandTypes]::Cmdlet)
        $scriptCmd = {& $wrappedCmd @ForEachParams }
        $steppablePipeline = $scriptCmd.GetSteppablePipeline($myInvocation.CommandOrigin)
        $steppablePipeline.Begin($PSCmdlet)
    }
    process {
        $InputPath.ForEach{ $steppablePipeline.Process($_) }
    }
    end {
        $steppablePipeline.End()
    }
}
1..5 | Merge-FlattenDirectory
17:57:40.40 Started 3
17:57:40.40 Started 2
17:57:40.40 Started 1
17:57:40.40 Started 4
17:57:43.43 finished 3
17:57:43.43 finished 1
17:57:43.43 finished 4
17:57:43.43 finished 2
17:57:43.43 Started 5
17:57:46.46 finished 5
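Note how only four items start at once and the fifth waits for a free thread: with ThrottleLimit = 4, item 5 only begins after one of the first four finishes.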

CodePudding user response:

Here's how I would write it with comments where I have changed it.

function Merge-FlattenDirectory {
    [CmdletBinding()]
    param (
        [Parameter(Mandatory,Position = 0,ValueFromPipeline)]
         $InputPath    # this may be a string, a path object, a file object,
                       # or an array
    )

    begin {
        $List = @()   # Use an array for fewer than 100K objects.
    }

    process {
        # Even if InputPath is a single string, foreach will iterate once and set $p.
        # If it is an array of strings, add each one. If it is one or more objects,
        # try to find the right property for the path.
        foreach ($p in $InputPath) {
            if     ($p -is [String]) {$List += $p}
            elseif ($p.Path)         {$List += $p.Path}
            elseif ($p.FullName)     {$List += $p.FullName}
            elseif ($p.PSPath)       {$List += $p.PSPath}
            else                     {Write-Warning "$p makes no sense"}
        }
    }
    
    end {

        $List | ForEach-Object -Parallel {

            # Code here...
            
        } -ThrottleLimit 16
    }
}
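Because $InputPath is left untyped, calls like the following should all work (the paths are made up for illustration):

# Strings read from a file list
Merge-FlattenDirectory -InputPath (Get-Content .\FileList.txt)

# FileInfo/DirectoryInfo objects, picked up via their .FullName property
Get-ChildItem C:\Temp -Directory | Merge-FlattenDirectory

# PathInfo objects from Resolve-Path, picked up via their .Path property
Resolve-Path C:\Temp\* | Merge-FlattenDirectory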

@iRon: that "write for the middle of the pipeline" in the docs does not mean write everything in the process block.

function one { @(1,2,3,4,5) }

function two {
    param   ([parameter(ValueFromPipeline=$true)] $p  )
    begin   {Write-Host "Two begins"      ; $a  = @() }
    process {Write-Host "Two received $p" ; $a += $p  }
    end     {Write-Host "Two ending"      ; $a; Write-Host "Two ended"}
}

function three {
    param   ([parameter(ValueFromPipeline=$true)] $p )
    begin   {Write-host "three Starts"    }
    process {Write-host "Three received $P" }
    end     {Write-host "Three ended"    }       
}
one | two | three

The body of one is treated as an end block. One, two, and three all run their begin blocks (one's is empty). One's output goes to the process block of two, which just collects the data. Two's end block starts after one's end block finishes, and sends output; at this point three's process block gets input. After two's end block ends, three's end block runs.

Two is "in the middle" it has a process block to deal with multiple piped items (if it were all one 'end' block it would only process the last one).
