Current state: Accepted

ISSUE: https://github.com/milvus-io/milvus/issues/15604

PRs: 

Keywords: bulk load, import

Released: with Milvus 2.1 

Authors:  

Summary

Import data through a shortcut to achieve better performance than repeatedly calling insert().


Motivation

The current insert() API supports inserting entities in batches. The internal logic of an insert request is:

  1. the client calls insert() to transfer data to a Milvus proxy node
  2. the proxy splits the data into multiple parts according to the collection's shard number
  3. the proxy constructs a message packet for each part and sends the packets to the Pulsar service
  4. data nodes pull the message packets from the Pulsar service, each data node pulling one packet
  5. data nodes persist the data into segments when a flush() action is triggered

Typically, it takes several hours to insert one billion entities with 128-dimensional vectors. Much of this time is spent in two major areas: network transmission and Pulsar management.
There are at least three network transmissions in this process: 1) client => proxy  2) proxy => Pulsar  3) Pulsar => data node

We need a new interface that performs bulk load without wasting network bandwidth and that skips Pulsar management entirely. Brief requirements for the new interface:

  1. import data from JSON format files. (first stage)
  2. import data from Numpy format files. (first stage)
  3. copy a collection within one Milvus 2.0 service. (second stage)
  4. copy a collection from one Milvus 2.0 service to another. (second stage)
  5. import data from Milvus 1.x to Milvus 2.0. (third stage)
  6. import data from parquet/FAISS files. (TBD)


Design Details

To reduce network transmission and bypass Pulsar management, the new interface allows users to pass the paths of data files (JSON, Numpy, etc.) stored on MinIO/S3, and lets the data nodes read these files directly and parse them into segments. The internal logic of the process becomes:

  1. the client calls import() to pass some file paths to a Milvus proxy node
  2. the proxy node passes the file paths to the data coordinator
  3. the data coordinator picks one or more data nodes (according to the number of files) to parse the files; each file can be parsed into one or more segments (a possible assignment strategy is sketched below)
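
How the data coordinator distributes files across data nodes is an implementation detail that this proposal does not fix. The snippet below is only a minimal, hypothetical sketch of one possible round-robin strategy; the names (assign_import_files, the node ID list) are illustrative and not part of the Milvus code base.

from typing import Dict, List

# Hypothetical illustration of step 3: distribute import file paths over the
# available data node IDs in round-robin order. Not actual Milvus code.
def assign_import_files(files: List[str], data_nodes: List[int]) -> Dict[int, List[str]]:
    assignment: Dict[int, List[str]] = {node_id: [] for node_id in data_nodes}
    for i, path in enumerate(files):
        node_id = data_nodes[i % len(data_nodes)]
        assignment[node_id].append(path)
    return assignment

# Example: three files and two data nodes.
# assign_import_files(["a.json", "b.json", "c.npy"], [1, 2])
# -> {1: ["a.json", "c.npy"], 2: ["b.json"]}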


Some points to consider:

  Store scalar field and vector field data in a JSON file using the row-based format, for example:

{
  "table": {
    "rows": [
      {"id": 1, "year": 2021, "vector": [1.0, 1.1, 1.2]},
      {"id": 2, "year": 2022, "vector": [2.0, 2.1, 2.2]},
      {"id": 3, "year": 2023, "vector": [3.0, 3.1, 3.2]}
    ]
  }
}
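
For illustration, a data node could turn such a row-based file into per-field column data (the shape needed to build a segment) roughly as follows. The helper name parse_row_based is hypothetical; this is not the actual data node implementation.

import json
from collections import defaultdict

# Hypothetical sketch: read a row-based JSON file and collect the values of
# each field into a column list. Assumes the "table"/"rows" layout shown above.
def parse_row_based(path: str) -> dict:
    with open(path) as f:
        rows = json.load(f)["table"]["rows"]
    columns = defaultdict(list)
    for row in rows:
        for field_name, value in row.items():
            columns[field_name].append(value)
    return dict(columns)

# parse_row_based("file_1.json")
# -> {"id": [1, 2, 3], "year": [2021, 2022, 2023], "vector": [[1.0, 1.1, 1.2], ...]}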

  Store scalar field and vector field data in a JSON file using the column-based format, for example:

{
  "table": {
    "columns": {
      "id": [1, 2, 3],
      "year": [2021, 2022, 2023],
      "vector": [
        [1.0, 1.1, 1.2],
        [2.0, 2.1, 2.2],
        [3.0, 3.1, 3.2]
      ]
    }
  }
}
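
A column-based file can be generated directly from in-memory lists or arrays. The sketch below uses a hypothetical helper (write_column_based is not a real API) that also checks that every column has the same number of rows, since each row index must describe one entity.

import json

# Hypothetical sketch: write a column-based JSON file in the "table"/"columns"
# layout shown above, refusing to write columns of mismatched length.
def write_column_based(path: str, columns: dict) -> None:
    lengths = {name: len(values) for name, values in columns.items()}
    if len(set(lengths.values())) != 1:
        raise ValueError("columns have different lengths: %s" % lengths)
    with open(path, "w") as f:
        json.dump({"table": {"columns": columns}}, f)

write_column_based("file_1.json", {
    "id": [1, 2, 3],
    "year": [2021, 2022, 2023],
    "vector": [[1.0, 1.1, 1.2], [2.0, 2.1, 2.2], [3.0, 3.1, 3.2]],
})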


SDK Interfaces

Based on these points, we choose a JSON object as the parameter of the Python import() API. The API declaration will look like this:

def import(options)


The parameter "options" is a JSON object with the following format.
Note: to improve usability, we can also declare an ORM class that wraps this JSON object.

{
	"data_source": {			          // required
		"type": "minio",		          // required, "minio" or "s3", case insensitive
		"address": "localhost:9000",	  // optional, milvus server will use its minio/s3 configuration if without this value
		"accesskey_id": "minioadmin",	  // optional, milvus server will use its minio/s3 configuration if without this value
		"accesskey_secret": "minioadmin", // optional, milvus server will use its minio/s3 configuration if without this value
		"use_ssl": false,		          // optional, milvus server will use its minio/s3 configuration if without this value
		"bucket_name": "aaa"		      // optional, milvus server will use its minio/s3 configuration if without this value
	},

	"internal_data": {		     // optional, external_data or internal_data. (external files include json, npy, etc. internal files are exported by milvus)
		"path": "xxx/xxx/xx",	 // required, relative path to the source storage where store the exported data
		"collections_mapping": { // optional, give a new name to collection during importing
			"coll_a": "coll_b",	     // collection name mapping, key is the source collection name, value is a new collection name
			"coll_c": "coll_d"
		}
	},

	"external_data": {			                     // optional, external_data or internal_data. (external files include json, npy, etc. internal files are exported by milvus)
		"target_collection": "xxx",	                 // required, target collection name
		"chunks": [{			                     // required, chunk list, each chunk can be import as one segment or split into multiple segments
				"files": [{	                         // required, files that provide data of a chunk
					"path": "xxxx / xx.json", 		 // required, relative path under the storage source defined by DataSource, currently support json/npy
					"type": "row_based",	  		 // required, "row_based" or "column_based", tell milvus how to parse this json file, case insensitive
                    "from": 0,                       // optional, import part of the file from a number
                    "to": 1000,                      // optional, import part of the file end by a number
					"fields_mapping": {			     // optional, specify the target fields which should be imported. Milvus will import all fields if this list is empty
						"table.rows.id": "uid",		 // field name mapping, tell milvus how to insert data to correct field, key is a json node path, value is a field name of the collection. If the file is numpy format, the key is a field name of the collection same with value.
						"table.rows.year": "year",
						"table.rows.vector": "vector"
					}
				}]
			}
		],
		"default_fields": { // optional, use default value to fill some fields
			"age": 0,       // key is a field name, value is default value of this field, can be number or string
			"weight": 0.0
		}
	}
}
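
For reference, a caller would typically assemble the options object as an ordinary Python dictionary. Because import is a reserved keyword in Python, the call below uses a hypothetical wrapper name (bulk_import) purely for illustration; only the shape of the options object is taken from this proposal.

# Hypothetical usage sketch. "client.bulk_import" stands in for the proposed
# import() API; the actual entry point name may differ. Optional data_source
# fields are omitted so the server falls back to its own minio/s3 settings.
options = {
    "data_source": {
        "type": "minio",
        "bucket_name": "mybucket"
    },
    "external_data": {
        "target_collection": "TEST",
        "chunks": [
            {"files": [{"path": "xxxx/file_1.json", "type": "row_based"}]}
        ]
    }
}
# client.bulk_import(options)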


The key fields of the JSON object are documented inline in the comments above.


How to pass this parameter in different situations:

Assume there is a collection named "TEST" with these fields:

{"uid":INT64, "year":INT32, "age":INT8, "embedding":FLOAT_VECTOR}

For the following situations:

  1. The user has some JSON files storing data in the row-based format: file_1.json, file_2.json.

    {
      "data": {
        "rows": [
          {"id": 1, "year": 2021, "vector": [1.0, 1.1, 1.2]},
          {"id": 2, "year": 2022, "vector": [2.0, 2.1, 2.2]},
          {"id": 3, "year": 2023, "vector": [3.0, 3.1, 3.2]}
        ]
      }
    }

    The "options" could be:

    {
    	"data_source": {
    		"type": "Minio",
    		"address": "localhost:9000",
    		"accesskey_id": "minioadmin",
    		"accesskey_secret": "minioadmin",
    		"use_ssl": false,
    		"bucket_name": "mybucket"
    	},
    
    	"external_data": {
    		"target_collection": "TEST",
    		"chunks": [{
    				"files": [{
    					"path": "xxxx/file_1.json",
    					"type": "row_based",
    					"fields_mapping": {
    						"table.rows.id": "uid",
    						"table.rows.year": "year",
    						"table.rows.vector": "vector"
    					}
    				}]
    			},
    			{
    				"files": [{
    					"path": "xxxx/file_2.json",
    					"type": "row_based",
    					"fields_mapping": {
    						"table.rows.id": "uid",
    						"table.rows.year": "year",
    						"table.rows.vector": "vector"
    					}
    				}]
    			}
    		],
    		"default_fields": {
    			"age": 0
    		}
    	}
    }


  2. The user has some JSON files storing data in the column-based format: file_1.json, file_2.json.

    {
      "table": {
        "columns": {
          "id": [1, 2, 3],
          "year": [2021, 2022, 2023],
          "vector": [
            [1.0, 1.1, 1.2],
            [2.0, 2.1, 2.2],
            [3.0, 3.1, 3.2]
          ]
        }
      }
    }

    The "options" could be:

    {
    	"data_source": {
    		"type": "Minio",
    		"address": "localhost:9000",
    		"accesskey_id": "minioadmin",
    		"accesskey_secret": "minioadmin",
    		"use_ssl": false,
    		"bucket_name": "mybucket"
    	},
    
    	"external_data": {
    		"target_collection": "TEST",
    		"chunks": [{
    				"files": [{
    					"path": "xxxx/file_1.json",
    					"type": "column_based",
    					"fields_mapping": {
    						"table.columns.id": "uid",
    						"table.columns.year": "year",
    						"table.columns.vector": "vector"
    					}
    				}]
    			},
    			{
    				"files": [{
    					"path": "xxxx/file_2.json",
    					"type": "column_based",
    					"fields_mapping": {
    						"table.columns.id": "uid",
    						"table.columns.year": "year",
    						"table.columns.vector": "vector"
    					}
    				}]
    			}
    		],
    		"default_fields": {
    			"age": 0
    		}
    	}
    }


  3. The user has a JSON file storing data in the column-based format (file_1.json) and a Numpy file storing vector data (file_2.npy).
    The content of file_1.json:
{
  "table": {
    "columns": {
      "id": [1, 2, 3],
      "year": [2021, 2022, 2023],
      "age": [23, 34, 21]
    }
  }
}

            The "options" could be:

{
	"data_source": {
		"type": "Minio",
		"address": "localhost:9000",
		"accesskey_id": "minioadmin",
		"accesskey_secret": "minioadmin",
		"use_ssl": false,
		"bucket_name": "mybucket"
	},

	"external_data": {
		"target_collection": "TEST",
		"chunks": [{
				"files": [{
					"path": "xxxx/file_1.json",
					"type": "column_based",
					"fields_mapping": {
						"table.columns.id": "uid",
						"table.columns.year": "year",
						"table.columns.age": "age"
					}
				}]
			},
			{
				"files": [{
					"path": "xxxx/file_2.npy",
					"type": "column_based",
					"fields_mapping": {
						"vector": "vector"
					}
				}]
			}
		]
	}
}
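
For completeness, the vector file file_2.npy referenced above could be produced with NumPy as sketched below; the 3-dimensional, 3-row array is an assumption chosen to line up with the scalar columns in file_1.json.

import numpy as np

# Hypothetical sketch: save the "vector" column as a .npy file. The row count
# must match the scalar columns in file_1.json and the column count must match
# the dimension of the collection's FLOAT_VECTOR field.
vectors = np.array([
    [1.0, 1.1, 1.2],
    [2.0, 2.1, 2.2],
    [3.0, 3.1, 3.2],
], dtype=np.float32)
np.save("file_2.npy", vectors)  # then upload the file to the MinIO/S3 bucket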


The "options" for other SDK is not JSON object. For Java SDk, a declaration could be:

public class ImportParam {
  private MinioDataSource data_source;
  private List<DataFile> external_files;
}




RPC Interfaces

    The declaration of the import API at the RPC level:

rpc Import(ImportRequest) returns (MutationResult) {}

message ImportRequest {
  common.MsgBase base = 1;
  string options = 2;      // the json options string
}


message MutationResult {
  common.Status status = 1;
  schema.IDs IDs = 2;             // return auto-id for insert/import, deleted id for delete
  repeated uint32 succ_index = 3; // succeed indexes for insert/import
  repeated uint32 err_index = 4;  // error indexes for insert/import
  bool acknowledged = 5;
  int64 insert_cnt = 6;           // how many entities were inserted or imported
  int64 delete_cnt = 7;
  int64 upsert_cnt = 8;
  uint64 timestamp = 9;
}
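
Since the options field travels as a plain JSON string, the receiving side has to parse and validate it before scheduling any work. The following is only a hypothetical validation sketch (validate_options is not the actual proxy code); it checks the two top-level rules stated above: data_source is required, and exactly one of internal_data / external_data must be present.

import json

# Hypothetical sketch of validating ImportRequest.options on the server side.
def validate_options(options_str: str) -> dict:
    options = json.loads(options_str)
    if "data_source" not in options:
        raise ValueError("options.data_source is required")
    has_internal = "internal_data" in options
    has_external = "external_data" in options
    if has_internal == has_external:
        raise ValueError("exactly one of internal_data / external_data must be set")
    return options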




Internal machinery



Test Plan