I am working on a project where, during startup, I need to read certain files and store their contents in an in-memory map. After that I periodically check for new files and, if there are any, replace what I loaded at startup with the new data. Each new file is a full state, so I want to refresh the in-memory map with it instead of appending to it.
The method loadAtStartupAndProcessNewChanges below is called during server startup; it reads the files and stores the data in memory. It also starts a goroutine, detectNewFiles, which periodically checks for new files and puts them on the deltaChan channel. Another goroutine, processNewFiles, later reads from that channel, reads the new file, and stores its data in the same map. Any error is put on the err channel. loadFiles is the function that reads the files and stores their contents in the map.
type customerConfig struct {
deltaChan chan string
err chan error
wg sync.WaitGroup
data *cmap.ConcurrentMap
}
// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
path, err := r.GetPath("...", "....")
if err != nil {
return err
}
r.wg.Add(1)
go r.detectNewFiles(path)
err = r.loadFiles(4, path)
if err != nil {
return err
}
r.wg.Add(1)
go r.processNewFiles()
return nil
}
This method figures out whether there are any new files that need to be consumed. If there are, it puts them on the deltaChan channel, from which the processNewFiles goroutine later consumes them and reads the files into memory. If there is an error, it adds the error to the error channel.
func (r *customerConfig) detectNewFiles(rootPath string) {
}
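The body of detectNewFiles is not shown. As a rough sketch only, a ticker-based poller along these lines would match the description; the detector type, the seen set, the listFiles callback, and the stop channel are all hypothetical names for illustration and are not part of the actual code.

```go
package main

import (
	"fmt"
	"time"
)

// detector is a hypothetical stand-in for customerConfig.
type detector struct {
	deltaChan chan string
	seen      map[string]bool // paths that were already reported
}

// findNew returns the paths in listing that were not reported before
// and marks them as seen.
func (d *detector) findNew(listing []string) []string {
	var fresh []string
	for _, p := range listing {
		if !d.seen[p] {
			d.seen[p] = true
			fresh = append(fresh, p)
		}
	}
	return fresh
}

// detectNewFiles calls listFiles (hypothetical) on every tick and pushes
// unseen paths onto deltaChan until stop is closed.
func (d *detector) detectNewFiles(interval time.Duration, listFiles func() ([]string, error), stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			listing, err := listFiles()
			if err != nil {
				continue // the real method would put this on the err channel
			}
			for _, p := range d.findNew(listing) {
				select {
				case d.deltaChan <- p:
				case <-stop:
					return
				}
			}
		}
	}
}

func main() {
	d := &detector{deltaChan: make(chan string, 4), seen: map[string]bool{}}
	stop := make(chan struct{})
	go d.detectNewFiles(10*time.Millisecond, func() ([]string, error) {
		return []string{"a.parquet", "b.parquet"}, nil
	}, stop)
	fmt.Println(<-d.deltaChan, <-d.deltaChan)
	close(stop)
}
```

Keeping the "what is new" decision in findNew, separate from the ticker loop, makes that part easy to test without waiting on timers.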
This reads all the S3 files, stores them in memory, and returns an error if anything fails. In this method I clear the previous state of my map so that it holds only the fresh state from the new files. It is called during server startup and also from the processNewFiles goroutine whenever new files need to be processed.
func (r *customerConfig) loadFiles(workers int, path string) error {
var err error
...
var files []string
files = .....
// reset the map so that it can have fresh state from new files.
r.data.Clear()
g, ctx := errgroup.WithContext(context.Background())
sem := make(chan struct{}, workers)
loop:
for _, file := range files {
select {
case <-ctx.Done():
// a plain break would only leave the select, not the for loop
break loop
case sem <- struct{}{}:
}
file := file
g.Go(func() error {
defer func() { <-sem }()
return r.read(spn, file, bucket)
})
}
if err := g.Wait(); err != nil {
return err
}
return nil
}
This method reads a file and adds its contents to the data concurrent map.
func (r *customerConfig) read(file string, bucket string) error {
// read file and store it in "data" concurrent map
// and if there is any error then return the error
var err error
fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
if err != nil {
return errs.Wrap(err)
}
defer xio.CloseIgnoringErrors(fr)
pr, err := reader.NewParquetReader(fr, nil, 8)
if err != nil {
return errs.Wrap(err)
}
if pr.GetNumRows() == 0 {
spn.Infof("Skipping %s due to 0 rows", file)
return nil
}
for {
rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
if err != nil {
return errs.Wrap(err)
}
if len(rows) <= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return errs.Wrap(err)
}
var invMods []CompModel
err = json.Unmarshal(byteSlice, &invMods)
if err != nil {
return errs.Wrap(err)
}
for i := range invMods {
key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
hasInventory := false
if invMods[i].Available > 0 {
hasInventory = true
}
r.data.Set(key, hasInventory)
}
}
return nil
}
This method picks up whatever is on the delta channel; if there are new files, it reads them by calling the loadFiles method. If there is an error, it adds the error to the error channel.
// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
// find new files on delta channel
// and call "loadFiles" method to read it
// if there is any error, then it will add it to the error channel.
}
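The body of processNewFiles is only described in comments. A minimal sketch of that consume-and-report loop could look like the following; the processor type, its load callback (standing in for loadFiles), and errParse are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errParse is a hypothetical error used by the example loader below.
var errParse = errors.New("cannot parse")

// processor is a hypothetical stand-in for customerConfig.
type processor struct {
	deltaChan chan string
	err       chan error
	load      func(path string) error // stands in for loadFiles
}

// processNewFiles drains deltaChan, reloads for each path, and forwards
// failures to the err channel. It returns once deltaChan is closed and
// then closes err so that the error handler can stop as well.
func (p *processor) processNewFiles() {
	for path := range p.deltaChan {
		if err := p.load(path); err != nil {
			p.err <- fmt.Errorf("loading %s: %w", path, err)
		}
	}
	close(p.err)
}

func main() {
	p := &processor{
		deltaChan: make(chan string, 2),
		err:       make(chan error, 2),
		load: func(path string) error {
			if path == "bad" {
				return errParse
			}
			return nil
		},
	}
	p.deltaChan <- "good"
	p.deltaChan <- "bad"
	close(p.deltaChan)
	p.processNewFiles()
	// This loop plays the role of handleError: log whatever was reported.
	for err := range p.err {
		fmt.Println("error:", err)
	}
}
```

The err channel is buffered here so that the sketch can run in a single goroutine; with an unbuffered channel the handler would have to run concurrently.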
Any error on the error channel is logged by the method below:
func (r *customerConfig) handleError() {
// read error from error channel if there is any
// then log it
}
Problem Statement
The logic above works, but there is one bug that I cannot figure out how to solve. As you can see, I populate a concurrent map in my read method and clear that whole map in the loadFiles method. Whenever there is a new file on the delta channel I don't want to keep the previous state, so I remove everything from the map and then add the new state from the new files.
If there is an error in the read method, I have already cleared the data map, so I end up with an empty map, which is not what I want. If there is any error, I would like to preserve the previous state in the data map. How can I resolve this issue in my current design?
Note: I am using the golang concurrent map.
CodePudding user response:
Instead of updating the map in the read method, let read just return the data and an error:
func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
	// read the file and return its rows, or an error if anything fails
	fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
	if err != nil {
		return nil, errs.Wrap(err)
	}
	defer xio.CloseIgnoringErrors(fr)
	pr, err := reader.NewParquetReader(fr, nil, 8)
	if err != nil {
		return nil, errs.Wrap(err)
	}
	if pr.GetNumRows() == 0 {
		spn.Infof("Skipping %s due to 0 rows", file)
		return nil, errors.New("No Data")
	}
	// collect every batch; returning inside the loop would drop all
	// batches after the first one
	var all []CompModel
	for {
		rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
		if err != nil {
			return nil, errs.Wrap(err)
		}
		if len(rows) <= 0 {
			break
		}
		byteSlice, err := json.Marshal(rows)
		if err != nil {
			return nil, errs.Wrap(err)
		}
		var invMods []CompModel
		err = json.Unmarshal(byteSlice, &invMods)
		if err != nil {
			return nil, errs.Wrap(err)
		}
		all = append(all, invMods...)
	}
	return all, nil
}
loadFiles can then use the data and error returned by read: only if there is no error does it clear and update the map; otherwise it leaves the old data as it was.
func (r *customerConfig) loadFiles(workers int, path string) error {
	var err error
	...
	var files []string
	files = .....
	// r.data.Clear() <- remove the clear from here
	// collect the new state in a local map first
	var mu sync.Mutex
	fresh := make(map[string]bool)
	g, ctx := errgroup.WithContext(context.Background())
	sem := make(chan struct{}, workers)
loop:
	for _, file := range files {
		select {
		case <-ctx.Done():
			break loop // a plain break would only leave the select
		case sem <- struct{}{}:
		}
		file := file
		g.Go(func() error {
			defer func() { <-sem }()
			data, err := r.read(file, bucket)
			if err != nil {
				return err
			}
			mu.Lock()
			defer mu.Unlock()
			for i := range data {
				key := strconv.FormatInt(data[i].ProductID, 10) + ":" + strconv.Itoa(int(data[i].Iaz))
				fresh[key] = data[i].Available > 0
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return err // r.data still holds the previous state
	}
	// every file was read successfully: only now replace the old state
	r.data.Clear()
	for key, hasInventory := range fresh {
		r.data.Set(key, hasInventory)
	}
	return nil
}
Note: since the code is not runnable, I have only updated the methods for reference.
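Because the methods above depend on code that is not shown, here is a self-contained sketch of the same idea using only the standard library: read everything into a staging map and replace the live map only when every read succeeded. The store type, fakeRead, and errBadFile are hypothetical stand-ins, and a mutex-guarded plain map is used in place of the concurrent map.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBadFile is a hypothetical error returned by fakeRead.
var errBadFile = errors.New("bad file")

// fakeRead is a hypothetical reader: it fails for the path "bad" and
// otherwise returns a single entry keyed by the path.
func fakeRead(path string) (map[string]bool, error) {
	if path == "bad" {
		return nil, errBadFile
	}
	return map[string]bool{path: true}, nil
}

// store is a hypothetical stand-in for customerConfig.
type store struct {
	mu   sync.RWMutex
	data map[string]bool
}

// reload reads all files concurrently into a staging map and replaces
// s.data only if every read succeeds. On any error s.data is untouched.
func (s *store) reload(files []string, read func(string) (map[string]bool, error)) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards staged and firstErr
		firstErr error
		staged   = make(map[string]bool)
	)
	for _, f := range files {
		wg.Add(1)
		go func(f string) {
			defer wg.Done()
			rows, err := read(f)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			for k, v := range rows {
				staged[k] = v
			}
		}(f)
	}
	wg.Wait()
	if firstErr != nil {
		return firstErr
	}
	s.mu.Lock()
	s.data = staged
	s.mu.Unlock()
	return nil
}

// has reports whether key is present in the live map.
func (s *store) has(key string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.data[key]
	return ok
}

func main() {
	s := &store{data: map[string]bool{"old": true}}
	fmt.Println(s.reload([]string{"a", "bad"}, fakeRead), s.has("old"))
	fmt.Println(s.reload([]string{"a", "b"}, fakeRead), s.has("old"))
}
```

Swapping in a whole new map also means readers never observe a half-filled state, which a Clear followed by many Set calls cannot guarantee.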
The same can be achieved with just three functions: detect, read, and load. detect checks for new files at an interval and pushes any it finds to the delta channel. load takes a file path from the delta channel and calls read to get the data and an error; if there is no error it clears the map and updates it with the new content, otherwise it logs the error. So you would have two goroutines plus one function that is called by the load goroutine.
package main
import (
"fmt"
"time"
"os"
"os/signal"
"math/rand"
)
func main() {
fmt.Println(">>>", center("STARTED", 30), "<<<")
c := &Config{
InitialPath: "Old Path",
DetectInterval: 3000,
}
c.start()
fmt.Println(">>>", center("ENDED", 30), "<<<")
}
// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
return fmt.Sprintf("%[1]*s", -w, fmt.Sprintf("%[1]*s", (w + len(s))/2, s))
}
type Config struct {
deltaCh chan string
ticker *time.Ticker
stopSignal chan os.Signal
InitialPath string
DetectInterval time.Duration
}
func (c *Config) start() {
c.stopSignal = make(chan os.Signal, 1)
signal.Notify(c.stopSignal, os.Interrupt)
c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
c.deltaCh = make(chan string, 1)
go c.detect()
go c.load()
if c.InitialPath != "" {
c.deltaCh <- c.InitialPath
}
<- c.stopSignal
c.ticker.Stop()
}
// Detect New Files
func (c *Config) detect() {
for {
select {
case <- c.stopSignal:
return
case <- c.ticker.C:
fmt.Println(">>>", center("DETECT", 30), "<<<")
c.deltaCh <- fmt.Sprintf("PATH %f", rand.Float64() * 1.5)
}
}
}
// Read Files
func read(path string) (map[string]int, error) {
data := make(map[string]int)
data[path] = 0
fmt.Println(">>>", center("READ", 30), "<<<")
fmt.Println(path)
return data, nil
}
// Load Files
func (c *Config) load() {
for {
select {
case <- c.stopSignal:
return
case path := <- c.deltaCh:
fmt.Println(">>>", center("LOAD", 30), "<<<")
data, err := read(path)
if err != nil {
fmt.Println("Log Error")
} else {
fmt.Println("Success", data)
}
fmt.Println()
}
}
}
Note: the map is not included in the sample code, but the code can easily be updated to include it.
CodePudding user response:
Just allocate a new map, like this:
var mu sync.Mutex
before := map[string]string{} // Some map before reading
after := make(map[string]string)
// Read files and fill `after` map
mu.Lock()
before = after
mu.Unlock()
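The snippet above only shows the writer; readers have to take the same lock before touching the map. As an alternative sketch (not what the snippet does), the finished map can be published through atomic.Value from sync/atomic so that readers never block. The holder type and its methods are hypothetical names, and the approach assumes a map is never modified after it has been published.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// holder publishes immutable map snapshots; readers never block.
type holder struct {
	v atomic.Value // always holds a map[string]string
}

// newHolder returns a holder that starts with an empty map.
func newHolder() *holder {
	h := &holder{}
	h.v.Store(map[string]string{})
	return h
}

// replace publishes a fully built map. The caller must not modify m
// after passing it in.
func (h *holder) replace(m map[string]string) {
	h.v.Store(m)
}

// get looks up a key in whichever snapshot is current.
func (h *holder) get(key string) (string, bool) {
	m := h.v.Load().(map[string]string)
	val, ok := m[key]
	return val, ok
}

func main() {
	h := newHolder()
	h.replace(map[string]string{"k": "before"})

	// Read files and fill the new map completely before publishing it.
	after := make(map[string]string)
	after["k"] = "after"
	h.replace(after)

	val, ok := h.get("k")
	fmt.Println(val, ok)
}
```

If reading the files fails, simply do not call replace; the previously published map stays in place.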