Current state: Accepted
ISSUE: https://github.com/milvus-io/milvus/issues/15604
PRs:
Keywords: bulk load, import
Released: with Milvus 2.1
Authors: yhmo
Import data through a shortcut that performs better than insert().
Typically, it takes several hours to insert one billion entities with 128-dimensional vectors. Much of this time is wasted in two areas: network transmission and Pulsar management.
The data crosses the network at least three times in this process: 1) client => proxy, 2) proxy => Pulsar, 3) Pulsar => data node.
We need a new interface that performs bulk load without wasting network bandwidth and that skips Pulsar management. Brief requirements of the new interface:
To reduce network transmission and skip Pulsar management, the new interface allows users to pass the paths of data files (JSON, NumPy, etc.) stored on MinIO/S3, and lets the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:
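To put the transmission cost in perspective, here is a back-of-the-envelope estimate of the raw vector payload. It assumes 4-byte float32 vector components and ignores primary keys, protocol framing, and compression.

```python
# Rough estimate of the raw vector bytes moved when inserting 1 billion
# 128-dimensional float32 vectors (primary keys and framing are ignored).
ENTITIES = 1_000_000_000
DIM = 128
BYTES_PER_FLOAT32 = 4
HOPS = 3  # client => proxy, proxy => Pulsar, Pulsar => data node

payload_bytes = ENTITIES * DIM * BYTES_PER_FLOAT32
total_bytes = payload_bytes * HOPS

print(f"vector payload: {payload_bytes / 1e9:.0f} GB")   # vector payload: 512 GB
print(f"over {HOPS} hops: {total_bytes / 1e12:.3f} TB")  # over 3 hops: 1.536 TB
```

Reading the files directly from MinIO/S3 on the data nodes moves this payload over the network roughly once instead of three times.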
1. The client calls import() to pass the file paths to the Milvus proxy node.
2. The proxy node passes the file paths to the data coordinator node.
3. The data coordinator node picks one or more data nodes (according to the shard number) to parse the files; each file can be parsed into one or more segments.
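Step 3 can be sketched as a simple assignment plan. The sketch assumes the data coordinator uses at most one data node per shard and spreads files over those nodes round-robin. `assign_files` and the node names are hypothetical and are not the actual DataCoord scheduling logic.

```python
from typing import Dict, List

def assign_files(files: List[str], data_nodes: List[str], shard_num: int) -> Dict[str, List[str]]:
    """Hypothetical DataCoord step: spread import files over data nodes.

    Uses at most `shard_num` nodes (assumed: one per shard) and assigns
    the files to them round-robin.
    """
    if not data_nodes:
        raise ValueError("no data node available")
    picked = data_nodes[:max(1, min(shard_num, len(data_nodes)))]
    plan: Dict[str, List[str]] = {node: [] for node in picked}
    for i, path in enumerate(files):
        plan[picked[i % len(picked)]].append(path)
    return plan

print(assign_files(["a.json", "b.json", "c.json"], ["dn-1", "dn-2", "dn-3"], shard_num=2))
# {'dn-1': ['a.json', 'c.json'], 'dn-2': ['b.json']}
```

This treats every file as an independent unit, which fits row-based files. Column-based files that describe the same rows (one file per field) would have to be kept together on one node.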
The Python API declaration:

```python
def import(collection_name, files, partition_name=None, options=None)
def get_import_state(task_id)
```
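A caller would typically submit the files once and then poll each returned task. The sketch below makes three assumptions. First, import() returns the task ids, as `ImportResponse.tasks` suggests. Second, `get_import_state(task_id)` yields the `finished` and `row_count` fields of `GetImportStateResponse` as a pair. Third, because `import` is a reserved word in Python, a real binding would need to expose that call under another name. The polling helper `wait_for_import` is hypothetical.

```python
import time
from typing import Callable, Iterable, Tuple

def wait_for_import(
    get_state: Callable[[int], Tuple[bool, int]],
    task_ids: Iterable[int],
    interval: float = 1.0,
    timeout: float = 3600.0,
) -> int:
    """Poll every task until it reports finished; return the total imported rows.

    `get_state(task_id)` is assumed to return (finished, row_count).
    """
    pending = set(task_ids)
    total_rows = 0
    deadline = time.monotonic() + timeout
    while pending:
        for task_id in sorted(pending):  # sorted() copies, so discarding below is safe
            finished, row_count = get_state(task_id)
            if finished:
                total_rows += row_count
                pending.discard(task_id)
        if pending:
            if time.monotonic() > deadline:
                raise TimeoutError(f"tasks still running: {sorted(pending)}")
            time.sleep(interval)
    return total_rows
```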
Pre-defined format for import files
Assume we have a collection with 2 fields (one primary key and one vector field) and 5 rows:
uid | vector |
---|---|
101 | [1.1, 1.2, 1.3, 1.4] |
102 | [2.1, 2.2, 2.3, 2.4] |
103 | [3.1, 3.2, 3.3, 3.4] |
104 | [4.1, 4.2, 4.3, 4.4] |
105 | [5.1, 5.2, 5.3, 5.4] |
There are two ways to represent the collection with data files:
(1) Row-based data file: a single JSON file contains multiple rows.
file_1.json:

```json
[
  {"uid": 101, "vector": [1.1, 1.2, 1.3, 1.4]},
  {"uid": 102, "vector": [2.1, 2.2, 2.3, 2.4]},
  {"uid": 103, "vector": [3.1, 3.2, 3.3, 3.4]},
  {"uid": 104, "vector": [4.1, 4.2, 4.3, 4.4]},
  {"uid": 105, "vector": [5.1, 5.2, 5.3, 5.4]}
]
```
Call import() to import the file:
```python
import(collection_name="test", files=["file_1.json"])
```
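Before building a segment, a data node has to turn such rows into per-field columns. This is a minimal sketch that assumes the file holds a JSON array of row objects and that every row must carry exactly the collection's fields. `rows_to_columns` is a hypothetical helper, not Milvus code.

```python
import json
from typing import Dict, List

def rows_to_columns(text: str, field_names: List[str]) -> Dict[str, list]:
    """Transpose a row-based JSON file (a JSON array of row objects) into columns.

    A row with missing or extra fields rejects the whole file.
    """
    rows = json.loads(text)
    if not isinstance(rows, list):
        raise ValueError("expected a JSON array of rows")
    columns: Dict[str, list] = {name: [] for name in field_names}
    for i, row in enumerate(rows):
        if not isinstance(row, dict) or set(row) != set(field_names):
            raise ValueError(f"row {i} does not match fields {sorted(field_names)}")
        for name in field_names:
            columns[name].append(row[name])
    return columns
```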
(2) Column-based data files: each JSON file represents one column, and the field data must be stored under the key "values".
In this case there are two fields, so we create two JSON files:
file_1.json for the "uid" field:
```json
{"values": [101, 102, 103, 104, 105]}
```
file_2.json for the "vector" field:
```json
{"values": [[1.1, 1.2, 1.3, 1.4], [2.1, 2.2, 2.3, 2.4], [3.1, 3.2, 3.3, 3.4], [4.1, 4.2, 4.3, 4.4], [5.1, 5.2, 5.3, 5.4]]}
```
Call import() to import the files:

```python
import(collection_name="test", files={"uid": "file_1.json", "vector": "file_2.json"})
```
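With column-based files, the data node must check that every column has the same number of rows before it can assemble a segment. In this minimal sketch, `load_columns` is a hypothetical helper that receives each file's already-downloaded text keyed by field name. Fetching the bytes from MinIO/S3 is left out.

```python
import json
from typing import Dict

def load_columns(files: Dict[str, str]) -> Dict[str, list]:
    """Read one column per field from JSON texts shaped like {"values": [...]}.

    `files` maps field name -> file content. All columns must have equal length.
    """
    columns = {}
    for field, text in files.items():
        doc = json.loads(text)
        if not isinstance(doc, dict) or "values" not in doc:
            raise ValueError(f"{field}: missing the 'values' key")
        columns[field] = doc["values"]
    row_counts = {field: len(values) for field, values in columns.items()}
    if len(set(row_counts.values())) > 1:
        raise ValueError(f"column lengths differ: {row_counts}")
    return columns
```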
Vectors can also be stored in a NumPy file. For example, if the "vector" field is stored in file_2.npy, call import() as follows:

```python
import(collection_name="test", files={"uid": "file_1.json", "vector": "file_2.npy"})
```
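A .npy file is self-describing, so a data node can learn the vector dimension and row count from its header alone. The sketch below is written in standard-library Python to show the layout of the .npy format. It assumes little-endian float32 vectors in C order. `read_npy_vectors` is hypothetical, and a real data node would more likely rely on a NumPy reader.

```python
import ast
import struct
from typing import List

NPY_MAGIC = b"\x93NUMPY"

def read_npy_vectors(data: bytes) -> List[List[float]]:
    """Parse a .npy payload holding a 2-D little-endian float32 array in C order.

    Handles the header layouts of .npy format versions 1.0 to 3.0 only.
    """
    if data[:6] != NPY_MAGIC:
        raise ValueError("not a .npy file")
    major = data[6]
    if major == 1:
        (header_len,) = struct.unpack("<H", data[8:10])  # 2-byte length in v1.0
        offset = 10
    elif major in (2, 3):
        (header_len,) = struct.unpack("<I", data[8:12])  # 4-byte length in v2.0/3.0
        offset = 12
    else:
        raise ValueError(f"unsupported .npy version {major}")
    # The header is a dict literal such as {'descr': '<f4', 'fortran_order': False, 'shape': (5, 4), }
    header = ast.literal_eval(data[offset:offset + header_len].decode("utf-8"))
    if header["descr"] != "<f4" or header["fortran_order"]:
        raise ValueError("expected little-endian float32 data in C order")
    shape = header["shape"]
    if len(shape) != 2:
        raise ValueError(f"expected a 2-D array, got shape {shape}")
    rows, dim = shape
    body = data[offset + header_len:]
    if len(body) < rows * dim * 4:
        raise ValueError("file is truncated")
    flat = struct.unpack(f"<{rows * dim}f", body[:rows * dim * 4])
    # One vector per row, in the same row order as the other column files.
    return [list(flat[r * dim:(r + 1) * dim]) for r in range(rows)]
```

The row count recovered from the header can then be compared with the length of the "uid" column before the segment is built.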
Error handling
The Import():
The get_import_state():
```protobuf
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
  rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}

message ImportRequest {
  string collection_name = 1;                // target collection
  string partition_name = 2;                 // target partition
  bool row_based = 3;                        // the files are row-based or column-based
  repeated string files = 4;                 // file paths to be imported
  repeated common.KeyValuePair options = 5;  // import options
}

message ImportResponse {
  common.Status status = 1;
  repeated int64 tasks = 2;  // id array of import tasks
}

message GetImportStateRequest {
  int64 task = 1;  // id of an import task
}

message GetImportStateResponse {
  common.Status status = 1;
  bool finished = 2;    // whether this import task is finished
  int64 row_count = 3;  // if finished: how many rows were imported; otherwise: how many rows have been parsed
}
```
The declaration of the import API in the DataCoord RPC:
```protobuf
service DataCoord {
  rpc Import(milvus.ImportRequest) returns (milvus.ImportResponse) {}
  rpc GetImportState(milvus.GetImportStateRequest) returns (milvus.GetImportStateResponse) {}
  rpc CompleteImport(ImportResult) returns (common.Status) {}
}

message ImportResult {
  common.Status status = 1;
  repeated int64 segments = 2;  // id array of new sealed segments
  int64 row_count = 3;          // how many rows are imported by this task
}
```
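The three DataCoord calls can be read as operations on a small task table. This is a hypothetical in-memory sketch, not DataCoord's real implementation. `ImportResult` as declared carries no task id, so the sketch assumes the data node identifies its task in some other way and passes it as a separate argument. The method name `import_` only avoids the Python keyword.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class ImportTask:
    task_id: int
    finished: bool = False
    row_count: int = 0  # rows parsed so far, then rows imported once finished
    segments: List[int] = field(default_factory=list)

class ImportTaskTracker:
    """Hypothetical bookkeeping behind Import / CompleteImport / GetImportState."""

    def __init__(self) -> None:
        self._tasks: Dict[int, ImportTask] = {}
        self._next_id = 1

    def import_(self, num_tasks: int) -> List[int]:
        """Create `num_tasks` tasks (e.g. one per chosen data node) and return their ids."""
        ids = list(range(self._next_id, self._next_id + num_tasks))
        self._next_id += num_tasks
        for task_id in ids:
            self._tasks[task_id] = ImportTask(task_id)
        return ids

    def report_progress(self, task_id: int, parsed_rows: int) -> None:
        """Record how many rows a still-running task has parsed."""
        task = self._tasks[task_id]
        if not task.finished:
            task.row_count = parsed_rows

    def complete_import(self, task_id: int, segments: List[int], row_count: int) -> None:
        """Apply a data node's ImportResult: new sealed segments and imported rows."""
        task = self._tasks[task_id]
        task.segments.extend(segments)
        task.row_count = row_count
        task.finished = True

    def get_import_state(self, task_id: int) -> Tuple[bool, int]:
        task = self._tasks[task_id]
        return task.finished, task.row_count
```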
The declaration of the import API in the DataNode RPC:

```protobuf
service DataNode {
  rpc Import(milvus.ImportRequest) returns (common.Status) {}
}
```
Background: inserted data is hashed into shards, and bulk load shall follow the same convention. There are two policies we can choose from to satisfy this requirement:
Considering the efficiency and flexibility, we shall implement option 1.
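Whichever policy is chosen, a row must end up in the same shard that insert() would have routed it to. This minimal sketch routes rows by int64 primary key. `zlib.crc32` stands in for the hash function and is not necessarily the one Milvus uses. `shard_of` and `split_by_shard` are hypothetical helpers.

```python
import struct
import zlib
from collections import defaultdict
from typing import Dict, List

def shard_of(pk: int, shard_num: int) -> int:
    """Map an int64 primary key to a shard index (crc32 is a stand-in hash)."""
    return zlib.crc32(struct.pack("<q", pk)) % shard_num

def split_by_shard(pks: List[int], shard_num: int) -> Dict[int, List[int]]:
    """Group row offsets by target shard so each shard's rows go into its own segments."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for offset, pk in enumerate(pks):
        groups[shard_of(pk, shard_num)].append(offset)
    return dict(groups)
```

Because the mapping depends only on the key and the shard count, the same key always lands in the same shard, no matter which path wrote it.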
By definition, the resulting segments shall become available all at once, which means there shall be no intermediate state during loading.
To achieve this property, the segments shall be marked with a "Loading" state and stay invisible until the whole loading procedure completes.
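The all-at-once rule can be sketched as a visibility filter plus a single state flip per task. The state name "Ready" and the helpers below are hypothetical. In a real system the flip would have to be one atomic metadata update rather than a loop over in-memory objects.

```python
from dataclasses import dataclass
from typing import List

LOADING = "Loading"
READY = "Ready"  # hypothetical name for the post-load state

@dataclass
class Segment:
    segment_id: int
    task_id: int
    state: str = LOADING

def visible_segments(segments: List[Segment]) -> List[int]:
    """Segments still in the Loading state are hidden from search and query."""
    return [s.segment_id for s in segments if s.state != LOADING]

def finish_task(segments: List[Segment], task_id: int) -> None:
    """Move every segment of one bulk-load task out of Loading together."""
    for s in segments:
        if s.task_id == task_id and s.state == LOADING:
            s.state = READY
```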
Constraint: The segments generated by Bulk Load shall not be affected by delete operations before the whole procedure is finished.
An extra attribute may be needed to record the load-finish timestamp (ts); all delta logs before this ts shall be ignored.
After the load is done, the resulting segments need to be loaded if the target collection/partition is loaded.
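Applying that rule to a bulk-loaded segment comes down to filtering its delete records by timestamp. `load_finish_ts` stands in for the extra attribute, and `DeleteRecord` and `applicable_deletes` are hypothetical. Treating a delete whose ts equals the load-finish ts as applicable is an assumption about the boundary.

```python
from typing import List, NamedTuple

class DeleteRecord(NamedTuple):
    pk: int  # primary key of the deleted entity
    ts: int  # timestamp of the delete operation

def applicable_deletes(delta_log: List[DeleteRecord], load_finish_ts: int) -> List[DeleteRecord]:
    """For a bulk-loaded segment, ignore every delete issued before the load finished."""
    return [d for d in delta_log if d.ts >= load_finish_ts]
```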
DataCoord has two ways to notify the Query Cluster that there are new segments to load:
Since we want to load the segments all at once, hand-off is the better candidate, with some adjustments:
The current behavior of the query cluster is that if the collection has an index, new segments are not loaded (as sealed segments) until their index is built.
This constraint shall remain in the first implementation of Bulk Load:
Constraint: The bulk load procedure shall include the period of index building of the result segments
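Under this constraint, the completion check for a bulk load has to consider index state as well as written data. `ResultSegment` and `bulk_load_done` are hypothetical names in this minimal sketch.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class ResultSegment:
    segment_id: int
    index_built: bool

def bulk_load_done(data_written: bool, segments: List[ResultSegment], has_index: bool) -> bool:
    """Done only when data is written and, if the collection has an index, every segment is indexed."""
    if not data_written:
        return False
    if not has_index:
        return True
    return all(s.index_built for s in segments)
```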
The bulk load logic can be extracted into a tool that runs outside the Milvus process. This shall be implemented in the next release.