Current state: Accepted
ISSUE: https://github.com/milvus-io/milvus/issues/15604
PRs:
Keywords: bulk load, import
Released: with Milvus 2.1
Authors:
Import data through a shortcut that performs better than insert().
Typically, it takes several hours to insert one billion entities with 128-dimensional vectors. Much of that time is wasted in two major areas: network transmission and Pulsar management.
There are at least three network transmissions in the process: 1) client => proxy, 2) proxy => Pulsar, 3) Pulsar => data node.
We need a new interface that performs bulk load without wasting network bandwidth and skips Pulsar management. Brief requirements of the new interface:
To reduce network transmission and skip Pulsar management, the new interface will allow users to input the paths of data files (JSON, NumPy, etc.) on MinIO/S3 storage, and let the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:
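To put a rough number on that cost, the sketch below estimates the raw vector traffic for the one-billion-entity example. It assumes 4-byte float32 vector components and ignores primary keys, scalar fields, and protocol overhead, so the real volume would be somewhat higher.

```python
# Back-of-the-envelope estimate; float32 components are an assumption.
NUM_ENTITIES = 1_000_000_000
DIMENSION = 128
BYTES_PER_FLOAT32 = 4
NETWORK_HOPS = 3  # client => proxy, proxy => Pulsar, Pulsar => data node

# Size of the vector data alone, before any hop is counted.
payload_bytes = NUM_ENTITIES * DIMENSION * BYTES_PER_FLOAT32
# The same payload crosses the network once per hop.
total_transferred = payload_bytes * NETWORK_HOPS

print(f"raw vector payload: {payload_bytes / 1e9:.0f} GB")
print(f"transferred over {NETWORK_HOPS} hops: {total_transferred / 1e9:.0f} GB")
```

Under these assumptions the payload is 512 GB and the three hops move about 1.5 TB in total.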
1. client calls import() to pass some file paths to Milvus proxy node
2. proxy node passes the file paths to data coordinator node
3. data coordinator node picks one or more data nodes (according to the sharding number) to parse the files. Each file can be parsed into one or more segments.
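The third step could be sketched roughly as follows. This is only an illustration: the round-robin policy, the `assign_files` helper, and the integer node IDs are assumptions made for the example, not the scheduling logic defined by this proposal.

```python
from collections import defaultdict
from typing import Dict, List


def assign_files(files: List[str], data_nodes: List[int], shards_num: int) -> Dict[int, List[str]]:
    """Spread files round-robin over at most `shards_num` data nodes (hypothetical policy)."""
    if not data_nodes:
        raise ValueError("no data node available")
    if shards_num < 1:
        raise ValueError("shards_num must be at least 1")
    # Use one node per shard, but never more nodes than exist.
    chosen = data_nodes[:min(shards_num, len(data_nodes))]
    plan: Dict[int, List[str]] = defaultdict(list)
    for i, path in enumerate(files):
        plan[chosen[i % len(chosen)]].append(path)
    return dict(plan)


# Three files, three available nodes, two shards: only two nodes are used.
plan = assign_files(["a.json", "b.json", "c.npy"], data_nodes=[101, 102, 103], shards_num=2)
print(plan)
```

Each selected data node would then read its assigned files from MinIO/S3 and parse them into segments.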
The python API declaration:
def import(collection_name, files, partition_name=None, bucket=None, default_fields=None)
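As a minimal sketch of how a client might package these arguments, the helper below serializes them into a JSON string, since the proxy request described in this document carries its options in JSON format. The helper name `build_import_options` and the JSON key names are assumptions for illustration; the helper is not named `import` because that is a reserved word in Python.

```python
import json
from typing import Dict, List, Optional


def build_import_options(
    collection_name: str,
    files: List[str],
    partition_name: Optional[str] = None,
    bucket: Optional[str] = None,
    default_fields: Optional[Dict[str, object]] = None,
) -> str:
    """Serialize the import arguments into a JSON string (key names are assumptions)."""
    if not collection_name:
        raise ValueError("collection_name is required")
    if not files:
        raise ValueError("at least one file path is required")
    options: Dict[str, object] = {"collection_name": collection_name, "files": list(files)}
    # Optional arguments are left out when unset.
    if partition_name is not None:
        options["partition_name"] = partition_name
    if bucket is not None:
        options["bucket"] = bucket
    if default_fields is not None:
        options["default_fields"] = default_fields
    return json.dumps(options, sort_keys=True)


opts = build_import_options("demo", ["data/part1.json"], partition_name="p1")
print(opts)
```

Only file paths travel through the client and proxy here; the file contents stay on MinIO/S3 until a data node reads them.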
Proxy RPC Interfaces
The declaration of import API in proxy RPC:
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
}

message ImportRequest {
  common.MsgBase base = 1;
  string options = 2;  // options in JSON format
}

message ImportResponse {
  common.Status status = 1;
  repeated schema.IDs IDs = 2;  // auto-generated IDs for successfully imported chunks
  uint32 succ_index = 3;  // number of chunks successfully imported
}
The declaration of import API in datacoord RPC:
service DataCoord {
  rpc Import(milvuspb.ImportRequest) returns (milvuspb.ImportResponse) {}
  rpc CompleteImport(ImportResult) returns (common.Status) {}
}

message ImportResult {
  common.Status status = 1;
  schema.IDs IDs = 2;  // auto-generated ids
  repeated int64 segments = 3;  // id array of new sealed segments
}
The declaration of import API in datanode RPC:
service DataNode {
  rpc Import(milvus.ImportRequest) returns (common.Status) {}
}