Function input based locking in multithreading

Time:06-11

I'm working with a piece of code that processes messages from a queue (using MassTransit). Many messages can be processed in parallel. All messages create or modify an object in Active Directory (in this case). All objects need to be validated against the AD schema definitions. (Though it's not relevant to the problem, I want to note that we have many customers with custom extensions in their AD schema.)

Retrieving the schema information is a slow operation. I want to do it once and then cache it. But with many messages being processed in parallel, many of them start retrieving the schema information before the first one succeeds, so too much work is done. For the moment I fixed this with a simple semaphore. See the code below.

But that is not a good solution, as now only one thread can enter this code at a time.

I need something that locks the code once per object and holds off other requests until the first retrieval and caching is complete.

What kind of construct will allow me to do that?

private static SemaphoreSlim _lock = new SemaphoreSlim(1, 1);

public ActiveDirectorySchemaObject? GetSchemaObjectFor(string objectClass)
{

    //todo: create better solution
    _lock.Wait();
    try
    {
        if (_activeDirectorySchemaContainer.HasSchemaObjectFor(
            _scopeContext.CustomerId, objectClass) == false)
        {
            _logger.LogInformation($"Getting and caching schema from AD " +
                $"for {objectClass}");
            _activeDirectorySchemaContainer.SetSchemaObjectFor(
                _scopeContext.CustomerId, objectClass,
                GetSchemaFromActiveDirectory(objectClass));
        }
    }
    finally
    {
        _lock.Release();
    }
    return _activeDirectorySchemaContainer.GetSchemaObjectFor(
        _scopeContext.CustomerId, objectClass);
}

The following is a possible simplification of the question: in short, I am looking for the proper construct to lock a piece of code against parallel access, separately for every variation of an input.

A comment mentioned Lazy, something I have not used before. Reading the docs, I see it defers initialization of an object until later. Maybe I could refactor for that. But looking at the code as it currently is, I seem to need a lazy "if" or a lazy "function", but maybe I am overcomplicating. I find that thinking about parallel programming often hurts my head.
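For anyone else who has not used Lazy<T> before, here is a minimal sketch of what it does. LazyDemo and CountFactoryRuns are hypothetical names made up for this illustration, and the Thread.Sleep is only a stand-in for the slow schema retrieval. With the default thread-safety mode (LazyThreadSafetyMode.ExecutionAndPublication), the factory delegate should run at most once even when several threads read Value at the same time; the other threads wait for the first one to finish.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type, not part of the question's code base.
public static class LazyDemo
{
    // Reads one Lazy<string> from many threads and returns how often the factory ran.
    public static int CountFactoryRuns(int parallelReaders)
    {
        int factoryRuns = 0;

        // Default mode is LazyThreadSafetyMode.ExecutionAndPublication:
        // one thread runs the factory, the others block until it completes.
        var lazy = new Lazy<string>(() =>
        {
            Interlocked.Increment(ref factoryRuns);
            Thread.Sleep(50); // stand-in for the slow schema retrieval
            return "schema";
        });

        Parallel.For(0, parallelReaders, i =>
        {
            _ = lazy.Value; // the first reader triggers the factory
        });

        return factoryRuns;
    }
}
```

So the "lazy function" idea is roughly right: a Lazy<T> wraps a function and remembers its result after the first call.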

As requested, here is the schema container class, which contains SetSchemaObjectFor and the other functions. Thanks so far for all the information provided.

public interface IActiveDirectorySchemaContainer
    {
        //Dictionary<string, Dictionary<string, JObject>> schemaStore {  get; }

        bool HasSchemaObjectFor(string customerId, string objectClass);
        ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass);
        void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schema);
    }



    public class ActiveDirectorySchemaContainer : IActiveDirectorySchemaContainer
    {
        private Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>> _schemaStore = new Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>>();

        public bool HasSchemaObjectFor(string customerId, string objectClass)
        {
            if (!_schemaStore.ContainsKey(customerId))
                return false;

            if (!_schemaStore[customerId].ContainsKey(objectClass))
                return false;

            if (_schemaStore[customerId][objectClass] != null)
                return true;
            else
                return false;
        }

        public ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass)
        {
            return _schemaStore[customerId][objectClass];
        }

        public void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schemaObject)
        {
            if (HasSchemaObjectFor(customerId, objectClass))
            {
                _schemaStore[customerId][objectClass] = schemaObject;
            }
            else
            {
                if (!_schemaStore.ContainsKey(customerId))
                {
                    _schemaStore.Add(customerId, new Dictionary<string, ActiveDirectorySchemaObject>());
                }

                if (!_schemaStore[customerId].ContainsKey(objectClass))
                {
                    _schemaStore[customerId].Add(objectClass, schemaObject);
                }
                else
                {
                    _schemaStore[customerId][objectClass] = schemaObject;
                }
            }
        }
    }

The customerId separates schema information for multiple customers, and the container is provided by dependency injection as a singleton. Every message can have a different customerId and be processed concurrently. Yet I want to retrieve schema data only a single time. This architecture might not be ideal, but I am not allowed to change that at this time.

 public static IServiceCollection AddActiveDirectorySchemaService(
             this IServiceCollection services)
        {
            services.AddScoped<IActiveDirectorySchemaService, ActiveDirectorySchemaService>();
            services.AddSingleton<IActiveDirectorySchemaContainer, ActiveDirectorySchemaContainer>();
            return services;
        }

CodePudding user response:

Here is how you could use a ConcurrentDictionary<TKey,TValue> that has Lazy<T> objects as values, in order to ensure that the schema of each key will be initialized only once:

private readonly ConcurrentDictionary<string, Lazy<Schema>> _cachedSchemas = new();

public Schema GetSchemaObjectFor(string objectClass)
{
    Lazy<Schema> lazySchema = _cachedSchemas.GetOrAdd(objectClass, key =>
    {
        return new Lazy<Schema>(() =>
        {
            _logger.LogInformation($"Getting schema for {key}");
            return GetSchemaFromActiveDirectory(key);
        });
    });
    return lazySchema.Value;
}

Your actual scenario seems to be more complex than this, since it also involves a _scopeContext.CustomerId argument that is passed to an _activeDirectorySchemaContainer.SetSchemaObjectFor method. I don't know how this argument affects the desirable behavior of the GetSchemaObjectFor method. If you add more info in the question, preferably showing the code of the _activeDirectorySchemaContainer.SetSchemaObjectFor method, I might be able to update this answer accordingly.
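Since the question now shows that the container is keyed by both customerId and objectClass, one possible way to carry the same idea over is to key the dictionary on a (customerId, objectClass) tuple. This is only a sketch under assumptions, not code from the question: PerCustomerSchemaCache is a hypothetical name, Schema is an empty placeholder for ActiveDirectorySchemaObject, and the loadSchema delegate stands in for GetSchemaFromActiveDirectory.

```csharp
using System;
using System.Collections.Concurrent;

// Placeholder for ActiveDirectorySchemaObject (assumption for this sketch).
public class Schema { }

// Hypothetical cache: one Lazy<Schema> per (customerId, objectClass) pair.
public class PerCustomerSchemaCache
{
    private readonly ConcurrentDictionary<(string CustomerId, string ObjectClass), Lazy<Schema>> _cache = new();

    // Stand-in for the slow Active Directory lookup.
    private readonly Func<string, string, Schema> _loadSchema;

    public PerCustomerSchemaCache(Func<string, string, Schema> loadSchema)
    {
        _loadSchema = loadSchema;
    }

    public Schema GetSchemaObjectFor(string customerId, string objectClass)
    {
        // GetOrAdd may build more than one Lazy under contention, but only one
        // is stored, and only the stored one ever has its Value read.
        Lazy<Schema> lazy = _cache.GetOrAdd((customerId, objectClass),
            key => new Lazy<Schema>(() => _loadSchema(key.CustomerId, key.ObjectClass)));

        return lazy.Value;
    }
}
```

One thing to keep in mind with this design: in the default Lazy<T> mode, an exception thrown by the loader is cached and rethrown on every later read of Value, so a failed retrieval would not be retried unless the entry is removed from the dictionary.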

CodePudding user response:

A relatively simple approach would be to use a ConcurrentDictionary to keep a cache of loaded objects. Dictionaries divide items into buckets based on the hash code of their keys, and ConcurrentDictionary guards those buckets with several locks rather than one global lock, while its reads do not take a lock at all. Using a dictionary like this will provide an efficiency boost over your current approach.

So as to avoid hammering the AD controller/database/whatever, I'm still going to use a semaphore to ensure that only one thread can request a schema at once. This only takes place when the dictionary doesn't already have the entry, however.

Note that this first option is more or less a complicated version of Theodor's answer, so if this works for you, it's probably best to go with that answer instead. And my second option could probably be optimised by incorporating Theodor's answer.

public class CachedSchemaContainer
{
    private readonly ISchemaRetriever _schemaRetriever;
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

    public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
    {
        _schemaRetriever = schemaRetriever;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        Schema schema;
        // try and retrieve the value
        if (_schemaCache.TryGetValue(objectClass, out schema))
        {
            return schema;
        }

        // OK, we need to wait our turn and try to load it from the AD controller
        _semaphoreSlim.Wait();
        try
        {
            // There's no point requerying if the last holder of the lock already retrieved it, so check again
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                return schema;
            }
                
            // Go and get the schema, add it to the dictionary, and then return it
            schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            return schema;
        }
        finally
        {
            // release the semaphore
            _semaphoreSlim.Release();
        }
    }
}
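The single semaphore above still serializes loads for different object classes. If loads for different keys should be allowed to run in parallel, one possible variation is to keep a semaphore per key. This is a sketch only: PerKeyLockingSchemaCache and the loadSchema delegate are hypothetical names, and Schema is the same empty placeholder class used in these examples.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Empty placeholder type, as in the other examples.
public class Schema { }

// Hypothetical cache that locks per key instead of globally.
public class PerKeyLockingSchemaCache
{
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new();
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();
    private readonly Func<string, Schema> _loadSchema; // stand-in for the AD lookup

    public PerKeyLockingSchemaCache(Func<string, Schema> loadSchema)
    {
        _loadSchema = loadSchema;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        // Fast path: already cached, no waiting needed.
        if (_schemaCache.TryGetValue(objectClass, out var cached))
        {
            return cached;
        }

        // GetOrAdd may construct a spare SemaphoreSlim under contention, but every
        // caller gets back the single instance that was actually stored for the key.
        SemaphoreSlim gate = _locks.GetOrAdd(objectClass, key => new SemaphoreSlim(1, 1));
        gate.Wait();
        try
        {
            // Check again: the previous holder of this key's semaphore may have loaded it.
            if (_schemaCache.TryGetValue(objectClass, out cached))
            {
                return cached;
            }

            Schema schema = _loadSchema(objectClass);
            _schemaCache[objectClass] = schema;
            return schema;
        }
        finally
        {
            gate.Release();
        }
    }
}
```

The trade-off is that several slow requests can now hit the AD controller at once (one per distinct key), and the per-key semaphores are never removed, which is presumably acceptable only when the number of distinct object classes is small.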

Another possible optimisation might be to cache a reference to the Schema object per-thread. This would mean that no locking would be required in the case that a given thread has accessed this specific schema before. We still have the thread-safe ConcurrentDictionary to cache the values between threads, but ultimately this will avoid a lot of locking once the caches are warmed up/populated:

public class CachedSchemaContainer : IDisposable
{
    private readonly ISchemaRetriever _schemaRetriever;
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
    private readonly ThreadLocal<Dictionary<string, Schema>> _threadSchemaCache = new ThreadLocal<Dictionary<string, Schema>>(() => new Dictionary<string, Schema>());
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

    public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
    {
        _schemaRetriever = schemaRetriever;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        Schema schema;

        // try and retrieve the value from the thread's cache
        if (_threadSchemaCache.Value.TryGetValue(objectClass, out schema))
        {
            return schema;
        }

        // try and retrieve the value
        if (_schemaCache.TryGetValue(objectClass, out schema))
        {
            // it was already cached in the shared dictionary, so let's add it to the thread's
            _threadSchemaCache.Value[objectClass] = schema;
            return schema;
        }

        // OK, we need to wait our turn and try to load it from the AD controller
        _semaphoreSlim.Wait();
        try
        {
            // There's no point requerying if the last holder of the lock already retrieved it, so check again
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                // it was already cached in the shared dictionary, so let's add it to the thread's
                _threadSchemaCache.Value[objectClass] = schema;
                return schema;
            }
                
            // Go and get the schema, add it to the shared and thread local dictionaries, and then return it
            schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            _threadSchemaCache.Value[objectClass] = schema;
            return schema;
        }
        finally
        {
            // release the semaphore
            _semaphoreSlim.Release();
        }
    }

    public void Dispose()
    {
        _threadSchemaCache.Dispose();
    }
}

Common type definitions used in these examples:

public interface ISchemaRetriever
{
    Schema GetSchemaObjectFor(int customerId, string objectClass);
}

public class Schema
{
}

Note: Schema here is a reference type (a class), so the dictionaries store a pointer to a common Schema object per loaded objectClass. As such, if one thread makes a change to the Schema object, then that could break another one, etc. unless the Schema object itself is also thread safe. If you're only reading values and not mutating the Schema objects, then you should have nothing to worry about there.
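One way to rule out that kind of cross-thread interference is to make the cached type immutable. A minimal sketch follows; ImmutableSchema and its ObjectClass and AttributeNames members are hypothetical, since the real shape of the schema object is not shown in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical immutable schema type: all state is fixed in the constructor.
public sealed class ImmutableSchema
{
    public string ObjectClass { get; }
    public IReadOnlyList<string> AttributeNames { get; }

    public ImmutableSchema(string objectClass, IEnumerable<string> attributeNames)
    {
        ObjectClass = objectClass;

        // Copy the input so later changes to the caller's collection are not visible,
        // and wrap the copy so readers cannot cast it back to a mutable array.
        AttributeNames = Array.AsReadOnly(attributeNames.ToArray());
    }
}
```

With a type like this, any number of threads can share the same cached instance without further synchronization, because nothing can change after construction.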

Also, as Theodor points out, unless you're planning to make this method async in the future, you could potentially do away with the SemaphoreSlim and just use a simple lock (lockingObject) { } statement instead.
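A sketch of what the first option might look like with a lock statement in place of the SemaphoreSlim. LockBasedSchemaCache and the loadSchema delegate are hypothetical names used for illustration, and Schema is again the empty placeholder class.

```csharp
using System;
using System.Collections.Concurrent;

// Empty placeholder type, as in the other examples.
public class Schema { }

// Hypothetical variant of CachedSchemaContainer using lock instead of SemaphoreSlim.
public class LockBasedSchemaCache
{
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new();
    private readonly object _lockingObject = new object();
    private readonly Func<string, Schema> _loadSchema; // stand-in for the AD lookup

    public LockBasedSchemaCache(Func<string, Schema> loadSchema)
    {
        _loadSchema = loadSchema;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        // Fast path: no lock taken when the schema is already cached.
        if (_schemaCache.TryGetValue(objectClass, out var schema))
        {
            return schema;
        }

        lock (_lockingObject)
        {
            // Check again in case the previous lock holder already loaded it.
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                return schema;
            }

            schema = _loadSchema(objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            return schema;
        }
    }
}
```

The lock is released automatically when the block exits, including on exceptions, so no try/finally is needed. The reason this only works for synchronous code is that the compiler rejects await inside the body of a lock statement.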
