Load data from reading files during startup and then process new files and clear old state from the

Time:04-23

I am working on a project where, during startup, I need to read certain files and store their contents in an in-memory map. Then I periodically look for new files, and if there are any, I replace whatever was loaded into the map at startup with this new data. Every new file represents the full state, so whenever one arrives I want to refresh my in-memory map with its contents rather than append to them.

The method loadAtStartupAndProcessNewChanges below is called during server startup; it reads the files and stores their data in memory. It also starts a goroutine, detectNewFiles, which periodically checks for new files and pushes their paths onto the deltaChan channel, which is later consumed by another goroutine, processNewFiles, that reads each new file and stores its data in the same map. Any error is sent on the err channel. loadFiles is the function that reads the files into memory and populates the map.

type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath("...", "....")
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

This method figures out whether there are any new files that need to be consumed; if so, it puts their paths on the deltaChan channel, which the processNewFiles goroutine later consumes to read the files into memory. Any error is added to the error channel.

func (r *customerConfig) detectNewFiles(rootPath string) {

}

This reads all the S3 files into memory and returns an error, if any. In this method I clear the previous state of my map so that it holds fresh state from the new files. It is called during server startup and also whenever we need to process new files from the processNewFiles goroutine.

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
loop:
  for _, file := range files {
    select {
    case <-ctx.Done():
      break loop // a bare break would only exit the select, not the loop
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      return r.read(file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

This method reads the files and adds their entries to the data concurrent map.

func (r *customerConfig) read(file string, bucket string) error {
  // read file and store it in "data" concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := invMods[i].Available > 0
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

This method picks up whatever is on the delta channel; if there are any new files, it reads them by calling the loadFiles method. Any error is added to the error channel.

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // find new files on delta channel
  // and call "loadFiles" method to read it
  // if there is any error, then it will add it to the error channel.
}

If there is any error on the error channel, it is logged by the method below:

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

Problem Statement

The logic above works for me without any issues, but there is one small bug in my code that I am not able to figure out how to solve. As you can see, I have a concurrent map that I populate in my read method and clear entirely in the loadFiles method. Whenever there is a new file on the delta channel I don't want to keep the previous state in the map, so I remove everything from it and then add the new state from the new files.

Now, if any error occurs in the read method, the bug appears: I have already cleared all the data in my map, leaving it empty, which is not what I want. If there is any error I would like to preserve the previous state in the data map. How can I resolve this issue within my current design?

Note: I am using a Go concurrent map.

CodePudding user response:

Instead of updating the map in the read method, let read just return the data and an error:

func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
  // read file data and return it, along with any error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return nil, errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return nil, errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil, errors.New("no data")
  }

  // accumulate every batch instead of returning after the first one
  var invMods []CompModel
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return nil, errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return nil, errs.Wrap(err)
    }
    var batch []CompModel
    if err := json.Unmarshal(byteSlice, &batch); err != nil {
      return nil, errs.Wrap(err)
    }
    invMods = append(invMods, batch...)
  }
  return invMods, nil
}

Then loadFiles can use the data and error returned by read; only if there is no error does it clear and update the map, otherwise the old data is left as it was:

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  // r.data.Clear() <- remove the clear from here
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
loop:
  for _, file := range files {
    select {
    case <-ctx.Done():
      break loop // a bare break would only exit the select, not the loop
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      data, err := r.read(file, bucket)
      if err != nil {
        return err
      }

      r.data.Clear()
      for i := range data {
        key := strconv.FormatInt(data[i].ProductID, 10) + ":" + strconv.Itoa(int(data[i].Iaz))
        hasInventory := data[i].Available > 0
        r.data.Set(key, hasInventory)
      }
      return nil
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

Note: since the code is not runnable, I have just updated the methods for reference.


The same can be achieved with just three functions: detect, read, and load. detect checks for new files on an interval and pushes to the delta channel if it finds any. load receives a file path from the delta channel and calls read to get the data and an error; if there is no error it clears the map and updates it with the new content, otherwise it logs the error. So you have two goroutines and one function that is called by the load goroutine.

package main

import (
  "fmt"
  "math/rand"
  "os"
  "os/signal"
  "time"
)

func main() {
  fmt.Println(">>>", center("STARTED", 30), "<<<")

  c := &Config{
    InitialPath: "Old Path",
    DetectInterval: 3000,
  }
  c.start()
  fmt.Println(">>>", center("ENDED", 30), "<<<")
}

// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
    return fmt.Sprintf("%[1]*s", -w, fmt.Sprintf("%[1]*s", (w + len(s))/2, s))
}

type Config struct {
  deltaCh chan string
  ticker *time.Ticker
  stopSignal chan os.Signal
  InitialPath string
  DetectInterval time.Duration
}

func (c *Config) start() {
  c.stopSignal = make(chan os.Signal, 1)
  signal.Notify(c.stopSignal, os.Interrupt)

  c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
  c.deltaCh = make(chan string, 1)
  go c.detect()
  go c.load()
  if c.InitialPath != "" {
    c.deltaCh <- c.InitialPath
  }
  <- c.stopSignal
  c.ticker.Stop()
}

// Detect New Files
func (c *Config) detect() {
  for {
    select {
      case <- c.stopSignal:
        return
      case <- c.ticker.C:
        fmt.Println(">>>", center("DETECT", 30), "<<<")
        c.deltaCh <- fmt.Sprintf("PATH %f", rand.Float64() * 1.5)
    }
  }
}
// Read Files
func read(path string) (map[string]int, error) {
  data := make(map[string]int)
  data[path] = 0
  fmt.Println(">>>", center("READ", 30), "<<<")
  fmt.Println(path)
  return data, nil
}
// Load Files
func (c *Config) load() {
  for {
    select {
      case <- c.stopSignal:
        return
      case path := <- c.deltaCh:
        fmt.Println(">>>", center("LOAD", 30), "<<<")
        data, err := read(path)
        if err != nil {
          fmt.Println("Log Error")
        } else {
          fmt.Println("Success", data)
        }
        fmt.Println()
    }
  }
}

Note: the map is not included in the sample code; it can easily be updated to include one.

CodePudding user response:

Just allocate a new map, like this:

var mu sync.Mutex
before := map[string]string{} // Some map before reading

after := make(map[string]string)

// Read files and fill `after` map

mu.Lock()
before = after
mu.Unlock()