...

  1.  pymilvus creates a data insert request with type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)

    Code Block
    // grpc-proto/milvus.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      repeated schema.FieldData fields_data = 5;	// fields' data
      repeated uint32 hash_keys = 6;
      uint32 num_rows = 7;
    }

    Data is inserted into fields_data by column. schemapb.FieldData is defined as follows:

    Code Block
    // grpc-proto/schema.proto
    message ScalarField {
      oneof data {
        BoolArray bool_data = 1;
        IntArray int_data = 2;
        LongArray long_data = 3;
        FloatArray float_data = 4;
        DoubleArray double_data = 5;
        StringArray string_data = 6;
        BytesArray bytes_data = 7;
      }
    }

    message VectorField {
      int64 dim = 1;
      oneof data {
        FloatArray float_vector = 2;
        bytes binary_vector = 3;
      }
    }

    message FieldData {
      DataType type = 1;
      string field_name = 2;
      oneof field {
        ScalarField scalars = 3;
        VectorField vectors = 4;
      }
      int64 field_id = 5;
    }
  2.  milvuspb.InsertRequest is serialized and sent via gRPC

  3.  Proxy receives milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert)

  4.  InsertTask is executed: the column-based data stored in InsertTask.req is converted to row-based format and saved into another internal message of type internalpb.InsertRequest (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData)

    Code Block
    // internal/proto/internal.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      int64 dbID = 5;
      int64 collectionID = 6;
      int64 partitionID = 7;
      int64 segmentID = 8;
      string channelID = 9;
      repeated uint64 timestamps = 10;
      repeated int64 rowIDs = 11;
      repeated common.Blob row_data = 12;  // row-based data
    }

    A rowID and a timestamp are also added to each row of data.

  5.  Proxy sends the InsertMsg containing the internalpb.InsertRequest to the Pulsar channel

  6.  Datanode receives the InsertMsg from the Pulsar channel, restores the data to column-based format, and stores it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)

    Code Block
    type InsertData struct {
      Data  map[FieldID]FieldData // field id to field data
      Infos []BlobInfo
    }

  7.  InsertData is persisted to MinIO in Parquet format (internal/datanode/flow_graph_insert_buffer_node.go::flushSegment)
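
The column-to-row conversion in the Proxy and the row-to-column restoration in the Datanode can be illustrated with a minimal Python sketch. It is not the Milvus implementation (which is written in Go). The two-field schema, the field IDs 100 and 101, the fixed-width binary layout, and the placeholder row IDs and timestamps are all assumptions made for illustration; the actual encoding of row_data is not described here.

```python
import struct
from typing import Dict, List, Tuple

# Hypothetical schema: (field_id, field_name, struct format of one value).
# The IDs, names, and formats are illustrative, not taken from Milvus.
SCHEMA: List[Tuple[int, str, str]] = [
    (100, "age", "<q"),         # int64 scalar
    (101, "embedding", "<2f"),  # float vector with dim = 2
]


def columns_to_rows(columns: Dict[str, list]) -> List[bytes]:
    """Pack column-based data into one binary blob per row (cf. row_data)."""
    num_rows = len(columns[SCHEMA[0][1]])
    rows = []
    for i in range(num_rows):
        blob = b""
        for _, name, fmt in SCHEMA:
            value = columns[name][i]
            # Vectors are sequences; wrap scalars so pack() receives *args.
            args = value if isinstance(value, (list, tuple)) else (value,)
            blob += struct.pack(fmt, *args)
        rows.append(blob)
    return rows


def rows_to_columns(rows: List[bytes]) -> Dict[int, list]:
    """Unpack row blobs into columns keyed by field ID (cf. InsertData.Data)."""
    data: Dict[int, list] = {field_id: [] for field_id, _, _ in SCHEMA}
    for blob in rows:
        offset = 0
        for field_id, _, fmt in SCHEMA:
            values = struct.unpack_from(fmt, blob, offset)
            offset += struct.calcsize(fmt)
            data[field_id].append(values[0] if len(values) == 1 else list(values))
    return data


# Column-based input, as a client would supply it.
columns = {"age": [30, 41], "embedding": [[0.5, 1.5], [2.0, -1.0]]}

rows = columns_to_rows(columns)   # Proxy side: column-based -> row-based
row_ids = list(range(len(rows)))  # placeholder row IDs, one per row
timestamps = [1000] * len(rows)   # placeholder timestamps, one per row

restored = rows_to_columns(rows)  # Datanode side: row-based -> column-based
```

In this sketch each row becomes a 16-byte blob (8 bytes for the int64, 8 bytes for the two float32 values), and restored maps each field ID back to its original column. The row IDs and timestamps travel alongside the blobs as parallel lists, mirroring the rowIDs, timestamps, and row_data fields of internalpb.InsertRequest.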

...