...
pymilvus creates a data insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)
Code Block
// grpc-proto/milvus.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  repeated schema.FieldData fields_data = 5; // fields' data
  repeated uint32 hash_keys = 6;
  uint32 num_rows = 7;
}
Data is stored in fields_data column by column. schemapb.FieldData is defined as follows:
Code Block
// grpc-proto/schema.proto
message ScalarField {
  oneof data {
    BoolArray bool_data = 1;
    IntArray int_data = 2;
    LongArray long_data = 3;
    FloatArray float_data = 4;
    DoubleArray double_data = 5;
    StringArray string_data = 6;
    BytesArray bytes_data = 7;
  }
}

message VectorField {
  int64 dim = 1;
  oneof data {
    FloatArray float_vector = 2;
    bytes binary_vector = 3;
  }
}

message FieldData {
  DataType type = 1;
  string field_name = 2;
  oneof field {
    ScalarField scalars = 3;
    VectorField vectors = 4;
  }
  int64 field_id = 5;
}
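The Go sketch below makes the column-based layout concrete. The FieldData type here is a hypothetical, simplified stand-in for schemapb.FieldData that covers only int64 scalars and float vectors. validateColumns is a hypothetical helper and is not part of Milvus. The sketch assumes that a float vector column is stored as one flat array of num_rows * dim values, which the dim field in VectorField suggests.

```go
package main

import "fmt"

// FieldData is a hypothetical, simplified stand-in for schemapb.FieldData:
// one value per column, holding every row's value for that field.
type FieldData struct {
	FieldName string
	Longs     []int64   // int64 scalar column (like ScalarField.long_data)
	Floats    []float32 // flattened float vector column (like VectorField.float_vector)
	Dim       int64     // vector dimension; 0 for scalar columns
}

// numRows returns how many rows a column holds.
// Assumption: vectors are stored flat, so rows = len(values) / dim.
func numRows(f FieldData) int {
	if f.Dim > 0 {
		return len(f.Floats) / int(f.Dim)
	}
	return len(f.Longs)
}

// validateColumns (hypothetical) checks that every column holds exactly want rows.
func validateColumns(cols []FieldData, want uint32) error {
	for _, c := range cols {
		if n := numRows(c); n != int(want) {
			return fmt.Errorf("field %q has %d rows, want %d", c.FieldName, n, want)
		}
	}
	return nil
}

func main() {
	// Three rows: an int64 "id" column and a 2-dim "embedding" column.
	cols := []FieldData{
		{FieldName: "id", Longs: []int64{1, 2, 3}},
		{FieldName: "embedding", Dim: 2, Floats: []float32{0.1, 0.2, 0.3, 0.4, 0.5, 0.6}},
	}
	fmt.Println(validateColumns(cols, 3)) // <nil>
}
```

Each field travels as one array, so all values of a field sit together in memory. A row, by contrast, is spread across every column at the same index.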
milvuspb.InsertRequest is serialized and sent via gRPC.
Proxy receives the milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert)
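The Go sketch below illustrates the enqueue-then-execute pattern. The task interface, insertTask, and taskQueue are hypothetical simplifications and do not reproduce the proxy's actual scheduler types in internal/proxy. The sketch shows only the general idea: a handler enqueues a task, and a background goroutine executes queued tasks in FIFO order.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical minimal interface for queued proxy work.
type task interface {
	Execute() error
}

// insertTask is a hypothetical stand-in for the proxy's InsertTask.
type insertTask struct {
	collection string
	numRows    int
}

func (t *insertTask) Execute() error {
	fmt.Printf("insert %d rows into %q\n", t.numRows, t.collection)
	return nil
}

// taskQueue runs enqueued tasks one at a time, in FIFO order, on one goroutine.
type taskQueue struct {
	tasks    chan task
	wg       sync.WaitGroup
	executed int
	errs     []error
}

func newTaskQueue(capacity int) *taskQueue {
	q := &taskQueue{tasks: make(chan task, capacity)}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for t := range q.tasks {
			q.executed++
			if err := t.Execute(); err != nil {
				q.errs = append(q.errs, err)
			}
		}
	}()
	return q
}

// Enqueue adds a task; it blocks while the buffered channel is full.
func (q *taskQueue) Enqueue(t task) { q.tasks <- t }

// Close stops accepting tasks, waits for queued ones to finish,
// and returns how many ran plus any errors they reported.
func (q *taskQueue) Close() (int, []error) {
	close(q.tasks)
	q.wg.Wait()
	return q.executed, q.errs
}

func main() {
	q := newTaskQueue(8)
	q.Enqueue(&insertTask{collection: "demo", numRows: 3})
	n, errs := q.Close()
	fmt.Println(n, len(errs)) // 1 0
}
```

The buffered channel bounds the queue's size. Reading executed and errs is safe only after wg.Wait, which is why the sketch exposes them through Close.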
When InsertTask is executed, the column-based data stored in InsertTask.req is converted to row-based format and saved into another internal message of type internalpb.InsertRequest (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData)
Code Block
// internal/proto/internal.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  int64 dbID = 5;
  int64 collectionID = 6;
  int64 partitionID = 7;
  int64 segmentID = 8;
  string channelID = 9;
  repeated uint64 timestamps = 10;
  repeated int64 rowIDs = 11;
  repeated common.Blob row_data = 12; // row-based data
}
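The Go sketch below shows the column-to-row transposition. It is an illustrative version and not the code in task.go. The column and Blob types are hypothetical simplifications, and Blob only mirrors the shape of common.Blob. The sketch assumes that each row is encoded as the fixed-width little-endian bytes of its fields, in column order. The real encoding may differ.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Blob mirrors the shape of common.Blob: an opaque byte payload per row.
type Blob struct {
	Value []byte
}

// column is a hypothetical simplified column: int64 scalars, or a
// flattened float vector when Dim > 0.
type column struct {
	Longs  []int64
	Floats []float32
	Dim    int
}

// columnsToRows writes row i of every column, in column order, into one Blob.
// Assumption: fixed-width little-endian encoding; columns are pre-validated
// to hold numRows rows each.
func columnsToRows(cols []column, numRows int) ([]Blob, error) {
	rows := make([]Blob, numRows)
	for i := 0; i < numRows; i++ {
		var buf bytes.Buffer
		for _, c := range cols {
			var err error
			if c.Dim > 0 {
				err = binary.Write(&buf, binary.LittleEndian, c.Floats[i*c.Dim:(i+1)*c.Dim])
			} else {
				err = binary.Write(&buf, binary.LittleEndian, c.Longs[i])
			}
			if err != nil {
				return nil, err
			}
		}
		rows[i] = Blob{Value: buf.Bytes()}
	}
	return rows, nil
}

func main() {
	cols := []column{
		{Longs: []int64{1, 2}},                          // int64 field
		{Floats: []float32{0.1, 0.2, 0.3, 0.4}, Dim: 2}, // 2-dim float vector field
	}
	rows, err := columnsToRows(cols, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows), len(rows[0].Value)) // 2 16
}
```

Each row here is 8 bytes for the int64 plus 4 * dim bytes for the vector. That total explains the 16-byte rows in the example.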
InsertTask also adds a rowID and a timestamp to each row.
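The Go sketch below shows one way to fill the rowIDs and timestamps fields. idAllocator and stampRows are hypothetical. The sketch assumes that IDs come from an allocator that reserves a contiguous range per request. It also assumes that every row of a request shares one timestamp. Both assumptions are illustrative and may not match what Milvus actually does.

```go
package main

import "fmt"

// idAllocator is a hypothetical allocator that hands out contiguous ID ranges.
type idAllocator struct{ next int64 }

// Alloc reserves n consecutive IDs and returns the first one.
func (a *idAllocator) Alloc(n int) int64 {
	start := a.next
	a.next += int64(n)
	return start
}

// stampRows returns one rowID and one timestamp per row. In this sketch
// every row of a request gets the same timestamp ts (an assumption).
func stampRows(a *idAllocator, n int, ts uint64) (rowIDs []int64, timestamps []uint64) {
	start := a.Alloc(n)
	rowIDs = make([]int64, n)
	timestamps = make([]uint64, n)
	for i := 0; i < n; i++ {
		rowIDs[i] = start + int64(i)
		timestamps[i] = ts
	}
	return rowIDs, timestamps
}

func main() {
	alloc := &idAllocator{next: 100}
	ids, tss := stampRows(alloc, 3, 42)
	fmt.Println(ids, tss) // [100 101 102] [42 42 42]
}
```

With a range-based allocator, a whole request needs only one allocation call, however many rows it holds.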
Proxy sends an InsertMsg containing the internalpb.InsertRequest to the Pulsar channel.
DataNode receives the InsertMsg from the Pulsar channel, converts the data back to column-based format, and stores it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)
Code Block
type InsertData struct {
	Data  map[FieldID]FieldData // field id to field data
	Infos []BlobInfo
}
...