
Current state: "Under Discussion"

ISSUE: #7210

PRs: 

Keywords: arrow/column-based/row-based

Released: Milvus 2.0

Summary

Milvus 2.0 is a cloud-native, multi-language vector database that uses gRPC and Pulsar to communicate between the SDK and its internal components.

Because of the data sizes involved, especially when inserting data and returning search results, Milvus spends a lot of CPU cycles on serialization and deserialization.

In this enhancement proposal, we suggest adopting Apache Arrow as Milvus's in-memory data format. In the field of big data, Apache Arrow has become a de facto standard for in-memory analytics: it specifies a standardized, language-independent columnar memory format.

Motivation(required)

From a data perspective, Milvus mainly includes 2 data flows:

  1. Insert data flow
  2. Search data flow

Insert Data Flow

[Figure: insert data flow diagram. Legend: BLUE = column-based data structure; ORANGE = row-based data structure; RED DASHED LINE = data format conversion]


The insert data flow includes the following steps:

  1.  pymilvus creates a data insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)

    // grpc-proto/milvus.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      repeated schema.FieldData fields_data = 5;	// fields' data
      repeated uint32 hash_keys = 6;
      uint32 num_rows = 7;
    }

    Data is inserted into fields_data column by column; schemapb.FieldData is defined as follows:

    // grpc-proto/schema.proto
    message ScalarField {
      oneof data {
        BoolArray bool_data = 1;
        IntArray int_data = 2;
        LongArray long_data = 3;
        FloatArray float_data = 4;
        DoubleArray double_data = 5;
        StringArray string_data = 6;
        BytesArray bytes_data = 7;
      }
    }

    message VectorField {
      int64 dim = 1;
      oneof data {
        FloatArray float_vector = 2;
        bytes binary_vector = 3;
      }
    }

    message FieldData {
      DataType type = 1;
      string field_name = 2;
      oneof field {
        ScalarField scalars = 3;
        VectorField vectors = 4;
      }
      int64 field_id = 5;
    }
  2.  milvuspb.InsertRequest is serialized and sent via gRPC

  3.  Proxy receives milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert)

  4.  InsertTask is executed; the column-based data stored in InsertTask.req is converted to row-based format and saved into another internal message of type internalpb.InsertRequest (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData). A simplified sketch of this conversion follows the step list.

    // internal/proto/internal.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      int64 dbID = 5;
      int64 collectionID = 6;
      int64 partitionID = 7;
      int64 segmentID = 8;
      string channelID = 9;
      repeated uint64 timestamps = 10;
      repeated int64 rowIDs = 11;
      repeated common.Blob row_data = 12;  // row-based data
    }

     A rowID and a timestamp are added for each row of data

  5.  Proxy encapsulates internalpb.InsertRequest into an InsertMsg and sends it to a Pulsar channel

  6.  Datanode receives the InsertMsg from the Pulsar channel and restores the data to column-based form in the structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)

    type InsertData struct {
        Data  map[FieldID]FieldData // field id to field data
        Infos []BlobInfo
    }
  7.  InsertData is written into MinIO in Parquet format (internal/datanode/flow_graph_insert_buffer_node.go::flushSegment)
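
To make the conversion in step 4 concrete, here is a minimal, self-contained Go sketch of packing column-based data into per-row byte blobs. The types and field layout are hypothetical simplifications for illustration, not the actual schemapb/commonpb definitions:

    package main

    import (
        "encoding/binary"
        "fmt"
        "math"
    )

    // columnData is a hypothetical stand-in for the column-based
    // fields_data of milvuspb.InsertRequest.
    type columnData struct {
        intField []int64     // a scalar column
        vecField [][]float32 // a vector column, one slice per row
    }

    // toRows packs each row's fields into one little-endian byte blob,
    // in the spirit of transferColumnBasedRequestToRowBasedData.
    func toRows(c columnData) [][]byte {
        rows := make([][]byte, len(c.intField))
        for i := range rows {
            blob := make([]byte, 0, 8+4*len(c.vecField[i]))
            b8 := make([]byte, 8)
            binary.LittleEndian.PutUint64(b8, uint64(c.intField[i]))
            blob = append(blob, b8...)
            for _, f := range c.vecField[i] {
                b4 := make([]byte, 4)
                binary.LittleEndian.PutUint32(b4, math.Float32bits(f))
                blob = append(blob, b4...)
            }
            rows[i] = blob
        }
        return rows
    }

    func main() {
        cols := columnData{
            intField: []int64{1, 2},
            vecField: [][]float32{{0.1, 0.2}, {0.3, 0.4}},
        }
        for i, row := range toRows(cols) {
            fmt.Printf("row %d: % x\n", i, row)
        }
    }

Each resulting blob corresponds to one common.Blob in internalpb.InsertRequest.row_data; the real implementation of course has to handle every supported field type.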

Search Data Flow

[Figure: search data flow diagram. Legend: BLUE = column-based data structure; ORANGE = row-based data structure; RED DASHED LINE = data format conversion]


The search data flow includes the following steps:

  1. querynode reads a segment's binlog files from MinIO and saves them into the structure Blob (internal/querynode/segment_loader.go::loadSegmentFieldsData)

    type Blob struct {
        Key   string    // binlog file path
        Value []byte    // binlog file data
    }

    The data in Blob is deserialized, and the raw data in it is saved into the structure InsertData

  2.  querynode invokes the search engine to get SearchResult (internal/query_node/query_collection.go::search)

    // internal/core/src/common/Types.h
    struct SearchResult {
     ...
     public:
        int64_t num_queries_;
        int64_t topk_;
        std::vector<float> result_distances_;

     public:
        void* segment_;
        std::vector<int64_t> internal_seg_offsets_;
        std::vector<int64_t> result_offsets_;
        std::vector<std::vector<char>> row_data_;
    };


    At this point, only the "result_distances_" and "internal_seg_offsets_" fields of "SearchResult" are filled.

    querynode reduces all SearchResults returned by the segments, fetches the data of all other fields, and saves it into "row_data_" in row-based format (internal/query_node/query_collection.go::reduceSearchResultsAndFillData)

  3. querynode reorganizes the SearchResult and saves it into the structure milvus.Hits

    // internal/proto/milvus.proto
    message Hits {
      repeated int64 IDs = 1;
      repeated bytes row_data = 2;
      repeated float scores = 3;
    }
  4.  The row-based data saved in milvus.Hits is converted to column-based data and saved into schemapb.SearchResultData (internal/query_node/query_collection.go::translateHits). A simplified sketch of this conversion follows the step list.

    // internal/proto/schema.proto
    message SearchResultData {
      int64 num_queries = 1;
      int64 top_k = 2;
      repeated FieldData fields_data = 3;
      repeated float scores = 4;
      IDs ids = 5;
      repeated int64 topks = 6;
    }
  5.  schemapb.SearchResultData is serialized, encapsulated as internalpb.SearchResults, saved into a SearchResultMsg, and sent to a Pulsar channel (internal/query_node/query_collection.go::search)

    // internal/proto/internal.proto
    message SearchResults {
      common.MsgBase base = 1;
      common.Status status = 2;
      string result_channelID = 3;
      string metric_type = 4;
      repeated bytes hits = 5;  // search result data

      // schema.SearchResultsData inside
      bytes sliced_blob = 9;
      int64 sliced_num_count = 10;
      int64 sliced_offset = 11;

      repeated int64 sealed_segmentIDs_searched = 6;
      repeated string channelIDs_searched = 7;
      repeated int64 global_sealed_segmentIDs = 8;
    }
  6.  Proxy collects all SearchResultMsgs from the querynodes, obtains schemapb.SearchResultData by deserialization, reduces it into milvuspb.SearchResults, and finally sends the result back to the SDK via gRPC (internal/proxy/task.go::SearchTask::PostExecute)

    // internal/proto/milvus.proto
    message SearchResults {
      common.Status status = 1;
      schema.SearchResultData results = 2;
    }
  7.  The SDK receives milvuspb.SearchResults
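
The reverse conversion in step 4 can be sketched the same way; here is a minimal, hypothetical Go illustration of unpacking row blobs (an int64 ID followed by a float32 vector) back into columns, in the spirit of translateHits:

    package main

    import (
        "encoding/binary"
        "fmt"
        "math"
    )

    // fromRows unpacks per-row blobs back into column-based slices,
    // roughly what translateHits does when it converts milvus.Hits
    // row_data into schemapb.SearchResultData fields_data.
    func fromRows(rows [][]byte) (ids []int64, vecs []float32) {
        for _, row := range rows {
            ids = append(ids, int64(binary.LittleEndian.Uint64(row[:8])))
            for off := 8; off+4 <= len(row); off += 4 {
                bits := binary.LittleEndian.Uint32(row[off : off+4])
                vecs = append(vecs, math.Float32frombits(bits))
            }
        }
        return ids, vecs
    }

    func main() {
        // Two hand-built rows: id=7 with vector {0.5, 1.0}, id=8 with {1.5, 2.0}.
        mkRow := func(id uint64, v []float32) []byte {
            b := make([]byte, 8)
            binary.LittleEndian.PutUint64(b, id)
            for _, f := range v {
                b4 := make([]byte, 4)
                binary.LittleEndian.PutUint32(b4, math.Float32bits(f))
                b = append(b, b4...)
            }
            return b
        }
        ids, vecs := fromRows([][]byte{mkRow(7, []float32{0.5, 1}), mkRow(8, []float32{1.5, 2})})
        fmt.Println(ids, vecs) // [7 8] [0.5 1 1.5 2]
    }

Every search therefore pays for a row-to-column conversion on the querynode, on top of the column-to-row conversion already paid at insert time.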


In the above two data flows, we can see frequent format conversions between column-based and row-based data (marked as red dashed lines).

If we used Arrow as the in-memory data format throughout, we could:

  • omit the serialization and deserialization between the SDK and Proxy
  • remove all format conversions between column-based and row-based data
  • use Parquet as the binlog file format and write it directly from Arrow data

Proposal Benefit Analysis (optional)

Arrow memory usage, measured on the following 3 scenarios used in Milvus:


Data type                            raw_data (bytes)   Arrow Array buffer (bytes)
int64                                80,000             80,000
FixedSizeList (float32, dim = 128)   5,120,000          5,160,064
string (len = 512)                   5,120,000          5,160,064

For scalar data, an Arrow Array uses the same amount of memory as the raw data;

for vector or string data, an Arrow Array uses slightly more memory than the raw data (about 4 extra bytes per row).
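
As a cross-check, these figures appear consistent with a test set of 10,000 rows: int64 gives 10,000 × 8 = 80,000 bytes; FixedSizeList (float32, dim = 128) gives 10,000 × 128 × 4 = 5,120,000 bytes of raw data, and the Arrow buffer's 5,160,064 − 5,120,000 = 40,064 extra bytes match the roughly 4 bytes of per-row overhead noted above; string (len = 512) likewise gives 10,000 × 512 = 5,120,000 bytes raw, with the extra ~4 bytes per row coming from Arrow's int32 offsets.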


An example of the problems we would hit if we adopted Arrow

During Insert, after the Proxy receives the data, it packs the data row by row into InsertMsgs, then sends them to the Datanode through Pulsar.

The data is split by row for 2 reasons:

1. Each collection has 2 or more physical channels; inserting through multiple channels concurrently improves insert performance

2. Pulsar limits the size of each InsertMsg

For the above scenario we tried 4 solutions; each has its own problems.

- Solution 1. After receiving the insert data, the Proxy creates a single RecordBatch, packs the data row by row into InsertMsgs, and sends them to the Datanode through Pulsar

- Problem: there is no convenient way to read data from a RecordBatch row by row. RecordBatch has a NewSlice interface, but the value returned by NewSlice supports no operation other than Print

- Solution 2. After receiving the insert data, the Proxy creates multiple RecordBatches up front according to Pulsar's size limit on InsertMsg, serializes each RecordBatch into an InsertMsg, and sends them to the Datanode through Pulsar; the Datanode merges the RecordBatches back into one complete RecordBatch

- Problem: multiple RecordBatches can only be restored into one Arrow Table logically; each column's data is not physically contiguous, which rules out subsequent columnar operations

- Solution 3. After receiving the insert data, the Proxy creates one Array per Field

- Problem: Arrow provides no interface to serialize an Array into []byte

- Solution 4. After receiving the insert data, the Proxy creates multiple RecordBatches up front according to Pulsar's size limit on InsertMsg, serializes each RecordBatch into an InsertMsg, and sends them to the Datanode through Pulsar; the Datanode extracts and concatenates each column's data from the received RecordBatches and builds a new RecordBatch

- Problem: this violates Arrow's zero-copy design intent, and shows no advantage over the existing implementation

To summarize, some limitations of Arrow we ran into (see the sketch after this list for limitation 2):

1. Arrow data can only be serialized and deserialized at the granularity of a whole RecordBatch

2. A RecordBatch does not support copying data row by row

3. The RecordBatch must be rebuilt on the Pulsar receiving side
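
A minimal sketch of what limitation 2 means in the Go API (using the same import paths as the appendix test code below): there is no row-level accessor on a Record, so extracting one logical row requires downcasting every column to its concrete array type.

    import (
        "fmt"

        "github.com/apache/arrow/go/arrow"
        "github.com/apache/arrow/go/arrow/array"
    )

    // printRow extracts logical row i from a Record by visiting each
    // column through its concrete typed accessor; nothing like
    // rec.Row(i) exists.
    func printRow(rec array.Record, i int) {
        for c, col := range rec.Columns() {
            switch arr := col.(type) {
            case *array.Int64:
                fmt.Printf("col %d: %d\n", c, arr.Value(i))
            case *array.FixedSizeList:
                dim := int(arr.DataType().(*arrow.FixedSizeListType).Len())
                vals := arr.ListValues().(*array.Float32).Float32Values()
                fmt.Printf("col %d: %v\n", c, vals[i*dim:(i+1)*dim])
            }
        }
    }

This can be exercised with the CreateArrowSchema/CreateArrowRecord helpers from the appendix, e.g. printRow(CreateArrowRecord(schema, []int64{1}, []float32{1, 1, 1, 1}), 0).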

The search data flow hits the same problems:

1. The search results produced by segcore need 2 reduce passes: once when the querynode merges the SearchResults of multiple segments, and once when the Proxy merges the results of multiple querynodes. If the results were in RecordBatch format, the reduce would be awkward because data cannot be copied row by row

2. The querynode has to send the SearchResult to the Proxy through Pulsar, and the Proxy has to rebuild the RecordBatch after receiving the data, which again violates Arrow's zero-copy design intent

Therefore, I think Arrow is not a good fit for Milvus's usage scenario.



Design Details(required)

We divide this MEP into 2 stages. All compatibility changes will be completed in Stage 1, before Milvus 2.0.0; the other internal changes can be done later.

Stage 1

  1. Update InsertRequest in milvus.proto, change Insert to use Arrow format
  2. Update SearchRequest/Hits in milvus.proto, and SearchResultData in schema.proto, change Search to use Arrow format
  3. Update QueryResults in milvus.proto, change Query to use Arrow format

Stage 2

  1. Update the Storage module to use Go Arrow to write Parquet from Arrow data, or read Arrow from Parquet directly, and remove the C++ Arrow dependency. (A sketch follows this list.)
  2. Remove all internal row-based data structures, including "RowData" in internalpb.InsertRequest, "row_data" in milvuspb.Hits, and "row_data_" in the C++ SearchResult.
  3. Optimize search result flow
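
For Stage 2 item 1, here is a minimal sketch of writing Parquet directly from an Arrow table. This assumes a newer arrow/go release that ships the parquet/pqarrow package (the unversioned module used by the appendix code does not include it), so the import paths differ from the appendix:

    package main

    import (
        "os"

        "github.com/apache/arrow/go/v9/arrow"
        "github.com/apache/arrow/go/v9/arrow/array"
        "github.com/apache/arrow/go/v9/arrow/memory"
        "github.com/apache/arrow/go/v9/parquet"
        "github.com/apache/arrow/go/v9/parquet/pqarrow"
    )

    func main() {
        pool := memory.NewGoAllocator()
        schema := arrow.NewSchema([]arrow.Field{
            {Name: "field_val", Type: arrow.PrimitiveTypes.Int64},
        }, nil)

        // Build a one-column record and wrap it in a table.
        rb := array.NewRecordBuilder(pool, schema)
        defer rb.Release()
        rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
        rec := rb.NewRecord()
        defer rec.Release()
        tbl := array.NewTableFromRecords(schema, []arrow.Record{rec})
        defer tbl.Release()

        // Write the table straight to a Parquet file: no row-based detour,
        // and no separate C++ Arrow dependency.
        f, err := os.Create("/tmp/field_val.parquet")
        if err != nil {
            panic(err)
        }
        defer f.Close()
        if err := pqarrow.WriteTable(tbl, f, 1024, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()); err != nil {
            panic(err)
        }
    }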

Test Plan(required)

Pass all CI flows

References(optional)

https://arrow.apache.org/docs/


Arrow Test Code (Go)

import (
	"bytes"
	"fmt"
	"testing"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/ipc"
	"github.com/apache/arrow/go/arrow/memory"
)

const (
	_DIM = 4
)

var pool = memory.NewGoAllocator()

func CreateArrowSchema() *arrow.Schema {
	fieldVector := arrow.Field{
		Name: "field_vector",
		Type: arrow.FixedSizeListOf(_DIM, arrow.PrimitiveTypes.Float32),
	}
	fieldVal := arrow.Field{
		Name: "field_val",
		Type: arrow.PrimitiveTypes.Int64,
	}
	schema := arrow.NewSchema([]arrow.Field{fieldVector, fieldVal}, nil)
	return schema
}

func CreateArrowRecord(schema *arrow.Schema, iValues []int64, vValues []float32) array.Record {
	rb := array.NewRecordBuilder(pool, schema)
	defer rb.Release()
	rb.Reserve(len(iValues))

	rowNum := len(iValues)
	for i, field := range rb.Schema().Fields() {
		switch field.Type.ID() {
		case arrow.INT64:
			vb := rb.Field(i).(*array.Int64Builder)
			vb.AppendValues(iValues, nil)
		case arrow.FIXED_SIZE_LIST:
			lb := rb.Field(i).(*array.FixedSizeListBuilder)
			valid := make([]bool, rowNum)
			for i := 0; i < rowNum; i++ {
				valid[i] = true
			}
			lb.AppendValues(valid)
			vb := lb.ValueBuilder().(*array.Float32Builder)
			vb.AppendValues(vValues, nil)
		}
	}

	rec := rb.NewRecord()

	return rec
}

func WriteArrowRecord(schema *arrow.Schema, rec array.Record) []byte {
	defer rec.Release()
	blob := make([]byte, 0)
	buf := bytes.NewBuffer(blob)

	// internal/arrdata/ioutil.go
	writer := ipc.NewWriter(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))

	//ShowArrowRecord(rec)
	if err := writer.Write(rec); err != nil {
		panic("could not write record: %v" + err.Error())
	}

	err := writer.Close()
	if err != nil {
		panic(err.Error())
	}

	return buf.Bytes()
}

func ReadArrowRecords(schema *arrow.Schema, blobs [][]byte) array.Record {
	iValues := make([]int64, 0)
	vValues := make([]float32, 0)
	for _, blob := range blobs {
		buf := bytes.NewReader(blob)

		reader, err := ipc.NewReader(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic("create reader fail: %v" + err.Error())
		}
		defer reader.Release()

		rec, err := reader.Read()
		if err != nil {
			panic("read record fail: %v" + err.Error())
		}
		defer rec.Release()

		for _, col := range rec.Columns() {
			switch col.DataType().ID() {
			case arrow.INT64:
				arr := col.(*array.Int64)
				iValues = append(iValues, arr.Int64Values()...)
			case arrow.FIXED_SIZE_LIST:
				arr := col.(*array.FixedSizeList).ListValues().(*array.Float32)
				vValues = append(vValues, arr.Float32Values()...)
			}
		}
	}
	ret := CreateArrowRecord(schema, iValues, vValues)
	ShowArrowRecord(ret)

	return ret
}

func ReadArrowRecordsToTable(schema *arrow.Schema, blobs [][]byte) array.Table {
	recs := make([]array.Record, 0)
	for _, blob := range blobs {
		buf := bytes.NewReader(blob)

		reader, err := ipc.NewReader(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic("create reader fail: %v" + err.Error())
		}
		defer reader.Release()

		rec, err := reader.Read()
		if err != nil {
			panic("read record fail: %v" + err.Error())
		}
		defer rec.Release()

		recs = append(recs, rec)
	}
	table := array.NewTableFromRecords(schema, recs)
	ShowArrowTable(table)

	return table
}

func ShowArrowRecord(rec array.Record) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", rec.Schema())
	fmt.Printf("NumCols: %v\n", rec.NumCols())
	fmt.Printf("NumRows: %v\n", rec.NumRows())
	//rowNum := int(rec.NumRows())
	for i, col := range rec.Columns() {
		fmt.Printf("Column[%d] %q: %v\n", i, rec.ColumnName(i), col)
	}
}

func ShowArrowTable(tbl array.Table) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", tbl.Schema())
	fmt.Printf("NumCols: %v\n", tbl.NumCols())
	fmt.Printf("NumRows: %v\n", tbl.NumRows())
	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		fmt.Printf("Column[%d] %s: %v\n", i, tbl.Schema().Field(i).Name, col.Data().Chunks())
	}
}

func TestArrowIPC(t *testing.T) {
	schema := CreateArrowSchema()
	rec0 := CreateArrowRecord(schema, []int64{0}, []float32{0,0,0,0})
	rec1 := CreateArrowRecord(schema, []int64{1,2,3}, []float32{1,1,1,1,2,2,2,2,3,3,3,3})
	blob0 := WriteArrowRecord(schema, rec0)
	blob1 := WriteArrowRecord(schema, rec1)
	ReadArrowRecords(schema, [][]byte{blob0, blob1})
	ReadArrowRecordsToTable(schema, [][]byte{blob0, blob1})
}


