...
- omit the serialization and deserialization between SDK and proxy
- remove all format conversion between column-based and row-based data
- use Parquet as the binlog file format, and write it directly from Arrow data
...
Proposal Benefit Analysis (optional)
We will use Arrow to replace all in-memory data formats used in Insert/Search/Query.
All proto definitions used to communicate between SDK and proxy are listed below:
- Proto changed for Insert
// internal/proto/milvus.proto
message InsertRequest {
  ...
  - repeated schema.FieldData fields_data = 5;
  + bytes record_batch = 5;
  ...
}
- Proto changed for Search
// internal/proto/milvus.proto
message Hits {
  ...
  - repeated bytes row_data = 2;
  + bytes record_batch = 2;
  ...
}

// internal/proto/schema.proto
message SearchResultData {
  ...
  - repeated FieldData fields_data = 3;
  + bytes record_batch = 3;
  ...
}
- Proto changed for Query
// internal/proto/milvus.proto
message QueryResults {
  ...
  - repeated schema.FieldData fields_data = 2;
  + bytes record_batch = 2;
}
Design Details (required)
We divide this MEP into 2 stages; all compatibility changes will be completed in Stage 1 before Milvus 2.0.0, while the other internal changes can be done later.
Stage 1
- Update InsertRequest in milvus.proto, change Insert to use Arrow format
- Update SearchRequest/Hits in milvus.proto, and SearchResultData in schema.proto, change Search to use Arrow format
- Update QueryResults in milvus.proto, change Query to use Arrow format
Stage 2
- Update the Storage module to use Go Arrow to write Parquet from Arrow data, or read Arrow data from Parquet directly, and remove the C++ Arrow dependency.
- Remove all internal row-based data structures, including "RowData" in internalpb.InsertRequest, "row_data" in milvuspb.Hits, and "row_data_" in the C++ SearchResult.
- Optimize the search result flow
Insert Data Flow
...
SDK (python/go/JS) sends InsertRequest with fields_data
...
Update all SDKs (python/go/JS) to write the inserted data into an Arrow record, then send InsertRequest with the Arrow bytes
...
Proxy receives InsertRequest; if the autoID field is empty, it creates a field with generated IDs, then inserts this field into fields_data
...
Proxy receives InsertRequest and decodes the Arrow record from the record_batch bytes; if the autoID field is empty, it re-creates the Arrow record filled with generated IDs
...
Based on hash_key, save row_data into internal.InsertRequest, encapsulate it into InsertMsg, then send to Pulsar
...
Based on hash_key, save row_data (obtained via Arrow array.NewSlice) into internal.InsertRequest, encapsulate it into InsertMsg, then send to Pulsar
...
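The "based on hash_key" routing step above can be sketched as follows. This is an illustrative Go sketch, not Milvus' actual code; the FNV hash, function names, and channel count are assumptions:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashKey derives a hash key from a row's primary key, here with FNV-1a;
// the real implementation may use a different hash function.
func hashKey(pk int64) uint32 {
	h := fnv.New32a()
	fmt.Fprintf(h, "%d", pk)
	return h.Sum32()
}

// channelFor maps a row's hash key to one of the collection's physical
// channels; rows with the same channel index go into the same InsertMsg stream.
func channelFor(key uint32, numChannels int) int {
	return int(key) % numChannels
}

func main() {
	// Route a few rows across 2 physical channels.
	for pk := int64(1); pk <= 4; pk++ {
		fmt.Printf("pk=%d -> channel %d\n", pk, channelFor(hashKey(pk), 2))
	}
}
```

The point of contention in this MEP is the slicing step that follows the routing: with row-based data, each routed row can simply be appended to its channel's message, while an Arrow record must be sliced per channel.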
Search Data Flow
...
segcore gets SearchResult, reduces, then fills row_data_
re-organize SearchResult and save to MarshaledHits (C++)
MarshaledHits (C++) is serialized and copied in Go
...
segcore gets SearchResult, reduces, then fills row_data_
re-organize SearchResult and save in Arrow format (C++)
The Arrow-format search result is serialized and copied in Go
...
MarshaledHits is decoded and converted to column-based data saved in schema.SearchResultData
...
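The reduce step above is essentially a row-wise top-k merge of score-sorted result lists. A minimal Go sketch (the `hit` type, its fields, and the inputs are illustrative, not Milvus' actual structures) shows why per-row copying is central to this operation:

```go
package main

import "fmt"

// hit is a single search result row; Milvus' reduce merges SearchResults
// from multiple segments (in querynode) and querynodes (in proxy).
type hit struct {
	id    int64
	score float32
}

// reduceTopK merges two score-descending hit lists, keeping the best k.
// Each iteration copies one row from an input list to the output -- the
// row-by-row copy that is awkward when results live in a column-wise
// Arrow RecordBatch.
func reduceTopK(a, b []hit, k int) []hit {
	out := make([]hit, 0, k)
	i, j := 0, 0
	for len(out) < k && (i < len(a) || j < len(b)) {
		if j >= len(b) || (i < len(a) && a[i].score >= b[j].score) {
			out = append(out, a[i])
			i++
		} else {
			out = append(out, b[j])
			j++
		}
	}
	return out
}

func main() {
	a := []hit{{1, 0.9}, {2, 0.5}}
	b := []hit{{3, 0.8}, {4, 0.7}}
	fmt.Println(reduceTopK(a, b, 3)) // best 3 hits by descending score
}
```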
Memory usage: we tested the following 3 scenarios used in Milvus (sizes in bytes; the numbers correspond to 10,000 rows):

data type | raw data | Arrow Array buffer
---|---|---
int64 | 80000 | 80000
FixedSizeList (float32, dim = 128) | 5120000 | 5160064
string (len = 512) | 5120000 | 5160064
For scalar data, an Arrow Array uses the same amount of memory as the raw data;
for vector or string data, an Arrow Array uses slightly more memory than the raw data (about 4 extra bytes per row).
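The extra bytes in the table can be reproduced from Arrow's 64-byte buffer padding rule. The sketch below assumes 10,000 rows and an int32 offsets buffer for the string column; both are assumptions inferred from the table's numbers, not measurements from the original benchmark:

```go
package main

import "fmt"

// pad64 rounds n up to the next multiple of 64 bytes, the buffer
// alignment recommended by the Arrow columnar format specification.
func pad64(n int) int {
	return (n + 63) / 64 * 64
}

func main() {
	const rows = 10000 // assumed row count; it reproduces the table's numbers

	// int64 column: 8 bytes per value, no extra buffer shows up in the table.
	fmt.Println("int64:", rows*8) // 80000

	// string column (len = 512): a values buffer plus an int32 offsets
	// buffer with rows+1 entries, padded to a 64-byte boundary.
	raw := rows * 512
	arrow := raw + pad64((rows+1)*4)
	fmt.Println("string raw:", raw, "Arrow:", arrow) // 5120000 5160064
}
```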
An example of the problems we would run into with Arrow
During Insert, after the Proxy receives the data, it packs the data row by row into InsertMsg, then sends them to the Datanode through Pulsar.
Splitting by row is done for 2 reasons:
1. Each collection has 2 or more physical channels; inserting through multiple channels concurrently improves insert performance
2. Pulsar limits the size of each InsertMsg
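The second reason amounts to a size-bounded batching loop over serialized rows, which is trivial with row-based data. A Go sketch (function name and limit value are illustrative, not Milvus APIs):

```go
package main

import "fmt"

// splitBySize groups serialized rows into batches whose total payload
// stays under a per-message size limit, like the one Pulsar imposes on
// each InsertMsg. With row-based data this is a simple append loop;
// with a single Arrow RecordBatch it would require slicing and
// re-serializing the batch.
func splitBySize(rows [][]byte, limit int) [][][]byte {
	var batches [][][]byte
	var cur [][]byte
	size := 0
	for _, r := range rows {
		if size+len(r) > limit && len(cur) > 0 {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, r)
		size += len(r)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	rows := [][]byte{make([]byte, 400), make([]byte, 400), make([]byte, 400)}
	fmt.Println(len(splitBySize(rows, 1000))) // 2 batches under a 1000-byte limit
}
```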
For the scenario above, we tried 4 solutions, each with its own problems:
- Solution 1: After receiving the inserted data, the Proxy creates only 1 RecordBatch, packs the data row by row into InsertMsg, then sends them to the Datanode through Pulsar
  - Problem: there is no convenient way to read data row by row from a RecordBatch; RecordBatch has a NewSlice interface, but the value returned by NewSlice supports no operation other than Print
- Solution 2: After receiving the inserted data, the Proxy creates multiple RecordBatches up front according to Pulsar's size limit on InsertMsg, serializes each RecordBatch into an InsertMsg, and sends them to the Datanode through Pulsar; the Datanode then merges the RecordBatches back into one complete RecordBatch
  - Problem: multiple RecordBatches can only be restored logically into one Arrow Table; the data of each column is not physically contiguous, which rules out subsequent column-based operations
- Solution 3: After receiving the inserted data, the Proxy creates one Array per Field
  - Problem: Arrow provides no interface to serialize an Array into []byte
- Solution 4: After receiving the inserted data, the Proxy creates multiple RecordBatches up front according to Pulsar's size limit on InsertMsg, serializes each RecordBatch into an InsertMsg, and sends them to the Datanode through Pulsar; the Datanode receives the RecordBatches, extracts and concatenates each column's data, and regenerates a new RecordBatch
  - Problem: this violates Arrow's zero-copy design goal and shows no advantage over the existing solution
To summarize the limitations we hit while using Arrow:
1. Arrow data can only be serialized and deserialized at RecordBatch granularity
2. RecordBatch does not support copying data row by row
3. The RecordBatch must be re-created on the Pulsar receiving side
The Query data flow runs into the same problems:
1. The search results from segcore need 2 reduce passes: the querynode merges the SearchResults of multiple segments, and the proxy merges the results of multiple querynodes; if the results are in RecordBatch format, the reduce is inconvenient because data cannot be copied row by row
2. The querynode sends SearchResult to the Proxy through Pulsar; after receiving it, the Proxy must rebuild the RecordBatch, which violates Arrow's zero-copy design goal
Therefore, we think Arrow is not a good fit for Milvus' usage scenario.
...
Test Plan (required)
Pass all CI flows
...