How to implement Go concurrent map or slice for managing in-use resources faster?


Imagine you have a struct that represents a resource that only one user can access at a time. It might look something like this:

type Resource struct {
    InUse    bool   // or int32/int64 if you want to use atomics
    Resource string // parameters that map to the resource, think `/dev/chardeviceXXX`
}

There are a finite number of these resources, and users will request access to them randomly and concurrently, so you package them in a manager:

type ResourceManager struct {
    Resources []*Resource // or a map
}

I am trying to figure out the optimal, safe way for the manager to provide a function func (m *ResourceManager) GetUnusedResource() (*Resource, error) that will:

  • Iterate through all the resources until one that is not InUse is found
  • Mark it as InUse and return the *Resource to the calling context/goroutine
  • I'd like to avoid any system-level locking (flock) and do this all in Go
  • There also needs to be a function to mark a Resource as no longer InUse

Right now I use a mutex in the manager to lock access while I iterate through the entire slice. It is safe, but I am hoping to speed this up by searching for an unused resource concurrently, while still handling two goroutines trying to mark the same resource as InUse.
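Sketched out, the current approach looks something like this (assuming a mu sync.Mutex field added to the manager; the Release helper is shown for completeness):

func (m *ResourceManager) GetUnusedResource() (*Resource, error) {
    m.mu.Lock() // one mutex guards the whole scan
    defer m.mu.Unlock()
    for _, r := range m.Resources {
        if !r.InUse {
            r.InUse = true // safe: the lock is held
            return r, nil
        }
    }
    return nil, errors.New("all resources in use")
}

// Release marks a resource as free again under the same lock.
func (m *ResourceManager) Release(r *Resource) {
    m.mu.Lock()
    r.InUse = false
    m.mu.Unlock()
}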

Update

I am specifically wondering if making the Resource's InUse field an int64 and then using atomic.CompareAndSwapInt64 would allow the resource manager to claim a resource the moment an unused one is found:

func (m *ResourceManager) GetUnusedResource() (*Resource, error) {
    for i := range m.Resources {
        // Atomically flip InUse from 0 (free) to 1 (in use);
        // only one goroutine can win the swap.
        if atomic.CompareAndSwapInt64(&m.Resources[i].InUse, 0, 1) {
            return m.Resources[i], nil
        }
    }
    return nil, errors.New("all resources in use")
}
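The release side would then be a plain atomic store (a sketch, assuming the same int64 InUse field and that only the current holder releases):

// ReleaseResource marks a resource as free again.
func (m *ResourceManager) ReleaseResource(r *Resource) {
    atomic.StoreInt64(&r.InUse, 0)
}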

Suggestions for unit tests that exercise this would also be appreciated.

CodePudding user response:

The GetUnusedResource function in the question can potentially execute a compare-and-swap operation for every resource. Depending on the number of resources and the application's access pattern, it can be quicker to execute a small number of operations protected by a mutex.

Use a singly linked list to implement fast get and put operations.

type Resource struct {
    next     *Resource // next resource in the free list
    Resource string
}

type ResourceManager struct {
    free *Resource // head of the free list
    mu   sync.Mutex
}

// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
    m.mu.Lock()
    defer m.mu.Unlock()
    result := m.free
    if m.free != nil {
        m.free = m.free.next
    }
    return result
}

// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
    m.mu.Lock()
    defer m.mu.Unlock()
    r.next = m.free
    m.free = r
}

Here's an example use in a test:

func TestResourceManager(t *testing.T) {

    // Add free resources to a manager.
    var m ResourceManager
    m.Put(&Resource{Resource: "/dev/a"})
    m.Put(&Resource{Resource: "/dev/b"})

    // Test that we can get all resources from the pool.

    ra := m.Get()
    rb := m.Get()
    if ra.Resource > rb.Resource {
        // Sort ra, rb to make test independent of order.
        ra, rb = rb, ra
    }
    if ra == nil || ra.Resource != "/dev/a" {
        t.Errorf("ra is %v, want /dev/a", ra)
    }
    if rb == nil || rb.Resource != "/dev/b" {
        t.Errorf("rb is %v, want /dev/b", rb)
    }

    // Check for empty pool.

    r := m.Get()
    if r != nil {
        t.Errorf("r is %v, want nil", r)
    }

    // Return one resource and try again.

    m.Put(ra)
    ra = m.Get()
    if ra == nil || ra.Resource != "/dev/a" {
        t.Errorf("ra is %v, want /dev/a", ra)
    }
    r = m.Get()
    if r != nil {
        t.Errorf("r is %v, want nil", r)
    }

}
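The question also asked for unit tests; a concurrent smoke test run with go test -race is a useful addition. A sketch (the goroutine and iteration counts are arbitrary):

// TestResourceManagerConcurrent hammers Get and Put from several
// goroutines; run it with `go test -race` to catch data races.
func TestResourceManagerConcurrent(t *testing.T) {
    var m ResourceManager
    for i := 0; i < 4; i++ {
        m.Put(&Resource{Resource: fmt.Sprintf("/dev/%d", i)})
    }

    var wg sync.WaitGroup
    for g := 0; g < 8; g++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                if r := m.Get(); r != nil {
                    m.Put(r) // return it so other goroutines can get it
                }
            }
        }()
    }
    wg.Wait()

    // Every Get was matched by a Put, so all four resources
    // must be back in the pool.
    for i := 0; i < 4; i++ {
        if m.Get() == nil {
            t.Fatalf("resource %d missing from pool", i)
        }
    }
    if m.Get() != nil {
        t.Fatal("pool has more resources than were put in")
    }
}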


Use a channel if there's a known reasonable bound on the number of resources. This approach takes advantage of the runtime's highly optimized channel implementation.

type Resource struct {
    Resource string
}

type ResourceManager struct {
    free chan *Resource
}

// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
    select {
    case r := <-m.free:
        return r
    default:
        return nil
    }
}

// Put returns a resource to the pool. It blocks if more than
// n resources are put back, where n is the manager's capacity.
func (m *ResourceManager) Put(r *Resource) {
    m.free <- r
}

// NewResourceManager returns a manager that can hold up to
// n free resources.
func NewResourceManager(n int) *ResourceManager {
    return &ResourceManager{free: make(chan *Resource, n)}
}

Test this implementation using the TestResourceManager function above, but replace var m ResourceManager with m := NewResourceManager(4).
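That is (TestChannelResourceManager is my name for the adapted test; a capacity of 4 is arbitrary, anything at least the number of resources works):

func TestChannelResourceManager(t *testing.T) {
    m := NewResourceManager(4)
    m.Put(&Resource{Resource: "/dev/a"})
    m.Put(&Resource{Resource: "/dev/b"})
    // ... remainder identical to TestResourceManager above ...
}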


CodePudding user response:

Whether or not a given resource is in use is not a property of the Resource itself, but of the ResourceManager.

In fact, there is no need to keep track of in-use resources (unless you need to for some reason not mentioned in the question). An in-use resource can be simply put back into the pool when it is released.

Here's a possible implementation using channels; no mutex and no atomic CAS needed.

package main

import (
    fmt "fmt"
    "time"
)

type Resource struct {
    Data string
}

type ResourceManager struct {
    resources []*Resource
    closeCh   chan struct{}
    acquireCh chan *Resource
    releaseCh chan *Resource
}

func NewResourceManager() *ResourceManager {
    r := &ResourceManager{
        closeCh:   make(chan struct{}),
        acquireCh: make(chan *Resource),
        releaseCh: make(chan *Resource),
    }
    go r.run()
    return r
}

func (r *ResourceManager) run() {
    defer close(r.acquireCh)
    for {
        if len(r.resources) > 0 {
            // Resources are available: offer the last one to
            // acquirers while still accepting releases.
            select {
            case r.acquireCh <- r.resources[len(r.resources)-1]:
                r.resources = r.resources[:len(r.resources)-1]
            case res := <-r.releaseCh:
                r.resources = append(r.resources, res)
            case <-r.closeCh:
                return
            }
        } else {
            // Pool is empty: acquirers block until a release arrives.
            select {
            case res := <-r.releaseCh:
                r.resources = append(r.resources, res)
            case <-r.closeCh:
                return
            }
        }
    }
}

func (r *ResourceManager) AcquireResource() *Resource {
    return <-r.acquireCh
}

func (r *ResourceManager) ReleaseResource(res *Resource) {
    r.releaseCh <- res
}

func (r *ResourceManager) Close() {
    close(r.closeCh)
}

// small demo below ...

func test(id int, r *ResourceManager) {
    for {
        res := r.AcquireResource()
        fmt.Printf("test %d: %s\n", id, res.Data)
        time.Sleep(time.Millisecond)
        r.ReleaseResource(res)
    }
}

func main() {
    r := NewResourceManager()
    r.ReleaseResource(&Resource{"Resource A"}) // initial setup
    r.ReleaseResource(&Resource{"Resource B"}) // initial setup
    go test(1, r)
    go test(2, r)
    go test(3, r) // 3 consumers, but only 2 resources ...
    time.Sleep(time.Second)
    r.Close()
}
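Two semantic details worth pinning down with a test: AcquireResource blocks until some resource is released, and after Close it returns nil, because the closed acquireCh yields the zero value. A minimal sketch (the test name and the 10ms grace period are my own choices):

func TestAcquireBlocksUntilRelease(t *testing.T) {
    r := NewResourceManager()
    defer r.Close()

    done := make(chan *Resource)
    go func() { done <- r.AcquireResource() }()

    // Nothing has been released yet, so the acquirer must still block.
    select {
    case <-done:
        t.Fatal("AcquireResource returned before any release")
    case <-time.After(10 * time.Millisecond):
    }

    r.ReleaseResource(&Resource{Data: "Resource A"})
    if res := <-done; res == nil || res.Data != "Resource A" {
        t.Errorf("got %v, want Resource A", res)
    }
}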