...

def import(collection_name, files, row_based, partition_name=None, options=None)

  • collection_name: the target collection name (required)
  • partition_name: the target partition name (optional)
  • row_based: a boolean to specify row-based or column-based format (required)
  • files: a list of files for row-based format, or a dict of files for column-based format (required)
    row-based files:   ["file_1.json", "file_2.json"]
    column-based files:  {"id": "file_1.json", "vectors": "embeddings.npy"}

  • options: extra options in JSON format, for example the MinIO/S3 bucket where the files come from (optional)
    {"bucket": "mybucket"}
  • returns: a list of task ids
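The files argument therefore has two shapes depending on row_based. A minimal sketch of the client-side shape check (validate_files_arg is a hypothetical helper for illustration, not part of the SDK):

```python
def validate_files_arg(row_based, files):
    """Check that `files` matches the declared import mode."""
    if row_based:
        # Row-based import expects a non-empty list of file paths.
        if not isinstance(files, list) or not files:
            raise ValueError("File list is empty")  # error string from the spec
        return [str(f) for f in files]
    # Column-based import expects a dict of field name -> file path.
    if not isinstance(files, dict):
        raise ValueError("column-based import expects a dict of field name -> file")
    return {name: str(path) for name, path in files.items()}

print(validate_files_arg(True, ["file_1.json", "file_2.json"]))
print(validate_files_arg(False, {"id": "file_1.json", "vectors": "embeddings.npy"}))
```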

...

            Note: the "state" could be "pending", "downloaded", "parsed", "persisted", "completed", "failed"
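The state progression above can be sketched as a simple polling loop. Here get_state is a stand-in stub that walks the documented states, not the real get_import_state() call:

```python
import time

# "completed" and "failed" are the terminal states listed above.
TERMINAL_STATES = {"completed", "failed"}

def wait_for_import(get_state, task_id, poll_interval=0.0):
    """Poll get_state(task_id) until the task reaches a terminal state."""
    while True:
        state = get_state(task_id)
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval)

# Stub walking the documented state sequence for one task.
_states = iter(["pending", "downloaded", "parsed", "persisted", "completed"])
print(wait_for_import(lambda task_id: next(_states), task_id=1))  # prints "completed"
```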


Pre-defined format for import files

...

Code Block
import(collection_name="test", row_based=True, files=["file_1.json"])



(2) Column-based data files: each JSON file contains one or more columns, keyed by the field name. In this case, there are two fields, so we create 2 JSON files:

file_1.json for the "uid" field:

Code Block
{
  "uid": [101, 102, 103, 104, 105]
}

file_2.json for the "vector" field:

Code Block
{
  "vector": [[1.1, 1.2, 1.3, 1.4], [2.1, 2.2, 2.3, 2.4], [3.1, 3.2, 3.3, 3.4], [4.1, 4.2, 4.3, 4.4], [5.1, 5.2, 5.3, 5.4]]
}
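As an illustration, the two column files above could be produced with the standard json module (the output directory is a temp dir for the sketch):

```python
import json
import os
import tempfile

uid_column = {"uid": [101, 102, 103, 104, 105]}
vector_column = {"vector": [[1.1, 1.2, 1.3, 1.4], [2.1, 2.2, 2.3, 2.4],
                            [3.1, 3.2, 3.3, 3.4], [4.1, 4.2, 4.3, 4.4],
                            [5.1, 5.2, 5.3, 5.4]]}

# Row counts across column files must match, or the import task later fails
# with "Inconsistent row count between field xxx and xxx".
assert len(uid_column["uid"]) == len(vector_column["vector"])

out_dir = tempfile.mkdtemp()
for name, column in [("file_1.json", uid_column), ("file_2.json", vector_column)]:
    with open(os.path.join(out_dir, name), "w") as f:
        json.dump(column, f)

print(sorted(os.listdir(out_dir)))  # prints "['file_1.json', 'file_2.json']"
```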

...

Code Block
import(collection_name="test", row_based=False, files={"uid": "file_1.json", "vector": "file_2.json"})


We also support storing vectors in a NumPy file. Let's say the "vector" field is stored in file_2.npy and the "uid" field is stored in file_1.json; then we can call import():

Code Block
import(collection_name="test", row_based=False, files={"uid": "file_1.json", "vector": "file_2.npy"})


Error handling

The import():

  • Return error "Collection doesn't exist" if the target collection doesn't exist
  • Return error "Partition doesn't exist" if the target partition doesn't exist
  • Return error "Bucket doesn't exist" if the target bucket doesn't exist
  • Return error "File list is empty" if the row-based files list is empty
  • For row-based files, all fields must be present; otherwise, return the error "The field xxx is not provided"
  • For column-based files, each field must correspond to a file; otherwise, return the error "The field xxx is not provided"
  • For column-based files, if a vector field is duplicated in a numpy file and a JSON file, return the error "The field xxx is duplicated"

The get_import_state():

  • Return error "File xxx doesn't exist" if the file could not be opened
  • The row count of each field must be equal; otherwise, return the error "Inconsistent row count between field xxx and xxx". (all segments generated by this file will be abandoned)
  • If a vector's dimension doesn't match the field schema, return the error "Incorrect vector dimension for field xxx". (all segments generated by this file will be abandoned)
  • If a data file size exceeds 1GB, return the error "Data file size must be less than 1GB"
  • If an import task gets no response for more than 6 hours, it will be marked as failed
  • If a datanode crashes or restarts, the import tasks on it will be marked as failed
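Two of the get_import_state() checks above can be sketched as pure functions. The function names and dict-based column representation are illustrative, not SDK code:

```python
MAX_FILE_SIZE = 1 * 1024 ** 3  # the 1GB limit from the spec

def check_columns(columns):
    """columns: dict of field name -> list of values.
    Return an error string on mismatch, or None if consistent."""
    counts = {name: len(values) for name, values in columns.items()}
    names = list(counts)
    for a, b in zip(names, names[1:]):
        if counts[a] != counts[b]:
            return f"Inconsistent row count between field {a} and {b}"
    return None

def check_file_size(size_bytes):
    """Return an error string if a data file is too large, else None."""
    if size_bytes > MAX_FILE_SIZE:
        return "Data file size must be less than 1GB"
    return None

print(check_columns({"uid": [1, 2, 3], "vector": [[0.1], [0.2], [0.3]]}))  # prints "None"
print(check_columns({"uid": [1, 2], "vector": [[0.1]]}))
```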

2. Proxy RPC Interfaces

Code Block
service MilvusService {
  rpc Import(ImportRequest) returns (ImportResponse) {}
  rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}

message ImportRequest {
  string collection_name = 1;                // target collection
  string partition_name = 2;                 // target partition
  bool row_based = 3;                        // the file is row-based or column-based
  repeated string files = 4;                 // file paths to be imported
  repeated common.KeyValuePair options = 5;  // import options, bucket, etc.
}

message ImportResponse {
  common.Status status = 1;
  repeated int64 tasks = 2;  // id array of import tasks
}

message GetImportStateRequest {
  int64 task = 1;  // id of an import task
}

enum ImportState {
    ImportPending = 0;
    ImportFailed = 1;
    ImportDownloaded = 2;
    ImportParsed = 3;
    ImportPersisted = 4;
    ImportCompleted = 5;
}

message GetImportStateResponse {
  common.Status status = 1;
  ImportState state = 2;                   // current state of the import task
  int64 row_count = 3;                     // if the task is finished, how many rows were imported; otherwise, how many rows have been parsed so far. 0 if the task failed.
  repeated int64 id_list = 4;              // auto-generated ids if the primary key is auto-id
  repeated common.KeyValuePair infos = 5;  // more information about the task: progress percent, file path, failure reason, etc.
}
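For reference, the ImportState enum maps onto a plain Python IntEnum. This is a hand-written sketch mirroring the proto values, not generated pb2 code:

```python
from enum import IntEnum

class ImportState(IntEnum):
    """Mirror of the ImportState enum in the proto above."""
    ImportPending = 0
    ImportFailed = 1
    ImportDownloaded = 2
    ImportParsed = 3
    ImportPersisted = 4
    ImportCompleted = 5

def is_finished(state):
    """A task is terminal once it has completed or failed."""
    return state in (ImportState.ImportCompleted, ImportState.ImportFailed)

print(is_finished(ImportState.ImportPersisted))  # prints "False"
print(is_finished(ImportState.ImportCompleted))  # prints "True"
```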

...