Home > Net >  Divide in-memory data between service instances
Divide in-memory data between service instances

Time:03-21

Recently in a system design interview I was asked a question where cities were divided into zones and data of around 100 zones was available. An api took the zoneid as input and returned all the restaurants for that zone in response. The response time for the api was 50ms so the zone data was kept in memory to avoid delays.

If the zone data is approximately 25GB, then if the service is scaled to say 5 instances, it would need 125GB ram.

Now the requirement is to run 5 instances but use only 25 GB ram with the data split between instances.

I believe to achieve this we would need a second application which would act as a config manager to manage which instance holds which zone data. The instances can get which zones to track on startup from the config manager service. But the thing I am not able to figure out is how we redirect the request for a zone to the correct instance which holds its data especially if we use kubernetes. Also if the instance holding partial data restarts then how do we track which zone data it was holding

CodePudding user response:

Splitting dataset over several nodes: sounds like sharding.

In-memory: the interviewer might be asking about redis or something similar.

Maybe this: https://redis.io/topics/partitioning#different-implementations-of-partitioning

Redis cluster might fit -- keep in mind that when the docs mention "client-side partitioning": the client is some redis client library, loaded by your backends, responding to HTTP client/end-user requests


Answering your comment: then, I'm not sure what they were looking for.

Comparing Java hashmaps to a redis cluster isn't completely fair, considering one is bound to your JVM, while the other is actually distributed / sharded, implying at least inter-process communications and most likely network/non-local queries.

Then again, if the question is to scale an ever-growing JVM: at some point, we need to address the elephant in the room: how do you guarantee data consistency, proper replication/sharding, what do you do when a member goes down, ...?

Distributed hashmap, using Hazelcast, may be more relevant. Some (hazelcast) would make the argument it is safer under heavy write load. Others that migrating from Hazelcast to Redis helped them improve service reliability. I don't have enough background in Java myself, I wouldn't know.

As a general rule: when asked about Java, you could argue that speed and reliability very much rely on your developers understanding of what they're doing. Which, in Java, implies a large margin of error. While we could suppose: if they're asking such questions, they probably have some good devs on their payroll.

Whereas distributed databases (in-memory, on disk, SQL or noSQL), ... is quite a complicated topic, that you would need to master (on top of java), to get it right.

CodePudding user response:

The broad approach they're describing was described by Adya in 2019 as a LInK store. Linked In-memory Key-value stores allow for application objects supporting rich operations to be sharded across a cluster of instances.

I would tend to approach this by implementing a stateful application using Akka (disclaimer: I am at this writing employed by Lightbend, which employs the majority of the developers of Akka and offers support and consulting services to clients using Akka; as my SO history indicates, I would have the same approach even multiple years before I was employed by Lightbend) along these lines.

  • Akka Cluster to allow a set of JVMs running an application to form a cluster in a peer-to-peer manner and manage/track changes in the membership (including detecting instances which have crashed or are isolated by a network partition)

  • Akka Cluster Sharding to allow stateful objects keyed by ID to be distributed approximately evenly across a cluster and rebalanced in response to membership changes

These stateful objects are implemented as actors: they can update their state in response to messages and (since they process messages one at a time) without needing elaborate synchronization.

Cluster sharding implies that the actor responsible for an ID might exist on different instances, so that implies some persistence of the state of the zone outside of the cluster. For simplicity*, when an actor responsible for a given zone starts, it initializes itself from datastore (could be S3, could be Dynamo or Cassandra or whatever): after this its state is in memory so reads can be served directly from the actor's state instead of going to an underlying datastore.

By directing all writes through cluster sharding, the in-memory representation is, by definition, kept in sync with the writes. To some extent, we can say that the application is the cache: the backing datastore only exists to allow the cache to survive operational issues (and because it's only in response to issues of that sort that the datastore needs to be read, we can optimize the data store for writes vs. reads).

Cluster sharding relies on a conflict-free replicated data type (CRDT) to broadcast changes in the shard allocation to the nodes of the cluster. This allows, for instance, any instance to handle an HTTP request for any shard: it simply forwards a representation of the important parts of the request as a message to the shard which will distribute it to the correct actor.

From Kubernetes' perspective, the instances are stateless: no StatefulSet or similar is needed. The pods can query the Kubernetes API to find the other pods and attempt to join the cluster.

*: I have a fairly strong prior that event sourcing would be a better persistence approach, but I'll set that aside for now.

  • Related