Home > Blockchain >  How to parallelize a merge sort over a span<T>
How to parallelize a merge sort over a span<T>

Time:08-17

I've been getting an a better understanding of Span<T> by using it to implement a merge sort algorithm.

My working code is here, and you can run it here (without the unsafe bits.)

using System;
using System.Collections.Generic;
                    
public class Program
{
    public static void Main()
    {
        Console.WriteLine(Sort.Merge("bonakidbonakidbonakid"));
    }
}

public static class Sort
{
    public unsafe static string Merge(string input, IComparer<char> comparer = null)
    {
        comparer ??= Comparer<char>.Default;
        // buffer to contain the orginal data.
        Span<char> source = stackalloc char[input.Length];
        input.AsSpan().CopyTo(source);
        // other buffer, that starts as the output.
        Span<char> buffer = stackalloc char[source.Length];
        
        // copy the stack allocated buffer out as the result;
        return MergeLadder(
            source,
            buffer,
            comparer).ToString();
    }
    
    public unsafe static T[] Merge<T>(T[] input, IComparer<T> comparer = null)
            where T : unmanaged
    {
        comparer ??= Comparer<T>.Default;
        
        // buffer to contain the orginal data.
        Span<T> source = stackalloc T[input.Length];
        // intentionally copying input so it is not mutated.
        input.AsSpan().CopyTo(source);
        // other buffer, that starts as the output.
        Span<T> buffer = stackalloc T[source.Length];
        
        // copy the stack allocated buffer out as the result;
        return MergeLadder(
            source,
            buffer,
            comparer).ToArray();
    }
    
    /// <remarks>
    /// Bottom up merge with alternatring buffers.
    /// </remarks>
    private static ReadOnlySpan<T> MergeLadder<T>(
        Span<T> input,
        Span<T> output,
        IComparer<T> comparer,
        int width = 2)
    {
        var half = width / 2;
        
        // walk the input sequence in mergable strides
        // combine the left and right halves into the output
        for (int i = 0; i < input.Length; i  = width)
        {
            Span<T> left;
            Span<T> right;
            Span<T> merge;
            
            var remaining = input.Length - i;
            if (remaining <= half)
            {
                // not enough left for a right.
                left = input.Slice(i, remaining);
                right = Span<T>.Empty;
                merge = output.Slice(i, remaining);
            }
            else if (remaining < width)
            {
                // not enought for a whole right.
                left = input.Slice(i, half);
                right = input.Slice(i   half, remaining - half);
                merge = output.Slice(i, remaining);
            }
            else
            {
                // the full stride.
                left = input.Slice(i, half);
                right = input.Slice(i   half, half);
                merge = output.Slice(i, width);
            }
            
            // Now merge the left and right for this stride.
            Merge(left, right, merge, comparer);
        }
        
        // Did the last stride cover the whole input?
        if (width >= input.Length)
        {
            // Yes, everything is sorted
            return output;
        }
        else
        {
            // No, walk the array again with double the width.
            // Switch the buffers so we don't walk over the results.
            return MergeLadder(output, input, comparer, width * 2);
        }
    }
    
    private static void Merge<T>(
        ReadOnlySpan<T> left,
        ReadOnlySpan<T> right,
        Span<T> merge,
        IComparer<T> comparer)
    {
        //While either span has an element
        for(int m = 0, l = 0, r= 0; l < left.Length || r < right.Length; m  )
        {
            if (l < left.Length && r < right.Length)
            {
                //both sides have elements
                if (comparer.Compare(left[l], right[r]) <= 0)
                {
                    // left is less than right
                    merge[m] = left[l];
                    l  ;
                }
                else
                {
                    // right is less than left
                    merge[m] = right[r];
                    r  ;
                }
            }
            else if (l < left.Length)
            {
                // only left has some left
                merge[m] = left[l];
                l  ;
            }
            else
            {
                // only right has some left
                merge[m] = right[r];
                r  ;
            }
        }
    }
}

I have an idea that I could change the code that merges sections of the data,

        // walk the input sequence in mergable strides
        // combine the left and right halves into the output
        for (int i = 0; i < input.Length; i  = width)
        {
            Span<T> left;
            Span<T> right;
            Span<T> merge;

            var remaining = input.Length - i;
            if (remaining <= half)
            {
                // not enough left for a right.
                left = input.Slice(i, remaining);
                right = Span<T>.Empty;
                merge = output.Slice(i, remaining);
            }
            else if (remaining < width)
            {
                // not enought for a whole right.
                left = input.Slice(i, half);
                right = input.Slice(i   half, remaining - half);
                merge = output.Slice(i, remaining);
            }
            else
            {
                // the full stride.
                left = input.Slice(i, half);
                right = input.Slice(i   half, half);
                merge = output.Slice(i, width);
            }

            // Now merge the left and right for this stride.
            Merge(left, right, merge, comparer);
        }

to operate in parallel. Potentially, this could offer better performance but, I can't work out a good way to do that with the ref struct limitations applying.

I've an idea that appropriate use of Memory<T> is required but given the use of indexers in the inner Merge function I don't understand how that can be achieved.

If I can get a parallel version working I can benchmark the two. Any Ideas/suggestions/rewrites?

CodePudding user response:

You should probably avoid using stackalloc, since this will fail if size of your input is to large. Especially since you convert it to a regular array in the end anyway, so you should just allocate memory on the heap to start with. Or possibly, use a pool of memory blocks you can reuse. And if your input is small enough to fit inside the stack, you probably do not benefit from any parallelization.

If you think about it, using stack memory for a parallel operation makes little sense, since the worker threads would need to reference stack memory belonging to another thread, and I do not think there is any way to make that both fast and memory-safe.

Once you have actual heap memory your problem mostly disappear. Your MergeLadder can take regular arrays as input parameters, and you can convert your arrays to spans for the Merge method. That should allow your main loop to be converted to a Parallel.For.

CodePudding user response:

Here is an asynchronous version based on the advice of @JonasH.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
                    
public class Program
{
    public static void Main()
    {
        Console.WriteLine(Sort.Merge("bonakidbonakidbonakid"));
    }
}

public static class Sort
{
    public async static ValueTask<string> Merge(
            string input,
            IComparer<char> comparer = null)
    {
        comparer ??= Comparer<char>.Default;
        
        // buffer to contain the orginal data.
        Memory<char> source = new char[input.Length];
        input.AsMemory().CopyTo(source);
        // other buffer, that starts as the output.
        Memory<char> buffer = new char[source.Length];
        
        // copy the stack allocated buffer out as the result;
        return (await MergeLadder(
            source,
            buffer,
            comparer)).ToString();
    }
    
    public async static ValueTask<T[]> Merge<T>(
            T[] input,
            IComparer<T> comparer = null)
    {
        comparer ??= Comparer<T>.Default;
        
        // buffer to contain the orginal data.
        Memory<T> source = new T[input.Length];
        // intentionally copying input so it is not mutated.
        input.AsMemory().CopyTo(source);
        // other buffer, that starts as the output.
        Memory<T> buffer = new T[source.Length];
        
        // copy the stack allocated buffer out as the result;
        return (await MergeLadder(
            source,
            buffer,
            comparer)).ToArray();
    }
    
    /// <remarks>
    /// Bottom up merge with alternatring buffers.
    /// </remarks>
    private async static ValueTask<ReadOnlyMemory<T>> MergeLadder<T>(
        Memory<T> input,
        Memory<T> output,
        IComparer<T> comparer,
        int width = 2)
    {
        var half = width / 2;
        
        await WhenAll(GetMergeTasks(
            input,
            output,
            comparer,
            width,
            half));
        
        // Did the last stride cover the whole input?
        if (width >= input.Length)
        {
            // Yes, everything is sorted
            return output;
        }
        else
        {
            // No, walk the array again with double the width.
            // Switch the buffers so we don't walk over the results.
            return await MergeLadder(output, input, comparer, width * 2);
        }
    }
    
    private async static IAsyncEnumerable<Task> GetMergeTasks<T>(
        Memory<T> input,
        Memory<T> output,
        IComparer<T> comparer,
        int width,
        int half)
    {
        // walk the input sequence in mergable strides
        // combine the left and right halves into the output
        for (int i = 0; i < input.Length; i  = width)
        {
            Memory<T> left;
            Memory<T> right;
            Memory<T> merge;
            
            var remaining = input.Length - i;
            if (remaining <= half)
            {
                // not enough left for a right.
                left = input.Slice(i, remaining);
                right = Memory<T>.Empty;
                merge = output.Slice(i, remaining);
            }
            else if (remaining < width)
            {
                // not enought for a whole right.
                left = input.Slice(i, half);
                right = input.Slice(i   half, remaining - half);
                merge = output.Slice(i, remaining);
            }
            else
            {
                // the full stride.
                left = input.Slice(i, half);
                right = input.Slice(i   half, half);
                merge = output.Slice(i, width);
            }
            
            // Now merge the left and right for this stride.
            yield return Task.Run(() => Merge(left, right, merge, comparer));
            await Task.CompletedTask;
        }
    }
    
    public static async ValueTask WhenAll(IAsyncEnumerable<Task> tasks)
    {
        ArgumentNullException.ThrowIfNull(tasks);

        // We don't allocate the list if no task throws
        List<Exception> exceptions = null;

        await foreach(var task in tasks)
        {
            try
            {
                await task;
            }
            catch(Exception ex)
            {
                exceptions ??= new();
                exceptions.Add(ex);
            }
        }

        
        if (!(exceptions is null))
        {
            throw new AggregateException(exceptions);
        }
   }
            
    private static void Merge<T>(
        ReadOnlyMemory<T> leftMemory,
        ReadOnlyMemory<T> rightMemory,
        Memory<T> mergeMemory,
        IComparer<T> comparer)
    {
        var left = leftMemory.Span;
        var right = rightMemory.Span;
        var merge = mergeMemory.Span;
        
        //While either span has an element
        for(int m = 0, l = 0, r= 0; l < left.Length || r < right.Length; m  )
        {
            if (l < left.Length && r < right.Length)
            {
                //both sides have elements
                if (comparer.Compare(left[l], right[r]) <= 0)
                {
                    // left is less than right
                    merge[m] = left[l];
                    l  ;
                }
                else
                {
                    // right is less than left
                    merge[m] = right[r];
                    r  ;
                }
            }
            else if (l < left.Length)
            {
                // only left has some left
                merge[m] = left[l];
                l  ;
            }
            else
            {
                // only right has some left
                merge[m] = right[r];
                r  ;
            }
        }
    }
}

Now I wonder, how big would the input need to be for the parallel version to win?

  • Related