Current state: "Under DiscussionRejected"

ISSUE: #7210

PRs: 

Keywords: arrow/column-based/row-based

...

  • omit the serialization and deserialization between SDK and proxy
  • remove all format conversion between column-based data and row-based data
  • use Parquet as the binlog file format, writing it directly from Arrow data

...

Proposal Benefit Analysis (optional)

We will use Arrow to replace all in-memory data formats used in Insert/Search/Query.

All proto definitions used to communicate between SDK and proxy are listed below; a sketch of the resulting encode/decode flow follows them:

  • Proto changed for Insert
Code Block
// internal/proto/milvus.proto
message InsertRequest {
  ...
- repeated schema.FieldData fields_data = 5;
+ bytes batch_record = 5;
  ...
}
  • Proto changed for Search
Code Block
// internal/proto/milvus.proto
-message PlaceholderValue {
-  string tag = 1;
-  PlaceholderType type = 2;
-  // values is a 2d-array, every array contains a vector
-  repeated bytes values = 3;
-}

-message PlaceholderGroup {
-  repeated PlaceholderValue placeholders = 1;
-}

message SearchRequest {
  ...
- bytes placeholder_group = 6; // must
+ bytes vector_record = 6; // must
  ...
}

message Hits {
  ...
- repeated bytes row_data = 2;
+ bytes batch_record = 2;
  ...
}

// internal/proto/schema.proto
message SearchResultData {
  ...
- repeated FieldData fields_data = 3;
+ bytes batch_record = 3;
  ...
}
  • Proto changed for Query
Code Block
// internal/proto/milvus.proto
message QueryResults {
  ...
- repeated schema.FieldData fields_data = 2;
+ bytes batch_record = 2;
}
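
With these changes, every data payload exchanged between SDK and proxy becomes a blob of Arrow IPC-encoded bytes. Below is a minimal sketch of the encode/decode round trip for such a blob, using the same Go Arrow IPC APIs as the test code at the end of this page; the helper names EncodeRecord and DecodeRecord are illustrative, not part of the proposal.

Code Block
import (
	"bytes"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/ipc"
)

// EncodeRecord serializes one Arrow RecordBatch into the bytes carried
// by fields such as InsertRequest.batch_record.
func EncodeRecord(schema *arrow.Schema, rec array.Record) ([]byte, error) {
	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(schema))
	if err := w.Write(rec); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// DecodeRecord reads the RecordBatch back out of the received bytes.
func DecodeRecord(schema *arrow.Schema, blob []byte) (array.Record, error) {
	r, err := ipc.NewReader(bytes.NewReader(blob), ipc.WithSchema(schema))
	if err != nil {
		return nil, err
	}
	defer r.Release()
	rec, err := r.Read()
	if err != nil {
		return nil, err
	}
	rec.Retain() // keep the record alive after the reader is released
	return rec, nil
}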


To evaluate memory usage, we used Arrow to test the following 3 scenarios used in Milvus:

Data Type                            | Raw Data Size (Byte) | Arrow Array Buffer Size (Byte)
int64                                | 80000                | 80000
FixedSizeList (float32, dim = 128)   | 5120000              | 5160064
string (len = 512)                   | 5120000              | 5160064

For scalar data, an Arrow Array uses the same amount of memory as the raw data; for vector or string data, an Arrow Array uses slightly more memory than the raw data (about 4 bytes for each row).
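
For example, the table implies 10000 rows (80000 bytes / 8 bytes per int64), so for the vector and string cases: 5120000 bytes of raw data + 10000 × 4 bytes of per-row overhead + 64 bytes (presumably buffer alignment padding) = 5160064 bytes.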


Here is an example to illustrate the problems we will encounter when using Arrow.

During insert, after the Proxy receives data, it encapsulates the data into InsertMsg objects row by row, and then sends them to the Datanode through Pulsar.

Splitting by row is done for two reasons:

  1. Each collection has two or more physical channels; insertion performance improves when data is written to multiple channels concurrently.
  2. Pulsar limits the size of each InsertMsg.

We tried 4 solutions; each has its own problems:

  • Solution-1

After the Proxy receives the inserted data, it creates a single Arrow RecordBatch, encapsulates the data into InsertMsg objects row by row, and then sends them to the Datanode through Pulsar.

PROBLEM: There is no interface to read data item by item from an Arrow RecordBatch. RecordBatch has a NewSlice interface, but the return value of NewSlice cannot be used for anything except printing, as shown below.
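
A short illustration of this dead end, reusing a record such as the one built by CreateArrowRecord in the test code at the end of this page:

Code Block
// Solution-1 dead end: NewSlice gives a one-row view of the batch,
// but the Record interface exposes no row-wise accessor, so the
// slice can essentially only be printed.
slice := rec.NewSlice(0, 1)
defer slice.Release()
fmt.Println(slice)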

  • Solution-2

After the Proxy receives the inserted data, it creates multiple Arrow RecordBatches in advance, sized according to Pulsar's limit for InsertMsg. Each RecordBatch is serialized into an InsertMsg and sent to the Datanode through Pulsar. The Datanode then combines the multiple RecordBatches into one complete RecordBatch.

PROBLEM: Multiple RecordBatches can only be logically restored into one Arrow Table; each column's data remains physically discontinuous, so subsequent columnar operations cannot be performed.
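
The ReadArrowRecordsToTable function in the test code at the end of this page demonstrates this: each column of the combined table stays chunked, one chunk per original RecordBatch. A minimal sketch:

Code Block
// Solution-2 problem: a Table assembled from two RecordBatches keeps
// each column as two physically separate chunks, not one buffer.
table := array.NewTableFromRecords(schema, []array.Record{rec0, rec1})
defer table.Release()
fmt.Println(len(table.Column(0).Data().Chunks())) // prints 2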

  • Solution-3

After the Proxy receives the inserted data, it creates one Arrow Array per field instead of a RecordBatch.

PROBLEM: The primitive unit of serialization in Arrow is the RecordBatch; Arrow provides no interface to serialize a standalone Arrow Array.
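
One conceivable workaround is to wrap the lone Array into a single-field RecordBatch before serializing it, which effectively brings us back to Solution-2. A sketch (int64Arr is assumed to be a previously built *array.Int64):

Code Block
// Solution-3 dead end: ipc.Writer.Write only accepts an array.Record,
// so a standalone Array must first be wrapped in a one-field RecordBatch.
s := arrow.NewSchema([]arrow.Field{
	{Name: "field_val", Type: arrow.PrimitiveTypes.Int64},
}, nil)
rec := array.NewRecord(s, []array.Interface{int64Arr}, int64(int64Arr.Len()))
defer rec.Release()
// rec can now go through ipc.NewWriter / Write as usual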

  • Solution-4

After the Proxy receives the inserted data, it creates multiple RecordBatches in advance, sized according to Pulsar's limit for InsertMsg. Each RecordBatch is serialized into an InsertMsg and sent to the Datanode through Pulsar. The Datanode receives the multiple RecordBatches, fetches the data from each column, and regenerates a new, single RecordBatch.

PROBLEM: This seems to offer no advantage compared with the current implementation.
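
(The ReadArrowRecords function in the test code at the end of this page implements exactly this fetch-columns-and-regenerate approach.)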


To summarize, Arrow has some limitations for our use case:

  1. Arrow data can only be serialized and deserialized in units of RecordBatch;
  2. row data cannot be copied out of a RecordBatch;
  3. a RecordBatch must be regenerated after being sent via Pulsar.


Arrow is suitable for data-analysis scenarios, where data is sealed and read-only.

In Milvus, we need to split and concatenate data.

Arrow therefore does not seem to be a good choice for Milvus.


Design Details (required)

After this MEP, Milvus will no longer be compatible with previous Milvus 2.0.0-rcX releases, because:

  • the proto format between SDK and proxy changed
  • the binlog file format changed

We divide this MEP into 2 stages. All compatibility-breaking changes will be completed in Stage 1, before Milvus 2.0.0; the other internal changes can be done later.

Stage 1

  1. Update InsertRequest in milvus.proto, change Insert to use Arrow format
  2. Update SearchRequest/Hits in milvus.proto, and SearchResultData in schema.proto, change Search to use Arrow format
  3. Update QueryResults in milvus.proto, change Query to use Arrow format

Stage 2

  1. Update the Storage module to use the Go Arrow library to write Parquet from Arrow (and read Arrow from Parquet) directly, removing the C++ Arrow dependency (see the sketch after this list).
  2. Remove all internal row-based data structure, including "RowData" in internalpb.InsertRequest, "row_data" in milvuspb.Hits, "row_data_" in C++ SearchResult.
  3. Optimize search result flow
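
For item 1 above, a rough sketch of what a Go-only Parquet path could look like. This assumes a newer Arrow Go release that ships the parquet/pqarrow package; the import paths and the WriteTable call below come from that package and are an assumption, not verified against the Arrow version used elsewhere on this page.

Code Block
import (
	"os"

	"github.com/apache/arrow/go/v8/arrow"
	"github.com/apache/arrow/go/v8/parquet"
	"github.com/apache/arrow/go/v8/parquet/pqarrow"
)

// writeBinlog (hypothetical) writes one binlog Parquet file directly
// from an Arrow table, with no C++ Arrow involved.
func writeBinlog(tbl arrow.Table, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	return pqarrow.WriteTable(tbl, f,
		tbl.NumRows(),                 // rows per Parquet row group
		parquet.NewWriterProperties(), // default Parquet writer settings
		pqarrow.DefaultWriterProps())  // default Arrow-to-Parquet mapping
}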

Test Plan (required)

Pass all CI flows

References (optional)

https://arrow.apache.org/docs/


Arrow Test Code (Go)

Code Block
import (
	"bytes"
	"fmt"
	"testing"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/ipc"
	"github.com/apache/arrow/go/arrow/memory"
)

const (
	_DIM = 4
)

var pool = memory.NewGoAllocator()

func CreateArrowSchema() *arrow.Schema {
	fieldVector := arrow.Field{
		Name: "field_vector",
		Type: arrow.FixedSizeListOf(_DIM, arrow.PrimitiveTypes.Float32),
	}
	fieldVal := arrow.Field{
		Name: "field_val",
		Type: arrow.PrimitiveTypes.Int64,
	}
	schema := arrow.NewSchema([]arrow.Field{fieldVector, fieldVal}, nil)
	return schema
}

func CreateArrowRecord(schema *arrow.Schema, iValues []int64, vValues []float32) array.Record {
	rb := array.NewRecordBuilder(pool, schema)
	defer rb.Release()
	rb.Reserve(len(iValues))

	rowNum := len(iValues)
	for i, field := range rb.Schema().Fields() {
		switch field.Type.ID() {
		case arrow.INT64:
			vb := rb.Field(i).(*array.Int64Builder)
			vb.AppendValues(iValues, nil)
		case arrow.FIXED_SIZE_LIST:
			lb := rb.Field(i).(*array.FixedSizeListBuilder)
			valid := make([]bool, rowNum)
			for i := 0; i < rowNum; i++ {
				valid[i] = true
			}
			lb.AppendValues(valid)
			vb := lb.ValueBuilder().(*array.Float32Builder)
			vb.AppendValues(vValues, nil)
		}
	}

	rec := rb.NewRecord()

	return rec
}

func WriteArrowRecord(schema *arrow.Schema, rec array.Record) []byte {
	defer rec.Release()
	blob := make([]byte, 0)
	buf := bytes.NewBuffer(blob)

	// internal/arrdata/ioutil.go
	writer := ipc.NewWriter(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))

	// ShowArrowRecord(rec)
	if err := writer.Write(rec); err != nil {
		panic(fmt.Sprintf("could not write record: %v", err))
	}

	err := writer.Close()
	if err != nil {
		panic(err.Error())
	}

	return buf.Bytes()
}

func ReadArrowRecords(schema *arrow.Schema, blobs [][]byte) array.Record {
	iValues := make([]int64, 0)
	vValues := make([]float32, 0)
	for _, blob := range blobs {
		buf := bytes.NewReader(blob)

		reader, err := ipc.NewReader(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic("create reader fail: %v" + err.Error())
		}
		defer reader.Release()

		rec, err := reader.Read()
		if err != nil {
			panic(fmt.Sprintf("read record fail: %v", err))
		}
		// rec is owned by the reader; its values are copied out below

		for _, col := range rec.Columns() {
			switch col.DataType().ID() {
			case arrow.INT64:
				arr := col.(*array.Int64)
				iValues = append(iValues, arr.Int64Values()...)
			case arrow.FIXED_SIZE_LIST:
				arr := col.(*array.FixedSizeList).ListValues().(*array.Float32)
				vValues = append(vValues, arr.Float32Values()...)
			}
		}
	}
	ret := CreateArrowRecord(schema, iValues, vValues)
	ShowArrowRecord(ret)

	return ret
}

func ReadArrowRecordsToTable(schema *arrow.Schema, blobs [][]byte) array.Table {
	recs := make([]array.Record, 0)
	for _, blob := range blobs {
		buf := bytes.NewReader(blob)

		reader, err := ipc.NewReader(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic("create reader fail: %v" + err.Error())
		}
		defer reader.Release()

		rec, err := reader.Read()
		if err != nil {
			panic(fmt.Sprintf("read record fail: %v", err))
		}
		rec.Retain() // the table continues to use rec after the reader is released
		defer rec.Release()

		recs = append(recs, rec)
	}
	table := array.NewTableFromRecords(schema, recs)
	ShowArrowTable(table)

	return table
}

func ShowArrowRecord(rec array.Record) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", rec.Schema())
	fmt.Printf("NumCols: %v\n", rec.NumCols())
	fmt.Printf("NumRows: %v\n", rec.NumRows())
	for i, col := range rec.Columns() {
		fmt.Printf("Column[%d] %q: %v\n", i, rec.ColumnName(i), col)
	}
}

func ShowArrowTable(tbl array.Table) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", tbl.Schema())
	fmt.Printf("NumCols: %v\n", tbl.NumCols())
	fmt.Printf("NumRows: %v\n", tbl.NumRows())
	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		fmt.Printf("Column[%d] %s: %v\n", i, tbl.Schema().Field(i).Name, col.Data().Chunks())
	}
}

func TestArrowIPC(t *testing.T) {
	schema := CreateArrowSchema()
	rec0 := CreateArrowRecord(schema, []int64{0}, []float32{0, 0, 0, 0})
	rec1 := CreateArrowRecord(schema, []int64{1, 2, 3}, []float32{1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3})
	blob0 := WriteArrowRecord(schema, rec0)
	blob1 := WriteArrowRecord(schema, rec1)
	ReadArrowRecords(schema, [][]byte{blob0, blob1})
	ReadArrowRecordsToTable(schema, [][]byte{blob0, blob1})
}