...
pymilvus creates a data insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)
// grpc-proto/milvus.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  repeated schema.FieldData fields_data = 5; // fields' data
  repeated uint32 hash_keys = 6;
  uint32 num_rows = 7;
}
Data is inserted into fields_data column by column; schemapb.FieldData is defined as follows:
// grpc-proto/schema.proto
message ScalarField {
  oneof data {
    BoolArray bool_data = 1;
    IntArray int_data = 2;
    LongArray long_data = 3;
    FloatArray float_data = 4;
    DoubleArray double_data = 5;
    StringArray string_data = 6;
    BytesArray bytes_data = 7;
  }
}

message VectorField {
  int64 dim = 1;
  oneof data {
    FloatArray float_vector = 2;
    bytes binary_vector = 3;
  }
}

message FieldData {
  DataType type = 1;
  string field_name = 2;
  oneof field {
    ScalarField scalars = 3;
    VectorField vectors = 4;
  }
  int64 field_id = 5;
}
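As an illustration of this column-based layout, the sketch below fills one FieldData column using the generated Go bindings for schema.proto; the import path is an assumption and differs between Milvus versions, and pymilvus does the equivalent in Python inside bulk_insert_param.

package main

import (
	"fmt"

	// Assumed import path for the generated schema.proto code; it varies
	// between Milvus versions (e.g. github.com/milvus-io/milvus-proto/go-api/schemapb).
	"github.com/milvus-io/milvus/internal/proto/schemapb"
)

func main() {
	// One int64 column named "age" holding three rows; the whole column is
	// written into a single FieldData, which is what "insert by column" means.
	age := &schemapb.FieldData{
		Type:      schemapb.DataType_Int64,
		FieldName: "age",
		Field: &schemapb.FieldData_Scalars{
			Scalars: &schemapb.ScalarField{
				Data: &schemapb.ScalarField_LongData{
					LongData: &schemapb.LongArray{Data: []int64{31, 27, 45}},
				},
			},
		},
	}
	// fields_data of the InsertRequest holds one such FieldData per field,
	// and num_rows would be 3 here.
	fmt.Println(age.GetScalars().GetLongData().GetData())
}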
milvuspb.InsertRequest is serialized and sent via gRPC
Proxy receives milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert)
InsertTask is executed; the column-based data stored in InsertTask.req is converted to row-based format and saved into another internal message of type internalpb.InsertRequest (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData)
// internal/proto/internal.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  int64 dbID = 5;
  int64 collectionID = 6;
  int64 partitionID = 7;
  int64 segmentID = 8;
  string channelID = 9;
  repeated uint64 timestamps = 10;
  repeated int64 rowIDs = 11;
  repeated common.Blob row_data = 12; // row-based data
}
A rowID and a timestamp are added for each row of data.
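A minimal, self-contained sketch of this column-to-row step; the real transferColumnBasedRequestToRowBasedData handles every field type (including vectors), so the two columns and the byte layout below are only an assumption for illustration.

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// toRows interleaves two hypothetical columns (an int64 "age" field and a
// float32 "score" field) into one little-endian blob per row.
func toRows(ages []int64, scores []float32) [][]byte {
	rows := make([][]byte, len(ages))
	for i := range ages {
		buf := make([]byte, 0, 12)
		buf = binary.LittleEndian.AppendUint64(buf, uint64(ages[i]))
		buf = binary.LittleEndian.AppendUint32(buf, math.Float32bits(scores[i]))
		rows[i] = buf
	}
	return rows
}

func main() {
	rowData := toRows([]int64{31, 27}, []float32{0.5, 0.9})
	// Proxy also assigns a rowID and a timestamp to every row; the values
	// below are placeholders (real rowIDs come from a global ID allocator).
	rowIDs := []int64{1000, 1001}
	timestamps := []uint64{42, 42}
	fmt.Println(len(rowData), rowIDs, timestamps)
}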
Proxy puts the internalpb.InsertRequest into an InsertMsg and sends it to a pulsar channel.

Datanode receives the InsertMsg from the pulsar channel, restores the data back to column-based format, and keeps it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)

type InsertData struct {
  Data  map[FieldID]FieldData // field id to field data
  Infos []BlobInfo
}
InsertData is persisted to MinIO in parquet format.
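A minimal sketch of the upload part of this flush step, assuming the minio-go v7 client; the bucket name, object key, and the already-serialized payload are placeholders (the real datanode first serializes InsertData into binlog/parquet payloads).

package main

import (
	"bytes"
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	cli, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
	})
	if err != nil {
		log.Fatal(err)
	}
	payload := []byte("serialized field data") // placeholder for an encoded column
	_, err = cli.PutObject(context.Background(), "milvus-bucket",
		"insert_log/1/100/0/101/1", // hypothetical binlog path
		bytes.NewReader(payload), int64(len(payload)), minio.PutObjectOptions{})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("binlog uploaded")
}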
Search Data Flow

After receiving a LoadSegments request, querynode loads all the binlog files of the corresponding segment from MinIO into memory and puts them into the in-memory data structure Blob (internal/querynode/segment_loader.go::loadSegmentFieldsData)

type Blob struct {
  Key   string // binlog file path
  Value []byte // binlog file data
}

Usually querynode only loads the scalar data; vector data is not loaded unless no index has been built on that vector field. The data in Blob is deserialized, and the raw data extracted from it is stored in the InsertData structure.
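A sketch of the loading step above, again assuming the minio-go v7 client; the bucket name and binlog paths are placeholders.

package main

import (
	"context"
	"io"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

// Blob mirrors the struct above.
type Blob struct {
	Key   string // binlog file path
	Value []byte // binlog file data
}

// loadBinlogs pulls every binlog file of a segment into memory as a Blob.
func loadBinlogs(ctx context.Context, cli *minio.Client, bucket string, paths []string) ([]*Blob, error) {
	blobs := make([]*Blob, 0, len(paths))
	for _, p := range paths {
		obj, err := cli.GetObject(ctx, bucket, p, minio.GetObjectOptions{})
		if err != nil {
			return nil, err
		}
		data, err := io.ReadAll(obj)
		obj.Close()
		if err != nil {
			return nil, err
		}
		blobs = append(blobs, &Blob{Key: p, Value: data})
	}
	return blobs, nil
}

func main() {
	cli, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
	})
	if err != nil {
		log.Fatal(err)
	}
	blobs, err := loadBinlogs(context.Background(), cli, "milvus-bucket",
		[]string{"insert_log/1/100/0/101/1"}) // hypothetical binlog path
	if err != nil {
		log.Fatal(err)
	}
	log.Println(len(blobs), "binlog blobs loaded")
}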
querynode executes the search request by calling the knowhere search engine through CGO and obtains a SearchResult (internal/query_node/query_collection.go::search)

// internal/core/src/common/Types.h
struct SearchResult {
  ...
 public:
  int64_t num_queries_;
  int64_t topk_;
  std::vector<float> result_distances_;

 public:
  void* segment_;
  std::vector<int64_t> internal_seg_offsets_;
  std::vector<int64_t> result_offsets_;
  std::vector<std::vector<char>> row_data_;
};
In the SearchResult returned by knowhere, only result_distances_ and internal_seg_offsets_ are filled with data. After querynode has received the SearchResult of every segment, it merges the results, uses internal_seg_offsets_ to fetch the data of the other output fields, and writes that data into row_data_ in row-based format (internal/query_node/query_collection.go::reduceSearchResultsAndFillData)
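A self-contained sketch of this reduce step for a single query; the per-segment results and the top-k below are toy values, and the real reduceSearchResultsAndFillData additionally fetches the output fields through the segment pointer.

package main

import (
	"fmt"
	"sort"
)

// segmentResult holds what knowhere fills in for one segment and one query:
// a distance and a segment-internal offset per candidate hit.
type segmentResult struct {
	segmentID int64
	distances []float32
	segOffset []int64
}

type hit struct {
	segmentID int64
	segOffset int64
	distance  float32
}

// reduce merges per-segment candidates and keeps the global top-k; the chosen
// (segmentID, segOffset) pairs are what row_data_ is later filled from.
func reduce(results []segmentResult, topk int) []hit {
	var all []hit
	for _, r := range results {
		for i := range r.distances {
			all = append(all, hit{r.segmentID, r.segOffset[i], r.distances[i]})
		}
	}
	// Assume smaller distance means a better match (true for L2, not for IP).
	sort.Slice(all, func(i, j int) bool { return all[i].distance < all[j].distance })
	if len(all) > topk {
		all = all[:topk]
	}
	return all
}

func main() {
	results := []segmentResult{
		{segmentID: 1, distances: []float32{0.1, 0.7}, segOffset: []int64{3, 8}},
		{segmentID: 2, distances: []float32{0.3, 0.4}, segOffset: []int64{0, 5}},
	}
	for _, h := range reduce(results, 3) {
		fmt.Printf("segment %d offset %d distance %.1f\n", h.segmentID, h.segOffset, h.distance)
	}
}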
querynode reorganizes the SearchResult data once more and stores it in the data structure milvus.Hits

// internal/proto/milvus.proto
message Hits {
  repeated int64 IDs = 1;
  repeated bytes row_data = 2;
  repeated float scores = 3;
}
The data in milvus.Hits is converted by the function translateHits into the column-based schemapb.SearchResultData (internal/query_node/query_collection.go::translateHits)

// internal/proto/schema.proto
message SearchResultData {
  int64 num_queries = 1;
  int64 top_k = 2;
  repeated FieldData fields_data = 3;
  repeated float scores = 4;
  IDs ids = 5;
  repeated int64 topks = 6;
}
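To make this row-to-column translation concrete, here is a toy counterpart of the insert-path conversion shown earlier; the assumption that each row_data blob holds one little-endian int64 field followed by one float32 field is purely illustrative (the real translateHits decodes according to the collection schema).

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// translate turns row-based blobs (one per hit) back into per-field columns,
// which is the shape fields_data in SearchResultData expects.
func translate(rowData [][]byte) (ages []int64, scores []float32) {
	for _, row := range rowData {
		ages = append(ages, int64(binary.LittleEndian.Uint64(row[0:8])))
		scores = append(scores, math.Float32frombits(binary.LittleEndian.Uint32(row[8:12])))
	}
	return ages, scores
}

func main() {
	// Two hits with the assumed [int64 age | float32 score] layout.
	row1 := append(binary.LittleEndian.AppendUint64(nil, 31),
		binary.LittleEndian.AppendUint32(nil, math.Float32bits(0.5))...)
	row2 := append(binary.LittleEndian.AppendUint64(nil, 27),
		binary.LittleEndian.AppendUint32(nil, math.Float32bits(0.9))...)
	ages, scores := translate([][]byte{row1, row2})
	fmt.Println(ages, scores) // [31 27] [0.5 0.9]
}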
schemapb.SearchResultData is serialized, wrapped into internalpb.SearchResults, put into a msgstream.SearchResultMsg, and sent over the pulsar channel (internal/query_node/query_collection.go::search)

// internal/proto/internal.proto
message SearchResults {
  common.MsgBase base = 1;
  common.Status status = 2;
  string result_channelID = 3;
  string metric_type = 4;
  repeated bytes hits = 5; // search result data

  // schema.SearchResultsData inside
  bytes sliced_blob = 9;
  int64 sliced_num_count = 10;
  int64 sliced_offset = 11;

  repeated int64 sealed_segmentIDs_searched = 6;
  repeated string channelIDs_searched = 7;
  repeated int64 global_sealed_segmentIDs = 8;
}
Proxy collects from the pulsar channel all the msgstream.SearchResultMsg messages sent by the querynodes, deserializes them into schemapb.SearchResultData, performs a final merge, puts the merged data into milvuspb.SearchResults, and returns it to the SDK via gRPC (internal/proxy/task.go::SearchTask::PostExecute)

// internal/proto/milvus.proto
message SearchResults {
  common.Status status = 1;
  schema.SearchResultData results = 2;
}

The SDK receives milvuspb.SearchResults
...