Use ticker to periodically load all the files in memory from a path which keeps changing frequently?

I have an application that needs to read files from two different paths. After reading all of these files, I need to load them into an in-memory products map.

Path:

  • Full: This is the path that holds all the files we need to load into memory during server startup. It contains around 50 files, each ~60MB in size.
  • Delta: This is the path that holds the delta files we need to load into memory periodically, every 1 minute. These files only contain the differences from the full-path files. It contains around 60 files, each ~20MB in size.

The watchDeltaPath method below is called once during server startup to watch for delta changes. It gets the current delta path from the GetDeltaPath method, and from that path I need to load all the files into memory. This delta path changes every few minutes, and I cannot afford to miss a single delta path or any of the files in it.

Loading all the files from loadAllFiles can take some time (approx. 5 minutes), so I am trying to find a way to never miss a new delta path (as it changes every few minutes) while still being able to load all the files from each delta path into memory, periodically and efficiently.

I came up with the code below, which runs every 1 minute, looks for a new delta path each time, and then loads all the files from that path into memory. It works, but I don't think this is the right approach. What happens if loadAllFiles takes more than 10 minutes while my ticker keeps firing every 1 minute to look for a new delta path and load its files into memory? Will it keep spawning lots of background goroutines and maybe drive CPU usage way up?

type applicationRepository struct {
    client   customer.Client
    logger   log.Logger
    done     chan struct{}
    products *cmap.ConcurrentMap
}

// this will be called only once
func (r *applicationRepository) watchDeltaPath() error {
    ticker := time.NewTicker(1 * time.Minute)
    go func() {
        for {
            select {
            case <-r.done:
                ticker.Stop()
                return
            case <-ticker.C:
                func() (result error) {
                    trans := r.logger.StartTransaction(nil, "delta-changes", "")
                    defer trans.End()
                    defer func() {
                        if err := recover(); err != nil {
                            trans.Errorf("Recovered from panic: %v", err)
                        } else if result != nil {
                            trans.Errorf("Recovered from error: %v", result)
                        }
                    }()
                    // get the latest delta path every time, as it changes every few minutes
                    path, err := r.client.GetDeltaPath("delta")
                    if err != nil {
                        return err
                    }
                    // load all the files from that path into the "products" map
                    return r.loadAllFiles(path)
                }()
            }
        }
    }()
    return nil
}

func (r *applicationRepository) Stop() {
    r.done <- struct{}{}
}
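
One variation I considered: guard the load with an atomic flag so that a tick which fires while a previous load is still running is simply skipped instead of starting another load. This is only a sketch (the loading field and tryLoadDelta are made up for illustration; atomic.Bool needs "sync/atomic" and Go 1.19+):

// Sketch only: add a guard field to the struct, e.g.
//     loading atomic.Bool // true while a load is in progress
// and have the ticker loop call tryLoadDelta instead of loading inline.
func (r *applicationRepository) tryLoadDelta() {
    // If a previous load is still running, skip this tick entirely.
    if !r.loading.CompareAndSwap(false, true) {
        return
    }
    defer r.loading.Store(false)

    path, err := r.client.GetDeltaPath("delta")
    if err != nil {
        return
    }
    _ = r.loadAllFiles(path)
}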

What is the best way to do this efficiently in prod?

Here is my play with code on how it is being executed - https://go.dev/play/p/FS4-B0FWwTe

CodePudding user response:

As per the comments, the "best way to do this efficiently in prod" depends on a lot of factors and is probably not answerable on a site like Stack Overflow. Having said that, I can suggest an approach that might make it easier to think about how the problem could best be solved.

The below code (playground; pretty rough and untested) demonstrates an approach with three goroutines:

  1. Detects new delta paths and pushes them to a buffered channel
  2. Handles the initial load
  3. Waits for the initial load to finish, then applies deltas (deltas detected while the initial load is underway are buffered and applied once it completes)

As mentioned above, there is insufficient detail in the question to ascertain whether this is a good approach. It may be that the initial load and the deltas can run simultaneously without saturating the IO, but that would require testing (and would be a relatively small change; see the sketch after the code below).

// Simulation of process to perform initial load and handle deltas
package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

const deltaBuffer = 100
const initialLoadTime = 1500 * time.Millisecond // simulated duration of the initial load
const deltaCheckFrequency = 500 * time.Millisecond

func main() {
    ar := NewApplicationRepository()
    time.Sleep(5 * time.Second)
    ar.Stop()
    fmt.Println(time.Now(), "complete")
}

type applicationRepository struct {
    deltaChan       chan string   // Could be some other type...
    initialLoadDone chan struct{} // Closed when initial load finished

    done chan struct{}
    wg   sync.WaitGroup
}

func NewApplicationRepository() *applicationRepository {
    ar := applicationRepository{
        deltaChan:       make(chan string, deltaBuffer),
        initialLoadDone: make(chan struct{}),
        done:            make(chan struct{}),
    }

    ar.wg.Add(3)
    go ar.detectNewDeltas()
    go ar.initialLoad()
    go ar.deltaLoad()

    return &ar
}

// detectNewDeltas - watch for new delta paths
func (a *applicationRepository) detectNewDeltas() {
    defer a.wg.Done()
    var previousDelta string
    for {
        select {
        case <-time.After(deltaCheckFrequency):
            dp := a.getDeltaPath()
            if dp != previousDelta {
                select {
                case a.deltaChan <- dp:
                default:
                    panic("channel full - no idea what to do here!")
                }
                previousDelta = dp
            }
        case <-a.done:
            return
        }
    }
}

// getDeltaPath in real application this will retrieve the delta path
func (a *applicationRepository) getDeltaPath() string {
    return strconv.Itoa(time.Now().Second()) // For now just return the current second..
}

// initialLoad - load the initial data
func (a *applicationRepository) initialLoad() {
    defer a.wg.Done()
    defer close(a.initialLoadDone)
    time.Sleep(initialLoadTime) // Simulate time taken for initial load
}

// deltaLoad - load deltas found by detectNewDeltas
func (a *applicationRepository) deltaLoad() {
    defer a.wg.Done()
    fmt.Println(time.Now(), "deltaLoad started")

    // Wait for initial load to complete before doing anything
    <-a.initialLoadDone
    fmt.Println(time.Now(), "Initial Load Done")

    // Wait for incoming deltas and load them
    for {
        select {
        case newDelta := <-a.deltaChan:
            fmt.Println(time.Now(), newDelta)
        case <-a.done:
            return
        }
    }
}

// Stop - signal loader to stop and wait until this is done
func (a *applicationRepository) Stop() {
    close(a.done)
    a.wg.Wait()
}
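
For completeness, a rough sketch of that "relatively small change" mentioned above, in case testing shows the IO can absorb the initial load and deltas simultaneously: deltaLoad simply stops waiting on initialLoadDone. Whether the data structure being loaded can tolerate deltas arriving before the full load has landed is a separate question.

// deltaLoad variant (untested sketch): applies deltas as soon as they
// arrive, even while the initial load is still in progress.
func (a *applicationRepository) deltaLoadConcurrent() {
    defer a.wg.Done()
    for {
        select {
        case newDelta := <-a.deltaChan:
            fmt.Println(time.Now(), "applying", newDelta)
        case <-a.done:
            return
        }
    }
}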

CodePudding user response:

I think you want the Go concurrency pattern "fan-in, fan-out"; you can search for it on Google.

I created some example code below. You can copy-paste it, then create folders full and delta with dummy files inside them.

package main

import (
    "fmt"
    "io/fs"
    "log"
    "os"
    "sync"
    "time"
)

type MyFile struct {
    full         map[string][]byte
    delta        map[string][]byte
    stopAutoLoad chan struct{}
}

func main() {
    mf := NewMyFile()
    mf.StartAutoLoadDelta(1 * time.Minute)
    time.Sleep(50 * time.Minute)
    fmt.Println(len(mf.full))
    fmt.Println(len(mf.delta))
}

func NewMyFile() *MyFile {
    mf := &MyFile{
        full:         make(map[string][]byte),
        delta:        make(map[string][]byte),
        stopAutoLoad: make(chan struct{}),
    }

    mf.LoadFile("full", 0)
    mf.LoadFile("delta", 0)
    return mf
}

func (mf *MyFile) StartAutoLoadDelta(d time.Duration) {
    ticker := time.NewTicker(d)

    go func() {
        defer func() {
            ticker.Stop()
        }()

        i := 1
        for {
            select {
            case <-ticker.C:
                mf.LoadFile("delta", i)
                i++
            case <-mf.stopAutoLoad:
                return
            }
        }
    }()
}

type Fileinfo struct {
    name string
    data []byte
    err  error
}

func (mf *MyFile) LoadFile(prefix string, i int) {
    log.Printf("%s load : %d", prefix, i)
    // os.ReadDir replaces the deprecated ioutil.ReadDir (Go 1.16+)
    files, err := os.ReadDir(prefix)
    if err != nil {
        panic("failed to open " + prefix + " directory")
    }
    chanJobs := GenerateJobs(prefix, files)
    chanResultJobs := ReadFiles(chanJobs, 8)
    counterTotal := 0
    counterSuccess := 0
    for results := range chanResultJobs {
        if results.err != nil {
            log.Printf("error creating file %s. stack trace: %s", results.name, results.err)
        } else {
            switch prefix {
            case "delta":
                mf.delta[results.name] = results.data
            case "full":
                mf.full[results.name] = results.data
            default:
                panic("not implemented")
            }
            counterSuccess++
        }
        counterTotal++
    }

    log.Printf("status jobs running: %d/%d", counterSuccess, counterTotal)
}

func GenerateJobs(prefix string, files []fs.DirEntry) <-chan Fileinfo {
    chanOut := make(chan Fileinfo)

    go func() {
        for _, v := range files {
            chanOut <- Fileinfo{
                name: prefix + "/" + v.Name(),
            }
        }
        close(chanOut)
    }()

    return chanOut
}

func ReadFiles(chanIn <-chan Fileinfo, worker int) <-chan Fileinfo {
    chanOut := make(chan Fileinfo)

    var wg sync.WaitGroup

    wg.Add(worker)

    go func() {
        for i := 0; i < worker; i++ {
            go func(workerIndex int) {
                defer wg.Done()
                for job := range chanIn {
                    log.Printf("worker %d is reading file %s", workerIndex, job.name)
                    data, err := os.ReadFile(job.name)
                    chanOut <- Fileinfo{
                        name: job.name,
                        data: data,
                        err:  err,
                    }
                }
            }(i)
        }
    }()

    go func() {
        wg.Wait()
        close(chanOut)
    }()
    return chanOut
}
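
One gap worth pointing out: the example creates stopAutoLoad but never signals it, so the ticker goroutine only dies when the process exits. A minimal Stop method (my own addition, not in the original code) would let the caller shut it down cleanly:

// Stop signals the StartAutoLoadDelta goroutine to exit.
// Added for illustration; the original example relies on process exit.
func (mf *MyFile) Stop() {
    close(mf.stopAutoLoad)
}

main could then call mf.Stop() before printing the map sizes, instead of relying on the sleep ending the program.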