What is the fastest way to merge tens of millions of files

Time: 2022-07-22

I have 50 million files stored on an Ubuntu machine, and I want to merge them into a few large files. What is the fastest way to do this? So far I have saved the list of filenames to process in filename.txt using the ls -1 command.
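
For reference, that listing step might look something like the line below; the directory path is only a placeholder, not taken from the original post:

ls -1 /path/to/files > ~/filename.txt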

I tried writing a Go program that reads each file in turn and appends its contents to an output file, but it is too slow: it merges only about 30-40 files per second, which works out to more than 16 days to finish.

Is there a good way to merge them quickly?

Here is the Go code I wrote:

const fileSizeLimit = (1 << 30) * 4 // 4GB
const filesStorePath = "<>"
func main() {
    fileNamesFile := ""
    outBasePath := ""

    startId := 0

    //del := false
    flag.StringVar(&fileNamesFile, "d", "", "filenames file")
    flag.StringVar(&outBasePath, "o", "", "out dir")
    flag.IntVar(&startId, "f", 0, "start fn")
    //flag.BoolVar(&del, "del", false, "del file")

    flag.Parse()

    start := time.Now()

    fmt.Printf("start:%s\n", start.Format("2006-01-02 15:04:05"))
    fmt.Printf("file names = %s\n", fileNamesFile)
    fmt.Printf("out dir = %s\n", outBasePath)


    allList, _ := ioutil.ReadFile(fileNamesFile)
    all := strings.Split(string(allList), "\n")
    total := len(all)
    store := newStoreItems(outBasePath, startId)

    uiLiveWriter := uilive.New()
    uiLiveWriter.Start()

    finish := make(chan bool, 1)
    pos := 0
    readCount := 0

    go func() {
        for i := pos; i < total; i++ {
            pos = i
            fn := all[i]

            f := path.Join(filesStorePath, fn)
            if content, err := ioutil.ReadFile(f); err == nil {
                store.write(content)
            }
            readCount++
        }
        // signal completion once all files have been processed
        finish <- true
    }()

    go func() {
        ticker := time.NewTicker(1 * time.Second)
        // print the current progress once per second
        for {
            select {
            case <-ticker.C:
                t := time.Since(start)
                cost := t.Seconds()
                content := fmt.Sprintf("read %d/%d(%.2f%%), file=%d/%d, speed=%d/s\ttime %s\n",
                    pos, total, float64(pos)/float64(total)*100,
                    store.index, store.getSize(),
                    int(float64(readCount) / cost),
                    (time.Duration(cost) * time.Second).String())

                _, _ = fmt.Fprint(uiLiveWriter, content)
            }
        }
    }()

    osSignals := make(chan os.Signal, 1)
    signal.Notify(osSignals, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
    go func() {
        s := <-osSignals
        fmt.Println("stop !", s)

        finish <- false
    }()

    <-finish
    close(finish)

    _, _ = fmt.Fprintln(uiLiveWriter, "Finished ")
    uiLiveWriter.Stop() // flush and stop rendering
    fmt.Println("readCount", readCount)
    fmt.Println("exit 0")


}


type storeItems struct {
    basePath string
    w        *bufio.Writer
    file     *os.File
    size     int
    rowSize  int64
    index    int
    lock     sync.Mutex
}

func newStoreItems(storePath string, startFn int) *storeItems {
    fn := path.Join(storePath, strconv.Itoa(startFn))
    f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
    if err != nil {
        fmt.Printf("create [%s] fail! err: %s \n", fn, err)
    }

    return &storeItems{
        basePath: storePath,
        w:  bufio.NewWriterSize(f, util.GIGABYTE),
        file: f,
        size:  0,
        index: startFn,
    }
}

func (s *storeItems) getSize() int {
    return s.size
}

func (s *storeItems) nextFile() *os.File {
    if s.file != nil {
        _ = s.w.Flush()
        _ = s.file.Close()
    }
    nextIndex := s.index + 1

    s.file, _ = os.OpenFile(path.Join(s.basePath, strconv.Itoa(nextIndex)),
        os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
    s.w = bufio.NewWriterSize(s.file, util.GIGABYTE)
    s.index = nextIndex
    s.size = 0
    return s.file
}

func (s *storeItems) write(b []byte) {
    _, _ = s.w.Write(b)
    _, _ = s.w.WriteRune('\n')
    s.size += len(b) + 1

    if s.size >= fileSizeLimit {
        // rotate to the next output file once the accumulated size reaches the limit
        s.nextFile()
    }
}

Execution output:

start:2022-07-22 05:03:09
file names = ***
out dir = ***
read 9057/50803783(0.02%), file=0/48151629, speed=40/s  time 3m41s

Observed system I/O: read throughput of roughly 4-9 MB/s.

I have also tried using awk and cat, but the throughput is about the same as the Go program:

head ~/filename.txt -n 10000 | xargs awk '1' >> ~/out/0
sed -i '1,10000d' ~/filename.txt

CodePudding user response:

I'd use separate tools here: cat for joining the existing content, and split for creating chunks that have the desired output size. E.g.:

cat filename.txt | xargs cat | split -b 1M

With a million test files this runs at about 100K files/s on my PC, so it should finish 50M files within about 10 minutes. However, I ran this on tmpfs and with only 4 bytes per file.

So those numbers reflect the best-case scenario. If disk (or filesystem) speed is the bottleneck in your case, then there's little you can do about it, I think. Still, I wouldn't expect your setup to increase the runtime from ten minutes to two weeks : )
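
If the goal is output chunks of roughly the same 4 GB size as in the Go program, the same pipeline can write numbered chunks directly. The chunk size, suffix options, and output prefix below are illustrative choices, not from the original answer, and the command assumes it is run from the directory holding the files, since filename.txt appears to contain bare filenames:

cat filename.txt | xargs cat | split -b 4G -d -a 3 - ~/out/merged_

Here split -b 4G sets the chunk size, -d switches to numeric suffixes, and -a 3 allows up to 1000 output files (GNU coreutils split, as shipped with Ubuntu). If any filenames contain spaces, adding -d '\n' to xargs (GNU xargs) avoids word-splitting problems.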
