...
pymilvus creates a data insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param):
// grpc-proto/milvus.proto
message InsertRequest {
    common.MsgBase base = 1;
    string db_name = 2;
    string collection_name = 3;
    string partition_name = 4;
    repeated schema.FieldData fields_data = 5; // fields' data
    repeated uint32 hash_keys = 6;
    uint32 num_rows = 7;
}
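To make the shape of this request concrete, here is a minimal Python sketch that packs row-shaped entities into per-field columns plus a row count, mirroring fields_data and num_rows. The dict layout and the helper name build_column_request are hypothetical and exist only for illustration; this is not the pymilvus API, and it does not build real protobuf messages.

```python
from typing import Any, Dict, List


def build_column_request(collection_name: str,
                         rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Pack row-shaped entities into a column-based request (illustrative only)."""
    if not rows:
        raise ValueError("at least one row is required")
    field_names = list(rows[0].keys())
    columns: Dict[str, List[Any]] = {name: [] for name in field_names}
    for row in rows:
        # Every row must supply exactly the same set of fields.
        if set(row.keys()) != set(field_names):
            raise ValueError("rows must share the same fields")
        for name in field_names:
            columns[name].append(row[name])
    return {
        "collection_name": collection_name,
        # One entry per field, each holding that field's values for all rows.
        "fields_data": [
            {"field_name": name, "values": columns[name]} for name in field_names
        ],
        "num_rows": len(rows),
    }


request = build_column_request(
    "demo_collection",
    [
        {"id": 1, "embedding": [0.1, 0.2]},
        {"id": 2, "embedding": [0.3, 0.4]},
    ],
)
```

Each entry of request["fields_data"] holds one field's values for every row, which is the column-based layout the request relies on.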
Data is inserted into fields_data by column. schemapb.FieldData is defined as follows:
// grpc-proto/schema.proto
message ScalarField {
    oneof data {
        BoolArray bool_data = 1;
        IntArray int_data = 2;
        LongArray long_data = 3;
        FloatArray float_data = 4;
        DoubleArray double_data = 5;
        StringArray string_data = 6;
        BytesArray bytes_data = 7;
    }
}
message VectorField {
    int64 dim = 1;
    oneof data {
        FloatArray float_vector = 2;
        bytes binary_vector = 3;
    }
}
message FieldData {
    DataType type = 1;
    string field_name = 2;
    oneof field {
        ScalarField scalars = 3;
        VectorField vectors = 4;
    }
    int64 field_id = 5;
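}
milvuspb.InsertRequest is serialized and sent over gRPC.
The Proxy component receives the milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert).
When the InsertTask is executed, the column-based data in InsertTask.req is converted to row-based data and stored in internalpb.InsertRequest, a separate message used to pass data around inside Milvus (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData).
internalpb.InsertRequest is defined as follows:
// internal/proto/internal.proto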
message InsertRequest {
    common.MsgBase base = 1;
    string db_name = 2;
    string collection_name = 3;
    string partition_name = 4;
    int64 dbID = 5;
    int64 collectionID = 6;
    int64 partitionID = 7;
    int64 segmentID = 8;
    string channelID = 9;
    repeated uint64 timestamps = 10;
    repeated int64 rowIDs = 11;
    repeated common.Blob row_data = 12; // row-based data
}
Each row of data is also assigned a rowID and a timestamp.
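The column-to-row conversion described above can be sketched in Python as follows. This is an illustration under stated assumptions, not the logic of transferColumnBasedRequestToRowBasedData: the two example fields, the little-endian byte layout, the consecutive row IDs, the single shared timestamp, and the helper name columns_to_rows are all hypothetical.

```python
import struct
from typing import List, Tuple


def columns_to_rows(ids: List[int], scores: List[float],
                    first_row_id: int, timestamp: int
                    ) -> Tuple[List[bytes], List[int], List[int]]:
    """Transpose two columns into per-row blobs and tag each row (illustrative only)."""
    if len(ids) != len(scores):
        raise ValueError("columns must have the same length")
    # Assumed layout: little-endian int64 followed by little-endian float32.
    row_data = [struct.pack("<qf", i, s) for i, s in zip(ids, scores)]
    # Assumption: consecutive row IDs and one shared timestamp per request.
    row_ids = list(range(first_row_id, first_row_id + len(ids)))
    timestamps = [timestamp] * len(ids)
    return row_data, row_ids, timestamps


row_data, row_ids, timestamps = columns_to_rows(
    ids=[10, 11, 12], scores=[0.5, 1.5, 2.5], first_row_id=100, timestamp=42
)
```

The three returned lists line up index by index, in the same way that row_data, rowIDs, and timestamps are parallel repeated fields in the message above.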
Proxy sends the InsertMsg that wraps the internalpb.InsertRequest to the pulsar channel.
Datanode receives the InsertMsg from the pulsar channel, restores the data to column-based storage, and keeps it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate):
type InsertData struct {
    Data  map[FieldID]FieldData // field id to field data
    Infos []BlobInfo
}
InsertData
...
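As a rough illustration of the Datanode step, the Python sketch below decodes row blobs back into a map from field ID to column, loosely mirroring InsertData.Data. It is not the code in insertBufferNode::Operate: the field IDs, the two-field row layout, and the helper name rows_to_columns are hypothetical.

```python
import struct
from typing import Dict, List

# Hypothetical field IDs, chosen only for this example.
ID_FIELD = 100
SCORE_FIELD = 101


def rows_to_columns(row_data: List[bytes]) -> Dict[int, list]:
    """Decode row blobs into a field-ID-to-column map (illustrative only)."""
    data: Dict[int, list] = {ID_FIELD: [], SCORE_FIELD: []}
    for blob in row_data:
        # Assumed layout: little-endian int64 followed by little-endian float32.
        entity_id, score = struct.unpack("<qf", blob)
        data[ID_FIELD].append(entity_id)
        data[SCORE_FIELD].append(score)
    return data


blobs = [struct.pack("<qf", 10, 0.5), struct.pack("<qf", 11, 1.5)]
insert_data = rows_to_columns(blobs)
```

Keying the columns by field ID rather than by field name follows the map[FieldID]FieldData shape of the struct above.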