Home > Net >  Importing files into SQLite database not using all cores
Importing files into SQLite database not using all cores

Time:02-24

Context

I am importing IMDb dataset files into an SQLite database with the help of EntityFrameworkCore. Specifically, two files: title.basics and title.akas (the latter is linked to basics via its movie ID).

At first, I had a single thread reading lines from basics and looping through akas until the ID changed. However, there was a bug in it and, above all, it was too slow. So I decided to write multithreaded code that reads both files at the same time, with another task combining the akas with the appropriate movie.

The import is currently running, so I still do not know whether my bug is fixed (it probably is). However, it is still much too slow for me.

Issue

The combining part is still very slow, but more importantly, I can see my process is only using around 12% of CPU which corresponds to only 1/8 of total usage and I have 8 physical cores. So, it really seems the process is only using 1 core.

I am not giving any code here, as having a minimal testable code wouldn't mean anything. Though, you can see both versions here:

https://cints.net/public/Imdb-MultiThread.cs.txt

using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
    class Imdb : MediaSource
    {
        // File names of the gzip-compressed IMDb dataset archives.
        private const string TITLES_FILE = "title.basics.tsv.gz";
        private const string AKAS_FILE = "title.akas.tsv.gz";
        // Local folder that holds the downloaded archives and their decompressed copies.
        private readonly string temporaryFolder = @"c:\temp\";
        // Base address the archive file names are appended to when downloading.
        private readonly string baseUrl = "https://datasets.imdbws.com/";
        private readonly WebClient webClient = new();

        // Context used by FindMediasAsync for lookups; LoadMetaDataAsync creates its own separate context.
        MediaRecognizerContext db = new();

        // Query over IMDb entries of type "movie"; built on the first FindMediasAsync call and reused afterwards.
        private IQueryable<MetaMovie> imdbMovies = null;

        /// <summary>
        /// Downloads and decompresses every dataset archive whose local copy is missing
        /// or was not last written today.
        /// </summary>
        /// <returns><c>true</c> when at least one archive was downloaded; otherwise <c>false</c>.</returns>
        private async Task<bool> GatherFilesAsync()
        {
            var totalFilesGathered = 0;
            var filesToDownload = new string[] { AKAS_FILE, TITLES_FILE };
            foreach (var fileToDownload in filesToDownload)
            {
                var compressedFile = temporaryFolder + fileToDownload;

                // An archive is considered up to date only if it exists and was written today.
                var isUpToDate = File.Exists(compressedFile)
                    && File.GetLastWriteTime(compressedFile).Date.Equals(DateTime.Today);
                if (!isUpToDate)
                {
                    await GatherFileAsync(fileToDownload);
                    totalFilesGathered++;
                }
            }

            return totalFilesGathered != 0;
        }

        /// <summary>
        /// Downloads one archive into the temporary folder and decompresses it next to
        /// the archive, under the same name without its last extension.
        /// </summary>
        /// <param name="fileName">Archive file name, appended to both the base URL and the temporary folder.</param>
        private async Task GatherFileAsync(string fileName)
        {
            var compressedFile = temporaryFolder + fileName;
            var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);
            await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

            using Stream fd = File.Create(uncompressedFile);
            using Stream fs = File.OpenRead(compressedFile);
            using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);

            // CopyToAsync replaces the manual read/write loop and uses a larger internal buffer.
            await csStream.CopyToAsync(fd);
        }

        /// <summary>
        /// Imports the decompressed titles and akas files into the database using three
        /// concurrent stages: a titles reader, an akas reader, and a pool of workers that
        /// attach the akas to their movie and queue the movie on the shared context.
        /// </summary>
        /// <remarks>
        /// Every use of the shared context goes through <c>dbLock</c>, so all database work is
        /// serialized no matter how many workers run.
        /// NOTE(review): the workers poll <c>currentTitlesAkas</c> in a loop and sort its keys on
        /// every pass while holding a lock; this and the serialized database access are the likely
        /// reasons only one core appears busy — confirm with a profiler.
        /// NOTE(review): if rows for one movie ID are not contiguous in the akas file, a later
        /// batch can arrive after the movie was already removed from <c>titles</c> and will then be
        /// dropped — verify against the real data.
        /// </remarks>
        /// <returns>A task that completes once all stages have finished and pending changes are saved.</returns>
        private async Task LoadMetaDataAsync()
        {
            //return; //TODO: Remove this line

            //TODO: Reactivate this line
            //if (!await GatherFilesAsync()) return;


            var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
            var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);
            var dbLock = new SemaphoreSlim(1);
            var akasLock = new SemaphoreSlim(1);
            var currentTitlesAkasLock = new SemaphoreSlim(1);

            using (var db = new MediaRecognizerContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;

                // Stage 1: parse the titles file into movies keyed by external ID.
                var titles = new ConcurrentDictionary<string, MetaMovie>();
                var readTitles = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(File.ReadLines(titlesFile), (titleLine, _, readingIndex) =>
                    {
                        if (readingIndex == 0) return; // Skipping columns titles line

                        var movieInfos = titleLine.Split("\t", StringSplitOptions.None);
                        var movieExternalId = movieInfos[0];

                        // The context is shared between threads, so every query is serialized.
                        dbLock.Wait();
                        MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieExternalId).Include(m => m.Titles).FirstOrDefault();
                        dbLock.Release();
                        if (metaMovie == null)
                        {
                            // -1 marks a runtime that is absent or not numeric.
                            if (!int.TryParse(movieInfos[7], out int totalMinutes))
                            {
                                totalMinutes = -1;
                            }
                            metaMovie = new MetaMovie
                            {
                                ExternalId = movieExternalId,
                                MetaSource = nameof(Imdb),
                                MovieType = movieInfos[1],
                                Title = movieInfos[3],
                                TotalMinutes = totalMinutes,
                                Genres = movieInfos[8]
                            };
                            metaMovie.Titles = new List<MetaTitle>();

                            // Year 9999 stands in for an unknown start year.
                            if (int.TryParse(movieInfos[5], out int startYear))
                            {
                                metaMovie.StartYear = new DateTime(startYear, 1, 1);
                            }
                            else
                            {
                                metaMovie.StartYear = new DateTime(9999, 1, 1);
                            }

                            // An unknown end year falls back to the start year.
                            if (int.TryParse(movieInfos[6], out int endYear))
                            {
                                metaMovie.EndYear = new DateTime(endYear, 1, 1);
                            }
                            else
                            {
                                metaMovie.EndYear = metaMovie.StartYear;
                            }
                        }

                        titles.TryAdd(metaMovie.ExternalId, metaMovie);
                    });
                });

                // Stage 2: parse the akas file. currentTitlesAkas counts, per movie ID, the aka
                // rows currently being processed; a count of 0 means the ID is ready for stage 3.
                var akas = new Dictionary<string, List<MetaTitle>>();
                var currentTitlesAkas = new ConcurrentDictionary<string, int>();
                var readAkas = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(File.ReadLines(akasFile), (akaLine, _, readingIndex) =>
                    {
                        if (readingIndex == 0) return; // Skipping columns titles line

                        var titleInfos = akaLine.Split("\t", StringSplitOptions.None);
                        var externalId = titleInfos[0];

                        currentTitlesAkasLock.Wait();
                        if (!currentTitlesAkas.ContainsKey(externalId))
                        {
                            currentTitlesAkas.TryAdd(externalId, 1);
                        }
                        else
                        {
                            currentTitlesAkas[externalId]++;
                        }
                        currentTitlesAkasLock.Release();

                        var metaTitle = new MetaTitle
                        {
                            MetaMovie = null,
                            Text = titleInfos[2],
                            Region = titleInfos[3],
                            Language = titleInfos[4]
                        };

                        akasLock.Wait();
                        if (!akas.TryGetValue(externalId, out List<MetaTitle> titleAkas))
                        {
                            titleAkas = new List<MetaTitle>();
                            akas.Add(externalId, titleAkas);
                        }
                        titleAkas.Add(metaTitle);
                        akasLock.Release();

                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas[externalId]--;
                        currentTitlesAkasLock.Release();
                    });
                });

                // Stage 3: attach the collected akas to their movie and queue it for saving.
                var savingCounter = 0;
                var associate = Task.Factory.StartNew(() =>
                {
                    // The worker is deliberately synchronous. An async lambda here would be
                    // "async void": Parallel.For, and therefore this task, would complete at the
                    // first await while workers were still using the context.
                    Parallel.For(1, Environment.ProcessorCount * 10, workerIndex =>
                    {
                        var isAssociating = true;
                        do
                        {
                            var externalId = string.Empty;
                            var currentTitleAkaRemoved = false;
                            currentTitlesAkasLock.Wait();
                            foreach (var curExternalId in currentTitlesAkas.Keys.OrderBy(t => t))
                            {
                                if (currentTitlesAkas[curExternalId] == 0)
                                {
                                    externalId = curExternalId;
                                    break;
                                }
                            }
                            if (externalId != String.Empty)
                            {
                                currentTitleAkaRemoved = currentTitlesAkas.TryRemove(externalId, out _); // Removing so other threads won't take it
                            }
                            isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                            currentTitlesAkasLock.Release();

                            if (String.IsNullOrEmpty(externalId) || !currentTitleAkaRemoved) continue;

                            if (titles.TryGetValue(externalId, out MetaMovie metaMovie))
                            {
                                akasLock.Wait();
                                var titleAkas = akas[externalId];
                                akas.Remove(externalId);
                                akasLock.Release();

                                var changedMovie = false;
                                var movieAkas = metaMovie.Titles.Select(t => t).ToList(); // Clone list
                                foreach (var metaTitle in titleAkas)
                                {
                                    var existingTitle = movieAkas.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                                    if (existingTitle == null)
                                    {
                                        changedMovie = true;
                                        metaMovie.Titles.Add(metaTitle);
                                    }
                                    else
                                    {
                                        movieAkas.Remove(existingTitle);
                                    }
                                }

                                // Whatever is left in the clone no longer appears in the file.
                                foreach (var movieTitle in movieAkas)
                                {
                                    changedMovie = true;
                                    metaMovie.Titles.Remove(movieTitle);
                                }

                                dbLock.Wait();
                                if (metaMovie.Id == 0)
                                {
                                    db.Add(metaMovie);
                                }
                                else if (changedMovie)
                                {
                                    db.Update(metaMovie);
                                }
                                dbLock.Release();

                                // The ID was already removed from currentTitlesAkas above. Removing it
                                // again here could only delete an entry the akas reader re-added in the
                                // meantime, which would make its pending decrement throw.
                                currentTitlesAkasLock.Wait();
                                isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                                currentTitlesAkasLock.Release();

                                titles.TryRemove(externalId, out _); // Free memory

                                var localSavingCounter = Interlocked.Increment(ref savingCounter);

                                // Flush in batches of 1000 movies.
                                if (localSavingCounter % 1000 == 0)
                                {
                                    dbLock.Wait();
                                    db.SaveChanges();
                                    dbLock.Release();
                                    Console.WriteLine("Saved " + localSavingCounter);
                                }
                            }
                            else if (!readTitles.IsCompleted) // If reading titles is not ended, then maybe it was not read yet... otherwise, it doesn't exist
                            {
                                currentTitlesAkasLock.Wait();
                                currentTitlesAkas.TryAdd(externalId, 0); // Readd because still no movie associated
                                currentTitlesAkasLock.Release();
                            }
                        } while (isAssociating);
                    });
                });

                // Await instead of blocking so the calling thread is released during the import.
                await Task.WhenAll(readTitles, readAkas, associate);
                await db.SaveChangesAsync();
            }
        }

        /// <summary>
        /// Loads the IMDb metadata, extracts the media information from
        /// <paramref name="directory"/>, and returns the matching IMDb movies.
        /// </summary>
        /// <param name="directory">Directory handed to <c>ExtractInfosAsync</c>.</param>
        /// <returns>The result of <c>FindCorrespondances</c>, or <c>null</c> when nothing could be extracted.</returns>
        public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
        {
            await LoadMetaDataAsync();

            var extracted = await ExtractInfosAsync(directory);
            if (extracted != null)
            {
                // Build the movie-only query once and reuse it on later calls.
                imdbMovies ??= db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");
                return FindCorrespondances(imdbMovies, extracted);
            }

            return null;
        }
    }
}

https://cints.net/public/Imdb-SingleThread.cs.txt

using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
    class Imdb : MediaSource
    {
        // File names of the gzip-compressed IMDb dataset archives.
        private const string TITLES_FILE = "title.basics.tsv.gz";
        private const string AKAS_FILE = "title.akas.tsv.gz";
        // Local folder that holds the downloaded archives and their decompressed copies.
        private readonly string temporaryFolder = @"c:\temp\";
        // Base address the archive file names are appended to when downloading.
        private readonly string baseUrl = "https://datasets.imdbws.com/";
        private readonly WebClient webClient = new();

        // Context used by FindMediasAsync for lookups; LoadMetaDataAsync creates its own separate context.
        MediaRecognizerContext db = new();

        // Query over IMDb entries of type "movie"; built on the first FindMediasAsync call and reused afterwards.
        private IQueryable<MetaMovie> imdbMovies = null;

        /// <summary>
        /// Downloads and decompresses every dataset archive whose local copy is missing
        /// or was not last written today.
        /// </summary>
        /// <returns><c>true</c> when at least one archive was downloaded; otherwise <c>false</c>.</returns>
        private async Task<bool> GatherFilesAsync()
        {
            var totalFilesGathered = 0;
            var filesToDownload = new string[] { AKAS_FILE, TITLES_FILE };
            foreach (var fileToDownload in filesToDownload)
            {
                var compressedFile = temporaryFolder + fileToDownload;

                // An archive is considered up to date only if it exists and was written today.
                var isUpToDate = File.Exists(compressedFile)
                    && File.GetLastWriteTime(compressedFile).Date.Equals(DateTime.Today);
                if (!isUpToDate)
                {
                    await GatherFileAsync(fileToDownload);
                    totalFilesGathered++;
                }
            }

            return totalFilesGathered != 0;
        }

        /// <summary>
        /// Downloads one archive into the temporary folder and decompresses it next to
        /// the archive, under the same name without its last extension.
        /// </summary>
        /// <param name="fileName">Archive file name, appended to both the base URL and the temporary folder.</param>
        private async Task GatherFileAsync(string fileName)
        {
            var compressedFile = temporaryFolder + fileName;
            var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);
            await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

            using Stream fd = File.Create(uncompressedFile);
            using Stream fs = File.OpenRead(compressedFile);
            using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);

            // CopyToAsync replaces the manual read/write loop and uses a larger internal buffer.
            await csStream.CopyToAsync(fd);
        }

        /// <summary>
        /// Imports the decompressed titles and akas files into the database in a single pass,
        /// walking both files side by side and saving every 10000 movies.
        /// </summary>
        /// <remarks>
        /// For each title row, the akas cursor advances while the numeric part of its ID is less
        /// than or equal to the title's; only rows whose ID equals the title's are attached. Aka
        /// rows with a smaller ID are skipped, so the result is complete only if both files are
        /// sorted by numeric ID.
        /// NOTE(review): existing movies are loaded without their titles being explicitly included;
        /// confirm that <c>MetaMovie.Titles</c> is populated (or at least non-null) for them.
        /// </remarks>
        /// <returns>A task that completes once every title row is processed and saved.</returns>
        private async Task LoadMetaDataAsync()
        {
            //return; //TODO: Remove this line

            //TODO: Reactivate this line
            //if (!await GatherFilesAsync()) return;

            var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
            var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);

            // Dispose the enumerators so the underlying file readers are closed, even on failure.
            using var titlesIterator = File.ReadLines(titlesFile).GetEnumerator();
            titlesIterator.MoveNext(); // Skip columns headers

            using var akasIterator = File.ReadLines(akasFile).GetEnumerator();
            akasIterator.MoveNext(); // Skip columns headers
            var currentAka = akasIterator.MoveNext() ? akasIterator.Current : null;
            var savingCounter = 0;

            using (var db = new MediaRecognizerContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;
                while (titlesIterator.MoveNext())
                {
                    var titleLine = titlesIterator.Current;
                    var movieInfos = titleLine.Split("\t", StringSplitOptions.None);
                    MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).FirstOrDefault();
                    var isNewMovie = false;
                    if (metaMovie == null)
                    {
                        // -1 marks a runtime that is absent or not numeric.
                        if (!int.TryParse(movieInfos[7], out int totalMinutes))
                        {
                            totalMinutes = -1;
                        }
                        isNewMovie = true;
                        metaMovie = new MetaMovie
                        {
                            ExternalId = movieInfos[0],
                            MetaSource = nameof(Imdb),
                            MovieType = movieInfos[1],
                            Title = movieInfos[3],
                            TotalMinutes = totalMinutes,
                            Genres = movieInfos[8]
                        };
                        metaMovie.Titles = new List<MetaTitle>();

                        // Year 9999 stands in for an unknown start year.
                        if (int.TryParse(movieInfos[5], out int startYear))
                        {
                            metaMovie.StartYear = new DateTime(startYear, 1, 1);
                        }
                        else
                        {
                            metaMovie.StartYear = new DateTime(9999, 1, 1);
                        }

                        // An unknown end year falls back to the start year.
                        if (int.TryParse(movieInfos[6], out int endYear))
                        {
                            metaMovie.EndYear = new DateTime(endYear, 1, 1);
                        }
                        else
                        {
                            metaMovie.EndYear = metaMovie.StartYear;
                        }
                    }

                    // IDs of the titles already stored; those still listed after the loop are removed.
                    var movieAkasIds = metaMovie.Titles.Select(t => t.Id).ToList();
                    var titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
                    while (currentAka != null && int.Parse(titleInfos[0][2..]) <= int.Parse(metaMovie.ExternalId[2..]))
                    {
                        if (titleInfos[0] == metaMovie.ExternalId)
                        {
                            var metaTitle = new MetaTitle
                            {
                                MetaMovie = metaMovie,
                                Text = titleInfos[2],
                                Region = titleInfos[3],
                                Language = titleInfos[4]
                            };

                            var existingTitle = metaMovie.Titles.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                            if (existingTitle == null)
                            {
                                metaMovie.Titles.Add(metaTitle);
                            }
                            else
                            {
                                movieAkasIds.Remove(existingTitle.Id);
                            }
                        }

                        // At the end of the akas file the line becomes null, which ends the loop
                        // here and for every remaining title.
                        currentAka = akasIterator.MoveNext() ? akasIterator.Current : null;
                        titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
                    }

                    foreach (var movieTitleId in movieAkasIds)
                    {
                        metaMovie.Titles.Remove(metaMovie.Titles.Where(t => t.Id == movieTitleId).FirstOrDefault());
                    }

                    if (isNewMovie)
                    {
                        db.Add(metaMovie);
                    }
                    else
                    {
                        db.Update(metaMovie);
                    }

                    savingCounter++;
                    if (savingCounter % 10000 == 0)
                    {
                        await db.SaveChangesAsync();
                        Console.WriteLine("Saved " + savingCounter);
                    }
                }

                await db.SaveChangesAsync();
            }
        }

        /// <summary>
        /// Loads the IMDb metadata, extracts the media information from
        /// <paramref name="directory"/>, and returns the matching IMDb movies.
        /// </summary>
        /// <param name="directory">Directory handed to <c>ExtractInfosAsync</c>.</param>
        /// <returns>The result of <c>FindCorrespondances</c>, or <c>null</c> when nothing could be extracted.</returns>
        public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
        {
            await LoadMetaDataAsync();

            var extracted = await ExtractInfosAsync(directory);
            if (extracted != null)
            {
                // Build the movie-only query once and reuse it on later calls.
                imdbMovies ??= db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");
                return FindCorrespondances(imdbMovies, extracted);
            }

            return null;
        }
    }
}

In the multithread version, the slow part is in the method LoadMetaDataAsync and more precisely in var associate = Task.Factory.StartNew(() => code part.

This is still in development; cleaning and splitting will be done once I have the appropriate result/speed.

CodePudding user response:

Case closed. I returned to the single-thread version and found my initial bug (my code assumed the files were sorted, which they were only partially).

Thank you to everyone who participated.

  • Related