Current state: "Under Discussion"
ISSUE: #7210
PRs:
Keywords: arrow/column-based/row-based
Released: Milvus 2.0
Summary
Milvus 2.0 is a cloud-native, multi-language vector database. It uses gRPC and Pulsar for communication among the SDK and the components.
Because of the data volume involved, especially for inserts and returned search results, Milvus spends a lot of CPU cycles on serialization and deserialization.
In this enhancement proposal, we suggest adopting Apache Arrow as the Milvus in-memory data format. In the big data field, Apache Arrow has become a de facto standard for in-memory analytics: it specifies a standardized, language-independent columnar memory format.
Motivation (required)
From a data perspective, Milvus has 2 main data flows:
- Insert data flow
- Search data flow
Insert Data Flow
pymilvus creates a data insert request with type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)
```protobuf
// grpc-proto/milvus.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  repeated schema.FieldData fields_data = 5; // fields' data
  repeated uint32 hash_keys = 6;
  uint32 num_rows = 7;
}
```
Data is stored in fields_data column by column; schemapb.FieldData is defined as follows:
```protobuf
// grpc-proto/schema.proto
message ScalarField {
  oneof data {
    BoolArray bool_data = 1;
    IntArray int_data = 2;
    LongArray long_data = 3;
    FloatArray float_data = 4;
    DoubleArray double_data = 5;
    StringArray string_data = 6;
    BytesArray bytes_data = 7;
  }
}

message VectorField {
  int64 dim = 1;
  oneof data {
    FloatArray float_vector = 2;
    bytes binary_vector = 3;
  }
}

message FieldData {
  DataType type = 1;
  string field_name = 2;
  oneof field {
    ScalarField scalars = 3;
    VectorField vectors = 4;
  }
  int64 field_id = 5;
}
```
milvuspb.InsertRequest is serialized and sent via gRPC
Proxy receives milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert)
When InsertTask is executed, the column-based data stored in InsertTask.req is converted to row-based format and saved into another internal message of type internalpb.InsertRequest (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData)
```protobuf
// internal/proto/internal.proto
message InsertRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  string collection_name = 3;
  string partition_name = 4;
  int64 dbID = 5;
  int64 collectionID = 6;
  int64 partitionID = 7;
  int64 segmentID = 8;
  string channelID = 9;
  repeated uint64 timestamps = 10;
  repeated int64 rowIDs = 11;
  repeated common.Blob row_data = 12; // row-based data
}
```
A rowID and a timestamp are added for each row
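A minimal sketch of the column-to-row conversion described above, assuming a hypothetical collection with one int64 field and one float32 vector field of dimension Dim. The names ColumnBatch and ToRows and the little-endian row layout are assumptions for illustration, not the layout used by transferColumnBasedRequestToRowBasedData. In internalpb.InsertRequest the rowIDs and timestamps travel in their own repeated fields, so they are not encoded into the row blobs here.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// ColumnBatch is a hypothetical, simplified stand-in for the column-based
// fields_data of milvuspb.InsertRequest: one int64 field and one float32
// vector field whose values are stored back to back (Dim values per row).
type ColumnBatch struct {
	Int64Field  []int64
	VectorField []float32
	Dim         int
}

// ToRows transposes the columns into one byte blob per row, in the spirit of
// the row_data field of internalpb.InsertRequest. Each row is the int64 value
// followed by Dim float32 values, all little-endian (an assumed layout).
func ToRows(b ColumnBatch) ([][]byte, error) {
	n := len(b.Int64Field)
	if len(b.VectorField) != n*b.Dim {
		return nil, fmt.Errorf("vector field has %d values, want %d", len(b.VectorField), n*b.Dim)
	}
	rowSize := 8 + 4*b.Dim
	rows := make([][]byte, n)
	for i := 0; i < n; i++ {
		row := make([]byte, rowSize)
		binary.LittleEndian.PutUint64(row[0:8], uint64(b.Int64Field[i]))
		for j := 0; j < b.Dim; j++ {
			bits := math.Float32bits(b.VectorField[i*b.Dim+j])
			binary.LittleEndian.PutUint32(row[8+4*j:], bits)
		}
		rows[i] = row
	}
	return rows, nil
}

func main() {
	batch := ColumnBatch{
		Int64Field:  []int64{1, 2},
		VectorField: []float32{0.1, 0.2, 0.3, 0.4},
		Dim:         2,
	}
	rows, err := ToRows(batch)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows), len(rows[0])) // 2 16
}
```

Every value is touched and copied once per insert; this per-row transposition is the kind of conversion the proposal aims to remove.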
Datanode receives the InsertMsg from the Pulsar channel and restores the data to column-based format in the InsertData structure (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)
```go
type InsertData struct {
	Data  map[FieldID]FieldData // field id to field data
	Infos []BlobInfo
}
```
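The reverse step on Datanode can be sketched the same way. This assumes a hypothetical row layout of one little-endian int64 followed by dim little-endian float32 values; Columns and FromRows are illustrative names and are not the actual InsertData or insertBufferNode code.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// Columns is a hypothetical column container, loosely modeled on what
// InsertData.Data holds per field: int64 values plus flattened vectors.
type Columns struct {
	Int64s  []int64
	Vectors []float32
	Dim     int
}

// FromRows restores column-based data from row blobs laid out as one
// little-endian int64 followed by dim little-endian float32 values.
func FromRows(rows [][]byte, dim int) (Columns, error) {
	out := Columns{
		Int64s:  make([]int64, 0, len(rows)),
		Vectors: make([]float32, 0, len(rows)*dim),
		Dim:     dim,
	}
	want := 8 + 4*dim
	for i, row := range rows {
		if len(row) != want {
			return Columns{}, fmt.Errorf("row %d has %d bytes, want %d", i, len(row), want)
		}
		out.Int64s = append(out.Int64s, int64(binary.LittleEndian.Uint64(row[0:8])))
		for j := 0; j < dim; j++ {
			bits := binary.LittleEndian.Uint32(row[8+4*j:])
			out.Vectors = append(out.Vectors, math.Float32frombits(bits))
		}
	}
	return out, nil
}

func main() {
	// Build one row by hand: id 42, vector [1, 2].
	row := make([]byte, 16)
	binary.LittleEndian.PutUint64(row[0:8], 42)
	binary.LittleEndian.PutUint32(row[8:], math.Float32bits(1))
	binary.LittleEndian.PutUint32(row[12:], math.Float32bits(2))

	cols, err := FromRows([][]byte{row}, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(cols.Int64s, cols.Vectors) // [42] [1 2]
}
```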
Search Data Flow
querynode reads the segment's binlog files from MinIO and saves them into Blob structures (internal/querynode/segment_loader.go::loadSegmentFieldsData)
```go
type Blob struct {
	Key   string // binlog file path
	Value []byte // binlog file data
}
```
The data in each Blob is deserialized, and the raw data in it is saved into an InsertData structure
querynode invokes the search engine to get a SearchResult (internal/query_node/query_collection.go::search)
```cpp
// internal/core/src/common/Types.h
struct SearchResult {
    ...
 public:
    int64_t num_queries_;
    int64_t topk_;
    std::vector<float> result_distances_;

 public:
    void* segment_;
    std::vector<int64_t> internal_seg_offsets_;
    std::vector<int64_t> result_offsets_;
    std::vector<std::vector<char>> row_data_;
};
```
At this point, only "result_distances_" and "internal_seg_offsets_" of "SearchResult" are filled in. querynode then reduces all the SearchResults returned by the segments, fetches the data of all other fields, and saves it into "row_data_" in row-based format (internal/query_node/query_collection.go::reduceSearchResultsAndFillData)
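The querynode-side reduce merges the per-segment top-k lists of a query into one global top-k. A minimal sketch for a single query, assuming each segment's hits are already sorted by ascending distance (smaller is better, as with L2); the Hit type and MergeTopK are hypothetical names and do not reflect the internals of reduceSearchResultsAndFillData.

```go
package main

import "fmt"

// Hit is a hypothetical per-segment search hit: the segment it came from,
// its offset inside that segment, and its distance to the query vector.
type Hit struct {
	SegmentID int64
	Offset    int64
	Distance  float32
}

// MergeTopK merges per-segment hit lists for one query into a global top-k.
// Each input list must already be sorted by ascending distance.
func MergeTopK(perSegment [][]Hit, k int) []Hit {
	cursors := make([]int, len(perSegment))
	merged := make([]Hit, 0, k)
	for len(merged) < k {
		best := -1
		for s, hits := range perSegment {
			if cursors[s] >= len(hits) {
				continue // this segment is exhausted
			}
			if best < 0 || hits[cursors[s]].Distance < perSegment[best][cursors[best]].Distance {
				best = s
			}
		}
		if best < 0 {
			break // fewer than k hits in total
		}
		merged = append(merged, perSegment[best][cursors[best]])
		cursors[best]++
	}
	return merged
}

func main() {
	seg1 := []Hit{{1, 10, 0.1}, {1, 11, 0.5}}
	seg2 := []Hit{{2, 3, 0.2}, {2, 4, 0.3}}
	for _, h := range MergeTopK([][]Hit{seg1, seg2}, 3) {
		fmt.Println(h.SegmentID, h.Offset, h.Distance)
	}
	// 1 10 0.1
	// 2 3 0.2
	// 2 4 0.3
}
```

The merge picks whole hits one at a time, which is why it is natural on row-based data and awkward when results can only be copied as whole RecordBatches.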
querynode reorganizes the SearchResult and saves it into the milvus.Hits structure
```protobuf
// internal/proto/milvus.proto
message Hits {
  repeated int64 IDs = 1;
  repeated bytes row_data = 2;
  repeated float scores = 3;
}
```
Row-based data saved in milvus.Hits is converted to column-based data, and saved into schemapb.SearchResultData (internal/query_node/query_collection.go::translateHits)
```protobuf
// internal/proto/schema.proto
message SearchResultData {
  int64 num_queries = 1;
  int64 top_k = 2;
  repeated FieldData fields_data = 3;
  repeated float scores = 4;
  IDs ids = 5;
  repeated int64 topks = 6;
}
```
schemapb.SearchResultData is serialized, encapsulated as internalpb.SearchResults, saved into a SearchResultMsg, and sent to a Pulsar channel (internal/query_node/query_collection.go::search)
```protobuf
// internal/proto/internal.proto
message SearchResults {
  common.MsgBase base = 1;
  common.Status status = 2;
  string result_channelID = 3;
  string metric_type = 4;
  repeated bytes hits = 5; // search result data
  // schema.SearchResultsData inside
  bytes sliced_blob = 9;
  int64 sliced_num_count = 10;
  int64 sliced_offset = 11;
  repeated int64 sealed_segmentIDs_searched = 6;
  repeated string channelIDs_searched = 7;
  repeated int64 global_sealed_segmentIDs = 8;
}
```
Proxy collects all SearchResultMsgs from the querynodes, deserializes them to get schemapb.SearchResultData, reduces them into milvuspb.SearchResults, and finally sends the result back to the SDK via gRPC (internal/proxy/task.go::SearchTask::PostExecute)
```protobuf
// internal/proto/milvus.proto
message SearchResults {
  common.Status status = 1;
  schema.SearchResultData results = 2;
}
```
The SDK receives milvuspb.SearchResults
The two data flows above involve frequent format conversions between column-based and row-based data (marked as red dashed lines).
If we use Arrow as all in-memory data format, we can:
- omit the serialization and deserialization between SDK and proxy
- remove all format conversion between column-based data and row-based data
- use Parquet as the binlog file format, and write binlog files directly from Arrow data
Proposal Benefit Analysis (optional)
Arrow memory usage was tested for the following 3 data types used in Milvus:
| Data type | Raw data (bytes) | Arrow Array buffer (bytes) |
|---|---|---|
| int64 | 80000 | 80000 |
| FixedSizeList (float32, dim = 128) | 5120000 | 5160064 |
| string (len = 512) | 5120000 | 5160064 |
For scalar data, an Arrow Array uses the same amount of memory as the raw data;
for vector data and strings, an Arrow Array uses slightly more memory than the raw data (about 4 bytes per row).
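The int64 and string rows of the table can be reproduced with a back-of-the-envelope model of the Arrow columnar layout. This sketch assumes 10,000 rows (inferred from 80000 / 8 bytes per int64), no nulls (so no validity bitmap), and buffers padded to multiples of 64 bytes; a variable-length string column additionally stores rows + 1 int32 offsets, which is where the roughly 4 extra bytes per row come from. It is an arithmetic model, not a measurement, and it does not attempt to explain the FixedSizeList row.

```go
package main

import "fmt"

// pad64 rounds n up to the next multiple of 64 bytes, the buffer padding
// assumed here for Arrow allocations.
func pad64(n int) int {
	return (n + 63) / 64 * 64
}

// FixedWidthBytes models a primitive column (e.g. int64) with no nulls:
// a single values buffer of rows*width bytes.
func FixedWidthBytes(rows, width int) int {
	return pad64(rows * width)
}

// StringBytes models a variable-length string column with no nulls:
// a values buffer plus an offsets buffer of (rows+1) int32 entries.
func StringBytes(rows, strLen int) int {
	return pad64(rows*strLen) + pad64((rows+1)*4)
}

func main() {
	const rows = 10000
	fmt.Println(FixedWidthBytes(rows, 8)) // 80000
	fmt.Println(StringBytes(rows, 512))   // 5160064
}
```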
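An example of the problems encountered when using Arrow
During Insert, after Proxy receives the data, it packs the data row by row into InsertMsgs and sends them to Datanode through Pulsar.
The data is split by row for 2 reasons:
1. Each collection has 2 or more physical channels, and inserting through multiple channels concurrently improves insert performance
2. Pulsar limits the size of each InsertMsg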
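The row-wise packing described above can be sketched as: hash each row's primary key to one of the physical channels, then cut each channel's rows into messages that stay under a size limit. A minimal sketch; the FNV-1a hash, the maxBytes parameter, and the names Row, channelOf, and PackMessages are hypothetical and are not Milvus's actual channel assignment or Pulsar configuration.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// Row is a hypothetical serialized row with its primary key.
type Row struct {
	PK   int64
	Data []byte
}

// channelOf picks a channel for a primary key using FNV-1a (an assumption;
// any stable hash would do for this sketch).
func channelOf(pk int64, numChannels int) int {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uint64(pk))
	h := fnv.New32a()
	h.Write(buf[:])
	return int(h.Sum32() % uint32(numChannels))
}

// PackMessages groups rows per channel and splits each group into messages
// whose total payload does not exceed maxBytes. A single row larger than
// maxBytes still ends up alone in its own message.
func PackMessages(rows []Row, numChannels, maxBytes int) [][][]Row {
	msgs := make([][][]Row, numChannels) // msgs[channel][message][row]
	sizes := make([]int, numChannels)    // payload size of each channel's open message
	for _, r := range rows {
		ch := channelOf(r.PK, numChannels)
		n := len(msgs[ch])
		if n == 0 || sizes[ch]+len(r.Data) > maxBytes {
			msgs[ch] = append(msgs[ch], nil) // open a new message
			sizes[ch] = 0
			n++
		}
		msgs[ch][n-1] = append(msgs[ch][n-1], r)
		sizes[ch] += len(r.Data)
	}
	return msgs
}

func main() {
	rows := make([]Row, 10)
	for i := range rows {
		rows[i] = Row{PK: int64(i), Data: make([]byte, 40)}
	}
	for ch, m := range PackMessages(rows, 2, 100) {
		fmt.Printf("channel %d: %d messages\n", ch, len(m))
	}
}
```

Both steps need to address individual rows, which is the operation the RecordBatch-based attempts have trouble expressing.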
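We tried 4 solutions for this scenario, and each has its own problem:
- Solution 1. After receiving the insert data, Proxy creates only 1 RecordBatch, packs the data row by row into InsertMsgs, and sends them to Datanode through Pulsar
  - Problem: there is no convenient way to read data out of a RecordBatch row by row. RecordBatch has a NewSlice interface, but the value returned by NewSlice cannot be used for anything except Print
- Solution 2. After receiving the insert data, Proxy creates multiple RecordBatches in advance according to Pulsar's size limit on InsertMsg, serializes each RecordBatch separately into an InsertMsg, and sends them to Datanode through Pulsar; Datanode merges the RecordBatches into one complete RecordBatch
  - Problem: multiple RecordBatches can only be restored logically into one ArrowTable; the data of each column is not physically contiguous, so later columnar operations are not possible
- Solution 3. After receiving the insert data, Proxy creates multiple Arrays, one per Field
  - Problem: Arrow does not provide an interface for serializing an Array into []byte
- Solution 4. After receiving the insert data, Proxy creates multiple RecordBatches in advance according to Pulsar's size limit on InsertMsg, serializes each RecordBatch separately into an InsertMsg, and sends them to Datanode through Pulsar; Datanode receives the RecordBatches, extracts and concatenates the data of each column, and builds a new RecordBatch
  - Problem: this defeats Arrow's zero-copy design goal, and shows no advantage over the existing approach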
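Why re-concatenating columns on Datanode loses zero-copy can be seen with plain Go slices: chunks received in separate messages live in separate buffers, so producing one physically contiguous column means allocating a new buffer and copying every chunk into it. A minimal sketch with a hypothetical helper ConcatChunks; real Arrow buffers add validity bitmaps (and offsets for variable-length types) but the copying principle is the same.

```go
package main

import "fmt"

// ConcatChunks copies column chunks that arrived in separate messages into a
// single contiguous slice. Every element is copied once; nothing is shared
// with the input chunks.
func ConcatChunks(chunks [][]float32) []float32 {
	total := 0
	for _, c := range chunks {
		total += len(c)
	}
	out := make([]float32, 0, total)
	for _, c := range chunks {
		out = append(out, c...)
	}
	return out
}

func main() {
	a := []float32{1, 2}
	b := []float32{3}
	merged := ConcatChunks([][]float32{a, b})
	a[0] = 99 // mutating an input chunk does not affect the merged column
	fmt.Println(merged) // [1 2 3]
}
```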
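Summary of the limitations found when using Arrow:
1. Arrow data can only be serialized and deserialized in units of RecordBatch
2. RecordBatch does not support copying data row by row
3. The RecordBatch must be recreated on the Pulsar receiving side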
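The search data flow runs into the same problems:
1. The search results from segcore need to be reduced twice: once when querynode merges the SearchResults of multiple segments, and once when proxy merges the search results of multiple querynodes. If the search results are in RecordBatch format, reduce is inconvenient because data cannot be copied row by row
2. querynode needs to send SearchResult to Proxy through Pulsar, and Proxy must rebuild the RecordBatch after receiving the data, which defeats Arrow's zero-copy design goal
So I do not think Arrow is suitable for an application scenario like Milvus.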
Design Details (required)
We divide this MEP into 2 stages. All compatibility-affecting changes will be completed in Stage 1, before Milvus 2.0.0; the other internal changes can be done later.
Stage 1
- Update InsertRequest in milvus.proto; change Insert to use Arrow format
- Update SearchRequest/Hits in milvus.proto and SearchResultData in schema.proto; change Search to use Arrow format
- Update QueryResults in milvus.proto; change Query to use Arrow format
Stage 2
- Update the Storage module to use GoArrow to write Parquet directly from Arrow and to read Arrow directly from Parquet; remove C++ Arrow
- Remove all internal row-based data structures, including "RowData" in internalpb.InsertRequest, "row_data" in milvuspb.Hits, and "row_data_" in C++ SearchResult
- Optimize the search result flow
Test Plan (required)
Pass all CI flows
References (optional)
https://arrow.apache.org/docs/
Arrow Test Code (Go)
```go
package milvusarrow // package name is illustrative

import (
	"bytes"
	"fmt"
	"testing"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/ipc"
	"github.com/apache/arrow/go/arrow/memory"
)

const (
	_DIM = 4
)

var pool = memory.NewGoAllocator()

// CreateArrowSchema builds a schema with a float32 vector field
// (FixedSizeList of _DIM) and an int64 field.
func CreateArrowSchema() *arrow.Schema {
	fieldVector := arrow.Field{
		Name: "field_vector",
		Type: arrow.FixedSizeListOf(_DIM, arrow.PrimitiveTypes.Float32),
	}
	fieldVal := arrow.Field{
		Name: "field_val",
		Type: arrow.PrimitiveTypes.Int64,
	}
	return arrow.NewSchema([]arrow.Field{fieldVector, fieldVal}, nil)
}

// CreateArrowRecord builds a record from column values;
// vValues holds len(iValues)*_DIM floats.
func CreateArrowRecord(schema *arrow.Schema, iValues []int64, vValues []float32) array.Record {
	rb := array.NewRecordBuilder(pool, schema)
	defer rb.Release()

	rowNum := len(iValues)
	rb.Reserve(rowNum)

	for i, field := range rb.Schema().Fields() {
		switch field.Type.ID() {
		case arrow.INT64:
			vb := rb.Field(i).(*array.Int64Builder)
			vb.AppendValues(iValues, nil)
		case arrow.FIXED_SIZE_LIST:
			lb := rb.Field(i).(*array.FixedSizeListBuilder)
			// Mark every list slot as valid, then append the flattened values.
			valid := make([]bool, rowNum)
			for j := range valid {
				valid[j] = true
			}
			lb.AppendValues(valid)
			vb := lb.ValueBuilder().(*array.Float32Builder)
			vb.AppendValues(vValues, nil)
		}
	}
	return rb.NewRecord()
}

// WriteArrowRecord serializes one record in the Arrow IPC stream format.
// It takes ownership of rec and releases it.
func WriteArrowRecord(schema *arrow.Schema, rec array.Record) []byte {
	defer rec.Release()

	var buf bytes.Buffer
	// internal/arrdata/ioutil.go
	writer := ipc.NewWriter(&buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
	if err := writer.Write(rec); err != nil {
		panic(fmt.Sprintf("could not write record: %v", err))
	}
	// Close finalizes the writer; call it before reading buf.Bytes().
	if err := writer.Close(); err != nil {
		panic(err)
	}
	return buf.Bytes()
}

// ReadArrowRecords decodes each blob and rebuilds a single record by copying
// every column's values into new builders.
func ReadArrowRecords(schema *arrow.Schema, blobs [][]byte) array.Record {
	iValues := make([]int64, 0)
	vValues := make([]float32, 0)
	for _, blob := range blobs {
		reader, err := ipc.NewReader(bytes.NewReader(blob), ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic(fmt.Sprintf("create reader fail: %v", err))
		}
		// The record returned by Read is owned by the reader and stays valid
		// until the next Read or reader.Release, so it is not released here.
		rec, err := reader.Read()
		if err != nil {
			panic(fmt.Sprintf("read record fail: %v", err))
		}
		for _, col := range rec.Columns() {
			switch col.DataType().ID() {
			case arrow.INT64:
				arr := col.(*array.Int64)
				iValues = append(iValues, arr.Int64Values()...)
			case arrow.FIXED_SIZE_LIST:
				arr := col.(*array.FixedSizeList).ListValues().(*array.Float32)
				vValues = append(vValues, arr.Float32Values()...)
			}
		}
		reader.Release()
	}
	ret := CreateArrowRecord(schema, iValues, vValues)
	ShowArrowRecord(ret)
	return ret
}

// ReadArrowRecordsToTable decodes each blob and combines the records into a
// table whose columns are chunked, i.e. not physically contiguous.
func ReadArrowRecordsToTable(schema *arrow.Schema, blobs [][]byte) array.Table {
	recs := make([]array.Record, 0, len(blobs))
	for _, blob := range blobs {
		reader, err := ipc.NewReader(bytes.NewReader(blob), ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic(fmt.Sprintf("create reader fail: %v", err))
		}
		rec, err := reader.Read()
		if err != nil {
			panic(fmt.Sprintf("read record fail: %v", err))
		}
		rec.Retain() // keep the record alive after the reader is released
		reader.Release()
		recs = append(recs, rec)
	}
	table := array.NewTableFromRecords(schema, recs)
	// The table holds its own references to the column data.
	for _, rec := range recs {
		rec.Release()
	}
	ShowArrowTable(table)
	return table
}

func ShowArrowRecord(rec array.Record) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", rec.Schema())
	fmt.Printf("NumCols: %v\n", rec.NumCols())
	fmt.Printf("NumRows: %v\n", rec.NumRows())
	for i, col := range rec.Columns() {
		fmt.Printf("Column[%d] %q: %v\n", i, rec.ColumnName(i), col)
	}
}

func ShowArrowTable(tbl array.Table) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", tbl.Schema())
	fmt.Printf("NumCols: %v\n", tbl.NumCols())
	fmt.Printf("NumRows: %v\n", tbl.NumRows())
	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		fmt.Printf("Column[%d] %s: %v\n", i, tbl.Schema().Field(i).Name, col.Data().Chunks())
	}
}

func TestArrowIPC(t *testing.T) {
	schema := CreateArrowSchema()
	rec0 := CreateArrowRecord(schema, []int64{0}, []float32{0, 0, 0, 0})
	rec1 := CreateArrowRecord(schema, []int64{1, 2, 3}, []float32{1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3})

	// WriteArrowRecord releases rec0 and rec1.
	blob0 := WriteArrowRecord(schema, rec0)
	blob1 := WriteArrowRecord(schema, rec1)

	rec := ReadArrowRecords(schema, [][]byte{blob0, blob1})
	defer rec.Release()
	if rec.NumRows() != 4 {
		t.Fatalf("got %d rows, want 4", rec.NumRows())
	}

	tbl := ReadArrowRecordsToTable(schema, [][]byte{blob0, blob1})
	defer tbl.Release()
	if tbl.NumRows() != 4 {
		t.Fatalf("got %d table rows, want 4", tbl.NumRows())
	}
}
```