...

  • omit the serialization and deserialization between SDK and proxy
  • remove all format conversion between column-based data and row-based data
  • use Parquet as the binlog file format, and write it directly from Arrow data

...

Proposal Benefit Analysis (optional)

We will use Arrow to replace all in-memory data formats used in Insert/Search/Query.

All proto definitions used for communication between the SDK and the proxy are listed below; a short serialization sketch in Go follows the list:

  • Proto changed for Insert
Code Block
// internal/proto/milvus.proto
message InsertRequest {
  ...
- repeated schema.FieldData fields_data = 5;
+ bytes record_batch = 5;
  ...
}
  • Proto changed for Search
Code Block
// internal/proto/milvus.proto
message Hits {
  ...
- repeated bytes row_data = 2;
+ bytes record_batch = 2;
  ...
}

// internal/proto/schema.proto
message SearchResultData {
  ...
- repeated FieldData fields_data = 3;
+ bytes record_batch = 3;
  ...
}
  • Proto changed for Query
Code Block
// internal/proto/milvus.proto
message QueryResults {
  ...
- repeated schema.FieldData fields_data = 2;
+ bytes record_batch = 2;
}
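
To make the new bytes fields concrete, here is a minimal Go sketch of how an SDK could pack field data into the record_batch payload using the Arrow record builder and the IPC stream writer. The schema (an id column plus a small FixedSizeList vector), the module path version, and the payload handling are illustrative assumptions, not part of the proto change itself.

Code Block
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/ipc"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()

	// Hypothetical collection schema: an int64 primary key and a float32
	// vector stored as a FixedSizeList column.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "vec", Type: arrow.FixedSizeListOf(4, arrow.PrimitiveTypes.Float32)},
	}, nil)

	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()

	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
	vb := b.Field(1).(*array.FixedSizeListBuilder)
	fb := vb.ValueBuilder().(*array.Float32Builder)
	for i := 0; i < 2; i++ {
		vb.Append(true)
		fb.AppendValues([]float32{0.1, 0.2, 0.3, 0.4}, nil)
	}

	rec := b.NewRecord()
	defer rec.Release()

	// Serialize the record with the Arrow IPC stream format; the resulting
	// bytes are what would be placed into InsertRequest.record_batch.
	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(schema), ipc.WithAllocator(mem))
	if err := w.Write(rec); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Println("record_batch payload size:", buf.Len())
}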

Design Details(required)

We divide this MEP into 2 stages: all compatibility changes will be completed in Stage 1 before Milvus 2.0.0; the other internal changes can be done later.

Stage 1

  1. Update InsertRequest in milvus.proto, change Insert to use Arrow format
  2. Update SearchRequest/Hits in milvus.proto, and SearchResultData in schema.proto, change Search to use Arrow format
  3. Update QueryResults in milvus.proto, change Query to use Arrow format

Stage 2

  1. Update the Storage module to use GoArrow to write Parquet from Arrow, or read Arrow from Parquet directly, and remove C++ Arrow (a sketch follows this list).
  2. Remove all internal row-based data structures, including "RowData" in internalpb.InsertRequest, "row_data" in milvuspb.Hits, and "row_data_" in C++ SearchResult.
  3. Optimize the search result flow
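
As a rough illustration of Stage 2 item 1, the sketch below writes a Parquet file directly from an Arrow table with the Go Arrow/Parquet packages, with no row-based conversion in between. The file name, chunk size, compression codec, and single-column schema are assumptions made only for this example.

Code Block
package main

import (
	"os"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/compress"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
)

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
	rec := b.NewRecord()
	defer rec.Release()

	// Wrap the record in a table and write it straight to Parquet,
	// with no intermediate row-based representation.
	tbl := array.NewTableFromRecords(schema, []arrow.Record{rec})
	defer tbl.Release()

	f, err := os.Create("binlog_segment.parquet") // illustrative file name
	if err != nil {
		panic(err)
	}
	defer f.Close()

	props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy))
	if err := pqarrow.WriteTable(tbl, f, 1024, props, pqarrow.DefaultWriterProps()); err != nil {
		panic(err)
	}
}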

Insert Data Flow

...

SDK (Python/Go/JS) sends InsertRequest with fields_data

...

Update all SDKs (Python/Go/JS) to write the inserted data into an Arrow record, then send InsertRequest with the Arrow bytes

...

Proxy receives InsertRequest; if the autoID field is empty, it creates a field with generated IDs, then inserts this field into fields_data

...

Proxy receives InsertRequest and decodes the Arrow record from the record_batch bytes; if the autoID field is empty, it re-creates the Arrow record filled with generated IDs
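
A hedged sketch of that proxy step in Go: decode the IPC payload back into an Arrow record and, for the autoID case, re-create a record with a generated-ID column prepended. decodeAndFillIDs, the column layout, and the ID source are hypothetical placeholders rather than existing Milvus code.

Code Block
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/ipc"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// decodeAndFillIDs is a hypothetical helper: it decodes the record_batch bytes
// and, for the autoID case, returns a new record with a generated-ID column
// prepended to the user columns. ids must contain one value per row.
func decodeAndFillIDs(payload []byte, ids []int64, mem memory.Allocator) (arrow.Record, error) {
	rdr, err := ipc.NewReader(bytes.NewReader(payload), ipc.WithAllocator(mem))
	if err != nil {
		return nil, err
	}
	defer rdr.Release()

	if !rdr.Next() {
		return nil, fmt.Errorf("empty record_batch payload")
	}
	rec := rdr.Record()
	rec.Retain() // hold a reference while we rebuild the record
	defer rec.Release()

	// Build the generated-ID column.
	idBuilder := array.NewInt64Builder(mem)
	defer idBuilder.Release()
	idBuilder.AppendValues(ids, nil)
	idCol := idBuilder.NewArray()
	defer idCol.Release()

	// Re-create the record with the ID column prepended; NewRecord retains the
	// column arrays, so the deferred releases above only drop local references.
	fields := append([]arrow.Field{{Name: "id", Type: arrow.PrimitiveTypes.Int64}},
		rec.Schema().Fields()...)
	cols := append([]arrow.Array{idCol}, rec.Columns()...)
	return array.NewRecord(arrow.NewSchema(fields, nil), cols, rec.NumRows()), nil
}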

...

Based on hash_key, save row_data into internal.InsertRequest, encapsulate it into InsertMsg, then send it to Pulsar

...

Based on hash_key, save row_data (obtained via Arrow array.NewSlice) into internal.InsertRequest, encapsulate it into InsertMsg, then send it to Pulsar
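
The per-channel split could look roughly like the following Go sketch, which cuts a contiguous row range out of the decoded record with Record.NewSlice and re-serializes it for one InsertMsg. The contiguous-range assumption and the sliceForChannel helper are illustrative only; the practical limits of this approach are discussed later in this document.

Code Block
package main

import (
	"bytes"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/ipc"
)

// sliceForChannel cuts the rows [start, end) destined for one channel out of
// the decoded record (a zero-copy view via NewSlice) and serializes them for
// a single InsertMsg.
func sliceForChannel(rec arrow.Record, start, end int64) ([]byte, error) {
	sub := rec.NewSlice(start, end)
	defer sub.Release()

	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
	if err := w.Write(sub); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}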

...

Search Data Flow

...

segcore gets SearchResult, reduces it, then fills row_data_

re-organize SearchResult and save it to MarshaledHits (C++)

MarshaledHits (C++) is serialized and copied into Go

...

segcore gets SearchResult, reduces it, then fills row_data_

re-organize SearchResult and save it in Arrow format (C++)

the Arrow-format search result is serialized and copied into Go
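
On the Go side, reading that Arrow-format result could look like the sketch below, which deserializes the copied bytes with the IPC reader and walks the result columns. The column names and types (an int64 id column and a float32 distance column) are assumptions for illustration; the real layout would be defined by segcore.

Code Block
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/ipc"
)

// readSearchResult walks an Arrow-format search result copied out of segcore.
// The id/distance column layout is assumed for illustration.
func readSearchResult(payload []byte) error {
	rdr, err := ipc.NewReader(bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer rdr.Release()

	for rdr.Next() {
		rec := rdr.Record()
		ids := rec.Column(0).(*array.Int64)     // assumed result-ID column
		dists := rec.Column(1).(*array.Float32) // assumed distance column
		for i := 0; i < int(rec.NumRows()); i++ {
			fmt.Printf("id=%d distance=%f\n", ids.Value(i), dists.Value(i))
		}
	}
	return rdr.Err()
}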

...

MarshaledHits is decoded and converted to column-based data saved in schema.SearchResultData

...

Memory usage: we tested the following 3 scenarios used in Milvus:

Data type                            | raw_data (bytes) | Arrow Array buffer (bytes)
int64                                | 80000            | 80000
FixedSizeList (float32, dim = 128)   | 5120000          | 5160064
string (len = 512)                   | 5120000          | 5160064

For scalar data, the Arrow Array uses the same amount of memory as the raw data; for vector or string data, the Arrow Array uses slightly more memory than the raw data (about 4 extra bytes per row).


An example of the problems we would run into if we used Arrow:

During Insert, after the Proxy receives the data, it packages the data row by row into InsertMsg and then sends it to the DataNode through Pulsar.

The data is split by row for 2 reasons:

1. Each collection has 2 or more physical channels; inserting through multiple channels concurrently improves insert performance

2. Pulsar limits the size of each InsertMsg

For the scenario above, we tried 4 solutions, and each one has its own problems:

- Option 1. After receiving the inserted data, the Proxy creates only 1 RecordBatch, packages the data row by row into InsertMsg, and sends it to the DataNode through Pulsar

- Problem: there is no convenient way to read data row by row from a RecordBatch. RecordBatch has a NewSlice interface, but the return value of NewSlice cannot be used for anything other than Print

- Option 2. After receiving the inserted data, the Proxy creates multiple RecordBatches in advance according to Pulsar's size limit on InsertMsg, serializes each RecordBatch into its own InsertMsg, and sends them to the DataNode through Pulsar; the DataNode merges the multiple RecordBatches back into one complete RecordBatch

- Problem: multiple RecordBatches can only be logically combined into one Arrow Table; the data of each column is not physically contiguous, so subsequent column-based operations are impossible

- Option 3. After receiving the inserted data, the Proxy creates multiple Arrays, one per Field

- Problem: Arrow does not provide an interface to serialize an Array into []byte

- Option 4. After receiving the inserted data, the Proxy creates multiple RecordBatches in advance according to Pulsar's size limit on InsertMsg, serializes each RecordBatch into its own InsertMsg, and sends them to the DataNode through Pulsar; the DataNode receives the multiple RecordBatches, extracts and concatenates the data of each column, and regenerates a new RecordBatch

- Problem: this violates Arrow's zero-copy design intent, and compared with the existing solution there is no visible advantage

To summarize, the limitations we ran into while using Arrow:

1. Arrow data can only be serialized and deserialized at the granularity of a RecordBatch

2. RecordBatch does not support copying data row by row

3. The RecordBatch must be re-created on the Pulsar receiving side

The query data flow runs into the same problems:

1. The search results produced by segcore need 2 reduce passes: one where the querynode merges the SearchResults of multiple segments, and one where the proxy merges the results of multiple querynodes. If the search results are in RecordBatch format, the reduce operations are inconvenient because data cannot be copied row by row

2. The querynode needs to send the SearchResult to the Proxy through Pulsar, and the Proxy has to rebuild the RecordBatch after receiving the data, which violates Arrow's zero-copy design intent

So I think Arrow is not a good fit for Milvus's usage scenario




...

Test Plan(required)

Pass all CI flows

...