Put(key, value string) error
BatchPut(kvs map[string]string) error
Delete(key string) error
BatchDelete(keys []string) error
DeleteRange(start string, end string) error
DeletePrefix(prefix string) error
Get(key string) string
BatchGet(keys []string) []string
Scan(startKey string, endKey string) []string
ScanPrefix(prefixKey string) []string
// Read-modify-write operation
Merge(key, value string) error
// Most of the interface should be derived from RocksDB
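To make the call semantics above concrete, here is a minimal in-memory sketch of a few of the operations. It is not the RocksDB-backed implementation: `MemKV` and `NewMemKV` are hypothetical names, `Get` returns an extra `bool` for "found" (the list above returns only a string), `ScanPrefix` is assumed to return matching keys, and `DeleteRange` is assumed to cover the half-open range [start, end).

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// MemKV is a hypothetical in-memory stand-in for the KV index.
// It only illustrates the call semantics, not persistence or sharding.
type MemKV struct {
	mu   sync.RWMutex
	data map[string]string
}

// NewMemKV returns an empty store.
func NewMemKV() *MemKV { return &MemKV{data: make(map[string]string)} }

// Put stores a single key/value pair, overwriting any previous value.
func (m *MemKV) Put(key, value string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
	return nil
}

// Get returns the value and whether the key exists (assumed signature).
func (m *MemKV) Get(key string) (string, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[key]
	return v, ok
}

// DeleteRange removes every key in [start, end) (assumed half-open range).
func (m *MemKV) DeleteRange(start, end string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for k := range m.data {
		if k >= start && k < end {
			delete(m.data, k)
		}
	}
	return nil
}

// ScanPrefix returns the keys that start with prefix, in sorted order.
func (m *MemKV) ScanPrefix(prefix string) []string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var keys []string
	for k := range m.data {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	kv := NewMemKV()
	_ = kv.Put("pk/1", "segment-10")
	_ = kv.Put("pk/2", "segment-11")
	_ = kv.Put("zz/1", "segment-12")
	fmt.Println(kv.ScanPrefix("pk/"))
	_ = kv.DeleteRange("pk/1", "pk/2")
	_, found := kv.Get("pk/1")
	fmt.Println(found)
}
```

A real implementation would sit behind the same method set, so the sketch can double as a test fake for callers.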
1) Basic work flow
Plan 1: The proxy interacts directly with the KV index.
Pros: Easy to implement.
Cons: It is hard to keep the KV storage and the message queue consistent, because there is no single source of truth.
The KV storage has to handle incremental data persistence, which means the KV index has to maintain its own log. Logs cannot be stored on object storage, otherwise the write latency is too high, so we need extra local or cloud storage for the logs themselves.
Plan 2: The proxy writes directly into the message queue. A Pulsar function queries and updates the KV index to find which segment each delete belongs to, and writes the modified binlogs back into a second-stage Pulsar.
Pros: No need to worry about consistency; reads always get the latest data from the KV index.
Cons: It couples too tightly with Pulsar, which makes it hard to switch to another message storage, especially cloud message storage such as Kinesis.
Performance will degrade to half because we write to the message queue twice.
Plan 3: The KV index works as a log consumer: implement a KV node that consumes data from the message queue.
Pros: The system architecture looks clean and is easy to understand.
It does not need to handle persistence; incremental data can seek back and recover from the message queue. A Pulsar/Kafka connector helps convert binlogs and write them into the KV index.
It can handle failure recovery by replaying the message queue, but this might be very slow since the KV index uses a different sharding policy from Milvus.
Cons: Each DML operation needs to check the KV index for duplicates, which might slow down overall system performance.
Plan 4: The scheduling code of a KV node might be very similar to querynode's, so why not reuse the same logic?
Pros: Performance loss is minimal.
It does not need to handle persistence; incremental data can seek back and recover from the message queue. It can handle failure recovery by replaying the message queue, but this might be very slow since the KV index uses a different sharding policy from Milvus.
It follows the design rule that datanode writes data and querynode serves data.
Cons: It might need RPCs between querynode → querynode (for deduplication) and proxy → querynode (for retrieving entities).
We need to discuss whether plan 3 or plan 4 is favored.
2) What KV engine to support
We will support pluggable storage; most of the storage is maintained by a cloud provider or by the user's existing deployment. However, we still need one default implementation.
To achieve storage/computation disaggregation, the storage engine must support storing data on S3/MinIO and other object storage. We found rocksdb-cloud, which can be used for testing.
How to maintain data persistence
How to shard data and route
RocksDB-Cloud can tail data easily from distributed log storage, store SSTables on object storage, and serve reads and writes in milliseconds.
If you are interested in RocksDB-Cloud, please take a look at http://borthakur.com/ftp/rocksdb-cloud_dhruba_borthakur.pdf
architecture of rocksdb cloud
A useful feature of RocksDB-Cloud is zero-copy clones, which matches the way datanode and querynode work. Datanode converts binlogs into SSTables on flush, writes the data into a cloud bucket, and notifies querynode; querynode can then simply load the data from S3 and remove its local memtables.
RocksDB-Cloud also supports tiered storage: we can cache hot data on local SSD while S3 is used for durability.
How to shard data and route
To serve billions of entries, we need to shard the KV storage for distribution:
- same shard as milvus itself → if shard number is not small, might block kv read and can not split
- shard by range
- shard by hash
Consistent hashing is better balanced and easier to implement, but range operations on the partition key are slower.
Range partitioning requires delicate operations to split and merge existing partitions to maintain load balance (shards cannot be allocated beforehand unless we know the key distribution in advance, otherwise they will not be balanced).
In our use case, point queries are the most frequent, so consistent hashing seems to fit our requirements.
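As a rough illustration of the consistent hashing option, the sketch below routes a primary key to a KV shard through a hash ring with virtual nodes. The names `Ring`, `NewRing`, and `Locate`, the shard names, the FNV-1a hash, and the virtual-node count are all assumptions for the example, not part of the design.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Ring is a hypothetical consistent-hash ring mapping primary keys to KV shards.
type Ring struct {
	points []uint32          // sorted hash positions of virtual nodes
	owner  map[uint32]string // hash position -> shard name
}

// hashKey maps a string onto the ring using 32-bit FNV-1a (an arbitrary choice).
func hashKey(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// NewRing places `replicas` virtual nodes per shard on the ring.
func NewRing(shards []string, replicas int) *Ring {
	r := &Ring{owner: make(map[uint32]string)}
	for _, s := range shards {
		for i := 0; i < replicas; i++ {
			p := hashKey(fmt.Sprintf("%s#%d", s, i))
			if _, taken := r.owner[p]; taken {
				continue // skip a colliding virtual node
			}
			r.owner[p] = s
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// Locate returns the shard that owns key: the first virtual node at or after
// the key's hash, wrapping around to the start of the ring.
func (r *Ring) Locate(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashKey(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

func main() {
	ring := NewRing([]string{"kv-0", "kv-1", "kv-2"}, 64)
	for _, pk := range []string{"pk-1001", "pk-1002", "pk-1003"} {
		fmt.Println(pk, "->", ring.Locate(pk))
	}
}
```

The property that matters for rebalancing is that adding a shard only moves keys onto the new shard; keys never shuffle between existing shards. Range scans, by contrast, have to visit every shard, which is the trade-off noted above.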
The final KV storage is illustrated in the graph above. QueryNode and DataNode each hold a cloud RocksDB instance. Data is shared between the two instances through the zero-copy clone feature of RocksDB-Cloud.
Like Milvus, we separate data into two parts, incremental and historical. Incremental data exists only in querynode, and only for growing segments. Datanode consumes binlogs from the message stream and writes directly into the remote KV store. Once a segment is sealed, we can remove the cache on the querynode side, since we know that all primary keys of this segment have already been consumed by datanode and written into the remote KV store.
For deduplication and delete workloads, datanode can query the remote KV store directly: keys are consumed from message storage in order, so all previous keys must already be visible in the remote KV store.
For querynode, some keys may not be in the remote KV store yet, because datanode and querynode may consume at different speeds. We need to query the incremental KV in querynode memory to cover the latest writes.
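The querynode read path described above can be sketched as a two-level lookup: check the in-memory incremental KV first, then fall back to the remote KV store, and drop the cached keys of a segment once it is sealed. All names here (`RemoteKV`, `mapRemote`, `QueryNodeIndex`, `Consume`, `Lookup`, `SealSegment`) are hypothetical, and the map-backed remote store is only a stand-in for the demo.

```go
package main

import "fmt"

// RemoteKV is a hypothetical read-only view of the remote KV store:
// primary key -> segment ID.
type RemoteKV interface {
	Get(pk string) (segmentID int64, ok bool)
}

// mapRemote is an in-memory stand-in for the remote store (demo assumption).
type mapRemote map[string]int64

func (m mapRemote) Get(pk string) (int64, bool) {
	seg, ok := m[pk]
	return seg, ok
}

// QueryNodeIndex layers the incremental in-memory KV over the remote KV store.
type QueryNodeIndex struct {
	incremental map[string]int64 // primary keys of growing segments only
	remote      RemoteKV
}

// NewQueryNodeIndex creates an index with an empty incremental part.
func NewQueryNodeIndex(remote RemoteKV) *QueryNodeIndex {
	return &QueryNodeIndex{incremental: make(map[string]int64), remote: remote}
}

// Consume records a primary key read from the message stream.
func (q *QueryNodeIndex) Consume(pk string, segmentID int64) {
	q.incremental[pk] = segmentID
}

// Lookup checks the incremental KV first, so recent writes that datanode has
// not yet written to the remote store are still visible.
func (q *QueryNodeIndex) Lookup(pk string) (int64, bool) {
	if seg, ok := q.incremental[pk]; ok {
		return seg, true
	}
	return q.remote.Get(pk)
}

// SealSegment drops the cached keys of a sealed segment; by that point the
// design expects datanode to have written them to the remote store.
func (q *QueryNodeIndex) SealSegment(segmentID int64) {
	for pk, seg := range q.incremental {
		if seg == segmentID {
			delete(q.incremental, pk)
		}
	}
}

func main() {
	remote := mapRemote{"pk-1": 100}
	idx := NewQueryNodeIndex(remote)
	idx.Consume("pk-2", 101) // consumed by querynode, not yet flushed by datanode
	fmt.Println(idx.Lookup("pk-1"))
	fmt.Println(idx.Lookup("pk-2"))
	remote["pk-2"] = 101 // datanode catches up
	idx.SealSegment(101)
	fmt.Println(idx.Lookup("pk-2"))
}
```

The sketch ignores concurrency and assumes that sealing happens only after datanode has persisted the segment's keys; if that ordering is not guaranteed, a key could be briefly invisible.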
1) If a datanode crashes, nothing needs to be done, because the existing datanode recovery handles re-consuming the message queue and writing back.
2) If a querynode crashes, it needs to consume from the message stream and rebuild the incremental KV in memory.
3) If a remote KV store crashes:
   1. Its shards should be taken over by other KV stores.
   2. SSTables can be loaded directly from object storage.
   3. Memtables need to be restored from the message stream, which may be slow because we need to filter out the majority of the data.
If the KV store rewrites the log to message storage again, failure recovery should be simple and fast, but it requires extra I/O (preferred).
1) Test the RocksDB store on Pulsar and S3, and verify that it works
2) Test the consistent hashing policy and remote access
3) Integrate KV storage with Milvus
4) Integrate external KV storage with Milvus
1. Use the Query API to find the primary key; it needs to query each segment, so it is not efficient enough.
2. Use a bloom filter index to filter out irrelevant segments.
3. Use the same sharding policy for Milvus and the KV store?
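For the bloom filter alternative in the list above, a minimal sketch might keep one filter per segment and only query segments whose filter may contain the primary key. `Bloom`, `NewBloom`, `CandidateSegments`, the filter size, and the double-hashing scheme built on FNV are assumptions for illustration, not how Milvus implements it.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Bloom is a hypothetical fixed-size bloom filter over primary keys.
type Bloom struct {
	bits []uint64
	m    uint32 // number of bits, must be > 0
	k    uint32 // number of probes per key
}

// NewBloom allocates a filter with m bits and k probes per key.
func NewBloom(m, k uint32) *Bloom {
	return &Bloom{bits: make([]uint64, (m+63)/64), m: m, k: k}
}

// positions derives k bit positions from two FNV hashes (double hashing).
func (b *Bloom) positions(key string) []uint32 {
	h1 := fnv.New32a()
	h1.Write([]byte(key))
	h2 := fnv.New32()
	h2.Write([]byte(key))
	base, step := h1.Sum32(), h2.Sum32()|1
	out := make([]uint32, b.k)
	for i := uint32(0); i < b.k; i++ {
		out[i] = (base + i*step) % b.m
	}
	return out
}

// Add marks key as present.
func (b *Bloom) Add(key string) {
	for _, p := range b.positions(key) {
		b.bits[p/64] |= uint64(1) << (p % 64)
	}
}

// MayContain reports false only if key was definitely never added.
func (b *Bloom) MayContain(key string) bool {
	for _, p := range b.positions(key) {
		if b.bits[p/64]&(uint64(1)<<(p%64)) == 0 {
			return false
		}
	}
	return true
}

// CandidateSegments returns, in ascending order, the segments whose filter
// may contain pk; only these need a real lookup.
func CandidateSegments(filters map[int64]*Bloom, pk string) []int64 {
	var segs []int64
	for id, f := range filters {
		if f.MayContain(pk) {
			segs = append(segs, id)
		}
	}
	sort.Slice(segs, func(i, j int) bool { return segs[i] < segs[j] })
	return segs
}

func main() {
	filters := map[int64]*Bloom{1: NewBloom(1024, 3), 2: NewBloom(1024, 3)}
	filters[1].Add("pk-1")
	filters[2].Add("pk-2")
	fmt.Println(CandidateSegments(filters, "pk-1"))
}
```

A bloom filter can return false positives, so the candidate list may include segments that do not hold the key; it never omits the segment that does.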