Current state: "Under Discussion"
ISSUE: #7210
PRs:
Keywords: arrow
Released: Milvus 2.0
What are we going to do?
From the data perspective, Milvus has two main data flows: 1) the insert data flow and 2) the search data flow.
pymilvus builds an insert request, `milvuspb.InsertRequest` (client/prepare.py::bulk_insert_param). `milvuspb.InsertRequest` is defined as follows:
```protobuf
// grpc-proto/milvus.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  repeated schema.FieldData fields_data = 5; // fields' data
  repeated uint32 hash_keys = 6;
  uint32 num_rows = 7;
}
```
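The data is stored column by column in `fields_data`, with one `schemapb.FieldData` per field. `schemapb.FieldData` and the messages it uses are defined as follows: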
```protobuf
// grpc-proto/schema.proto
message ScalarField {
  oneof data {
    BoolArray bool_data = 1;
    IntArray int_data = 2;
    LongArray long_data = 3;
    FloatArray float_data = 4;
    DoubleArray double_data = 5;
    StringArray string_data = 6;
    BytesArray bytes_data = 7;
  }
}

message VectorField {
  int64 dim = 1;
  oneof data {
    FloatArray float_vector = 2;
    bytes binary_vector = 3;
  }
}

message FieldData {
  DataType type = 1;
  string field_name = 2;
  oneof field {
    ScalarField scalars = 3;
    VectorField vectors = 4;
  }
  int64 field_id = 5;
}
```
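Because `fields_data` is column-based, every `FieldData` in one request has to describe the same number of rows. A float-vector field stores all of its vectors back to back in a single `FloatArray`, so that array's length is `num_rows * dim`.

The Go sketch below shows how `num_rows` follows from the columns. `Column` and `numRows` are hypothetical simplifications and are not the generated `schemapb` types.

```go
package main

import (
	"errors"
	"fmt"
)

// Column is a hypothetical, simplified stand-in for schemapb.FieldData:
// Dim > 0 marks a float-vector field stored in Floats, otherwise Longs is used.
type Column struct {
	FieldName string
	Longs     []int64   // scalar int64 column (like ScalarField.long_data)
	Floats    []float32 // float-vector column, all vectors concatenated
	Dim       int64     // vector dimension; 0 for scalar fields
}

// rows returns how many rows this column holds.
func (c Column) rows() (int, error) {
	if c.Dim > 0 {
		if int64(len(c.Floats))%c.Dim != 0 {
			return 0, fmt.Errorf("field %q: %d floats is not a multiple of dim %d", c.FieldName, len(c.Floats), c.Dim)
		}
		return int(int64(len(c.Floats)) / c.Dim), nil
	}
	return len(c.Longs), nil
}

// numRows (hypothetical helper) checks that all columns agree on the row count
// and returns it, i.e. the value that would go into InsertRequest.num_rows.
func numRows(cols []Column) (int, error) {
	if len(cols) == 0 {
		return 0, errors.New("no columns")
	}
	n, err := cols[0].rows()
	if err != nil {
		return 0, err
	}
	for _, c := range cols[1:] {
		m, err := c.rows()
		if err != nil {
			return 0, err
		}
		if m != n {
			return 0, fmt.Errorf("field %q has %d rows, expected %d", c.FieldName, m, n)
		}
	}
	return n, nil
}

func main() {
	cols := []Column{
		{FieldName: "id", Longs: []int64{1, 2, 3}},
		{FieldName: "vec", Floats: []float32{0.1, 0.2, 0.3, 0.4, 0.5, 0.6}, Dim: 2},
	}
	n, err := numRows(cols)
	fmt.Println(n, err) // 3 <nil>
}
```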
`milvuspb.InsertRequest` is serialized and sent over gRPC.
The Proxy component receives the `milvuspb.InsertRequest`, creates an `InsertTask` for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert).
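When the `InsertTask` executes, the column-based data in `InsertTask.req` is converted to row-based data. The result is stored in another message, `internalpb.InsertRequest`, which is used to pass data between Milvus's internal components (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData).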
`internalpb.InsertRequest` is defined as follows:
```protobuf
// internal/proto/internal.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  int64 dbID = 5;
  int64 collectionID = 6;
  int64 partitionID = 7;
  int64 segmentID = 8;
  string channelID = 9;
  repeated uint64 timestamps = 10;
  repeated int64 rowIDs = 11;
  repeated common.Blob row_data = 12; // row-based data
}
```
The task also assigns a `rowID` and a `timestamp` to every row.
The Proxy sends an `InsertMsg` wrapping the `internalpb.InsertRequest` to a Pulsar channel.
The Datanode receives the `InsertMsg` from the Pulsar channel and converts the data back to columnar form. It keeps the columns in the in-memory structure `InsertData` (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate).
```go
type InsertData struct {
	Data  map[FieldID]FieldData // field id to field data
	Infos []BlobInfo
}
```
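`InsertData` is later persisted to MinIO as binlog files. After receiving a `LoadSegments` request, the querynode loads all binlog files of the segment from MinIO into memory. Each file is stored in the in-memory structure `Blob` (internal/querynode/segment_loader.go::loadSegmentFieldsData).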
```go
type Blob struct {
	Key   string // binlog file path
	Value []byte // binlog file data
}
```
Normally the querynode loads only the scalar data, not the raw vector data, unless the vector field has no index.
The data in each `Blob` is deserialized, and the raw field data extracted from it is stored in an `InsertData` structure.
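Since each `Blob` holds one binlog file, rebuilding `InsertData` first requires knowing which field each file belongs to. The sketch below shows only that grouping step. Purely for illustration, it assumes the field ID is the second-to-last component of `Blob.Key`, e.g. `.../<segmentID>/<fieldID>/<logID>`.

It does not decode the binlog payload itself. `groupByField` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Blob mirrors the struct shown above.
type Blob struct {
	Key   string // binlog file path
	Value []byte // binlog file data
}

// groupByField (hypothetical) buckets blobs by field ID, ASSUMING the key ends
// with "/<fieldID>/<logID>". Payload decoding is out of scope here.
func groupByField(blobs []*Blob) (map[int64][]*Blob, error) {
	out := make(map[int64][]*Blob)
	for _, b := range blobs {
		parts := strings.Split(b.Key, "/")
		if len(parts) < 2 {
			return nil, fmt.Errorf("unexpected key %q", b.Key)
		}
		fieldID, err := strconv.ParseInt(parts[len(parts)-2], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("key %q: %w", b.Key, err)
		}
		out[fieldID] = append(out[fieldID], b)
	}
	return out, nil
}

func main() {
	blobs := []*Blob{
		{Key: "insert_log/1/2/3/100/9001"},
		{Key: "insert_log/1/2/3/101/9002"},
		{Key: "insert_log/1/2/3/100/9003"},
	}
	groups, err := groupByField(blobs)
	fmt.Println(len(groups[100]), len(groups[101]), err) // 2 1 <nil>
}
```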
The querynode executes the `search` request by calling the knowhere search engine through CGO, and obtains a `SearchResult` (internal/query_node/query_collection.go::search).
```cpp
// internal/core/src/common/Types.h
struct SearchResult {
    ...
 public:
    int64_t num_queries_;
    int64_t topk_;
    std::vector<float> result_distances_;

 public:
    void* segment_;
    std::vector<int64_t> internal_seg_offsets_;
    std::vector<int64_t> result_offsets_;
    std::vector<std::vector<char>> row_data_;
};
```
In the `SearchResult` returned by knowhere, only `result_distances_` and `internal_seg_offsets_` are populated.
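Once the querynode has the `SearchResult` from every segment, it merges them. It then uses `internal_seg_offsets_` to fetch the data of the other output fields and writes that data, in row-based format, into `row_data_` (internal/query_node/query_collection.go::reduceSearchResultsAndFillData).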
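The merge step can be sketched as a per-query k-way merge of the segments' result lists. This assumes `result_distances_` and `internal_seg_offsets_` are flattened `num_queries_ × topk_` arrays laid out query by query, so that entry `j` of query `q` sits at index `q*topk_ + j`.

It is a simplified illustration, not the actual reduce implementation:

- `SegResult`, `Hit` and `reduce` are hypothetical names.
- Each segment's per-query list is assumed to be sorted best-first and fully populated.
- The `better` comparator is supplied by the caller, since whether a smaller or larger distance wins depends on the metric.
- Filling `row_data_` from the chosen offsets is left out.

```go
package main

import "fmt"

// SegResult is a hypothetical Go view of one segment's SearchResult:
// Distances and Offsets both hold NumQueries*TopK entries, query by query.
type SegResult struct {
	NumQueries, TopK int
	Distances        []float32
	Offsets          []int64
}

// Hit identifies a merged result: which segment and which offset inside it.
type Hit struct {
	Seg      int
	Offset   int64
	Distance float32
}

// reduce (hypothetical) merges every query's per-segment lists into one topK list.
// better(a, b) reports whether distance a ranks ahead of distance b.
func reduce(segs []SegResult, nq, topK int, better func(a, b float32) bool) [][]Hit {
	out := make([][]Hit, nq)
	for q := 0; q < nq; q++ {
		cursor := make([]int, len(segs)) // next unread position per segment
		for len(out[q]) < topK {
			best := -1
			for s, r := range segs {
				if cursor[s] >= r.TopK {
					continue // this segment is exhausted for query q
				}
				if best < 0 || better(r.Distances[q*r.TopK+cursor[s]],
					segs[best].Distances[q*segs[best].TopK+cursor[best]]) {
					best = s
				}
			}
			if best < 0 {
				break // every segment is exhausted for this query
			}
			i := q*segs[best].TopK + cursor[best]
			out[q] = append(out[q], Hit{Seg: best, Offset: segs[best].Offsets[i], Distance: segs[best].Distances[i]})
			cursor[best]++
		}
	}
	return out
}

func main() {
	l2 := func(a, b float32) bool { return a < b } // smaller L2 distance is closer
	segs := []SegResult{
		{NumQueries: 1, TopK: 2, Distances: []float32{0.1, 0.5}, Offsets: []int64{10, 11}},
		{NumQueries: 1, TopK: 2, Distances: []float32{0.3, 0.4}, Offsets: []int64{20, 21}},
	}
	fmt.Println(reduce(segs, 1, 3, l2))
}
```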
The querynode then reorganizes the `SearchResult` data into the `milvus.Hits` structure.
```protobuf
// internal/proto/milvus.proto
message Hits {
  repeated int64 IDs = 1;
  repeated bytes row_data = 2;
  repeated float scores = 3;
}
```
The function `translateHits` converts the data in `milvus.Hits` into column-based `schemapb.SearchResultData` (internal/query_node/query_collection.go::translateHits).
```protobuf
// internal/proto/schema.proto
message SearchResultData {
  int64 num_queries = 1;
  int64 top_k = 2;
  repeated FieldData fields_data = 3;
  repeated float scores = 4;
  IDs ids = 5;
  repeated int64 topks = 6;
}
```
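`SearchResultData` keeps the results of all queries in flat arrays. `scores` and `ids` are concatenated query by query. Assuming `topks[q]` holds the number of hits actually returned for query `q`, that query's hits start at the sum of the preceding `topks`.

A minimal Go sketch of this flattening, leaving out the output fields, is shown below. `QueryHits`, `FlatResult`, `flatten` and `queryRange` are hypothetical stand-ins for `milvus.Hits`, `schemapb.SearchResultData` and the conversion logic.

```go
package main

import "fmt"

// QueryHits is a hypothetical stand-in for one milvus.Hits message.
type QueryHits struct {
	IDs    []int64
	Scores []float32
}

// FlatResult is a hypothetical subset of schemapb.SearchResultData.
type FlatResult struct {
	NumQueries int64
	TopK       int64
	IDs        []int64
	Scores     []float32
	Topks      []int64 // hits actually returned per query (assumed meaning)
}

// flatten (hypothetical) concatenates per-query hits into the flat layout.
func flatten(hits []QueryHits, topK int64) FlatResult {
	res := FlatResult{NumQueries: int64(len(hits)), TopK: topK}
	for _, h := range hits {
		res.IDs = append(res.IDs, h.IDs...)
		res.Scores = append(res.Scores, h.Scores...)
		res.Topks = append(res.Topks, int64(len(h.IDs)))
	}
	return res
}

// queryRange returns the [start, end) range of IDs/Scores owned by query q.
func queryRange(r FlatResult, q int) (int64, int64) {
	var start int64
	for i := 0; i < q; i++ {
		start += r.Topks[i]
	}
	return start, start + r.Topks[q]
}

func main() {
	r := flatten([]QueryHits{
		{IDs: []int64{5, 9}, Scores: []float32{0.9, 0.7}},
		{IDs: []int64{3}, Scores: []float32{0.8}}, // fewer hits than topK
	}, 2)
	s, e := queryRange(r, 1)
	fmt.Println(r.IDs, r.Topks, r.IDs[s:e]) // [5 9 3] [2 1] [3]
}
```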
`schemapb.SearchResultData` is serialized, wrapped in `internalpb.SearchResults`, placed in a `msgstream.SearchResultMsg`, and sent through a Pulsar channel (internal/query_node/query_collection.go::search).
```protobuf
// internal/proto/internal.proto
message SearchResults {
  common.MsgBase base = 1;
  common.Status status = 2;
  string result_channelID = 3;
  string metric_type = 4;
  repeated bytes hits = 5; // search result data
  // schema.SearchResultsData inside
  bytes sliced_blob = 9;
  int64 sliced_num_count = 10;
  int64 sliced_offset = 11;
  repeated int64 sealed_segmentIDs_searched = 6;
  repeated string channelIDs_searched = 7;
  repeated int64 global_sealed_segmentIDs = 8;
}
```
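The proxy collects from the Pulsar channel the `msgstream.SearchResultMsg` sent by every querynode and deserializes each one into `schemapb.SearchResultData`. It merges these results once more, puts the merged data into `milvuspb.SearchResults`, and returns it to the SDK over gRPC (internal/proxy/task.go::SearchTask::PostExecute).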
```protobuf
// internal/proto/milvus.proto
message SearchResults {
  common.Status status = 1;
  schema.SearchResultData results = 2;
}
```
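On the proxy, each querynode's `SearchResultData` already holds a sorted list per query, so the final merge is again a per-query merge, this time driven by the `topks` offsets. The sketch below is a simplified illustration under these assumptions:

- `NodeResult` and `mergeResults` are hypothetical names.
- Each per-query list is assumed to be sorted best-first.
- A larger score is assumed to rank higher; flip the comparison for metrics where smaller is better.
- Duplicate IDs are not handled.

```go
package main

import "fmt"

// NodeResult is a hypothetical subset of one querynode's schemapb.SearchResultData.
type NodeResult struct {
	IDs    []int64
	Scores []float32
	Topks  []int64 // hits per query, in query order
}

// mergeResults (hypothetical) keeps, for every query, the topK best hits across
// all nodes, assuming larger scores are better and each list is sorted best-first.
func mergeResults(nodes []NodeResult, nq, topK int) NodeResult {
	var out NodeResult
	starts := make([]int64, len(nodes)) // where query q's slice begins in each node
	for q := 0; q < nq; q++ {
		cursor := make([]int64, len(nodes))
		var kept int64
		for kept < int64(topK) {
			best := -1
			for n, r := range nodes {
				if cursor[n] >= r.Topks[q] {
					continue // this node has no more hits for query q
				}
				if best < 0 || r.Scores[starts[n]+cursor[n]] > nodes[best].Scores[starts[best]+cursor[best]] {
					best = n
				}
			}
			if best < 0 {
				break
			}
			i := starts[best] + cursor[best]
			out.IDs = append(out.IDs, nodes[best].IDs[i])
			out.Scores = append(out.Scores, nodes[best].Scores[i])
			cursor[best]++
			kept++
		}
		out.Topks = append(out.Topks, kept)
		for n, r := range nodes {
			starts[n] += r.Topks[q] // advance to the next query's slice
		}
	}
	return out
}

func main() {
	a := NodeResult{IDs: []int64{1, 2, 3}, Scores: []float32{0.9, 0.5, 0.7}, Topks: []int64{2, 1}}
	b := NodeResult{IDs: []int64{4, 5}, Scores: []float32{0.8, 0.6}, Topks: []int64{1, 1}}
	m := mergeResults([]NodeResult{a, b}, 2, 2)
	fmt.Println(m.IDs, m.Topks) // [1 4 3 5] [2 2]
}
```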
The SDK receives the `milvuspb.SearchResults`.
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed.
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
## Compatibility, Deprecation, and Migration Plan (optional)
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Describe in a few sentences how the MEP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
Briefly list all references