
Current state: "Under Discussion"

ISSUE: #7210

PRs: 

Keywords: arrow

Released: Milvus 2.0

Summary(required)

What are we going to do?

Motivation(required)

From the data perspective, Milvus has two main data flows: 1) the insert data flow, and 2) the search data flow.

Insert Data Flow

The insert data flow consists of the following steps:

  1. pymilvus creates an insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)

    // grpc-proto/milvus.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      repeated schema.FieldData fields_data = 5;    // fields' data
      repeated uint32 hash_keys = 6;
      uint32 num_rows = 7;
    }

    Data is inserted into fields_data column by column; schemapb.FieldData is defined as follows:


    // grpc-proto/schema.proto

    message ScalarField {
      oneof data {
        BoolArray bool_data = 1;
        IntArray int_data = 2;
        LongArray long_data = 3;
        FloatArray float_data = 4;
        DoubleArray double_data = 5;
        StringArray string_data = 6;
        BytesArray bytes_data = 7;
      }
    }

    message VectorField {
      int64 dim = 1;
      oneof data {
        FloatArray float_vector = 2;
        bytes binary_vector = 3;
      }
    }

    message FieldData {
      DataType type = 1;
      string field_name = 2;
      oneof field {
        ScalarField scalars = 3;
        VectorField vectors = 4;
      }
      int64 field_id = 5;
    }
  2. The milvuspb.InsertRequest is serialized and sent over gRPC.

  3. The Proxy receives the milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to its execution queue (internal/proxy/impl.go::Insert).

  4. When the InsertTask is executed, the column-based data in InsertTask.req is converted to row-based data and stored in another request message, internalpb.InsertRequest, which is used for internal message passing inside Milvus (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData). A simplified sketch of this column-to-row conversion is shown after this list.

    internalpb.InsertRequest is defined as follows:

    // internal/proto/internal.proto
    message InsertRequest {
     common.MsgBase base = 1;
     string db_name = 2;
     string collection_name = 3;
     string partition_name = 4;
     int64 dbID = 5;
     int64 collectionID = 6;
     int64 partitionID = 7;
     int64 segmentID = 8;
     string channelID = 9;
     repeated uint64 timestamps = 10;
     repeated int64 rowIDs = 11;
     repeated common.Blob row_data = 12;  // row-based data
    }

    A rowID and a timestamp are also assigned to each row.

  5. The Proxy wraps the internalpb.InsertRequest in an InsertMsg and sends it to the Pulsar channel.

  6. The DataNode receives the InsertMsg from the Pulsar channel, restores the data to column-based form, and keeps it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate).

    type InsertData struct {
        Data  map[FieldID]FieldData // field id to field data
        Infos []BlobInfo
    }
  7. The InsertData is persisted to MinIO in Parquet format (internal/datanode/flow_graph_insert_buffer_node.go::flushSegment).
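
The column-to-row conversion in step 4 can be illustrated with a minimal, self-contained Go sketch. It assumes a hypothetical schema with a single int64 scalar field and a 2-dimensional float vector field, and simply concatenates the little-endian encoding of each row's field values into one blob per row; this is roughly what transferColumnBasedRequestToRowBasedData produces for the row_data field of internalpb.InsertRequest, though the exact layout and type handling in Milvus differ.

    // Simplified sketch of the column-to-row conversion (hypothetical field set).
    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    // columns holds column-based data for two hypothetical fields:
    // an int64 scalar field and a 2-dimensional float vector field.
    type columns struct {
        age []int64     // scalar column
        vec [][]float32 // vector column, one entry per row
    }

    // toRows packs the columns into one byte blob per row by concatenating
    // the little-endian encoding of every field value belonging to that row.
    func toRows(c columns) ([][]byte, error) {
        rows := make([][]byte, len(c.age))
        for i := range c.age {
            var buf bytes.Buffer
            if err := binary.Write(&buf, binary.LittleEndian, c.age[i]); err != nil {
                return nil, err
            }
            if err := binary.Write(&buf, binary.LittleEndian, c.vec[i]); err != nil {
                return nil, err
            }
            rows[i] = buf.Bytes()
        }
        return rows, nil
    }

    func main() {
        c := columns{
            age: []int64{21, 35},
            vec: [][]float32{{0.1, 0.2}, {0.3, 0.4}},
        }
        rows, err := toRows(c)
        if err != nil {
            panic(err)
        }
        // Each blob is 8 bytes (int64) + 2*4 bytes (float32 vector) = 16 bytes.
        for i, r := range rows {
            fmt.Printf("row %d: %d bytes\n", i, len(r))
        }
    }

Because the per-row blobs erase the column boundaries, the DataNode has to reconstruct the column-based InsertData again in step 6.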

Search Data Flow

  1. After receiving a LoadSegments request, the querynode loads all binlog files of the segment from MinIO into memory, wrapping each file in the in-memory structure Blob (internal/querynode/segment_loader.go::loadSegmentFieldsData).

    type Blob struct {
        Key   string // binlog file path
        Value []byte // binlog file data
    }

    Normally the querynode loads only scalar data; vector data is not loaded unless no index has been built for that vector column.

    The data in the Blobs is deserialized, the raw data is extracted, and it is stored in the InsertData structure.

  2. The querynode executes the search request by calling the knowhere search engine through CGO and obtains a SearchResult (internal/query_node/query_collection.go::search).

    // internal/core/src/common/Types.h
    struct SearchResult {
    ...
    public:
       int64_t num_queries_;
       int64_t topk_;
       std::vector<float> result_distances_;

    public:
       void* segment_;
       std::vector<int64_t> internal_seg_offsets_;
       std::vector<int64_t> result_offsets_;
       std::vector<std::vector<char>> row_data_;
    };

    In the SearchResult returned by knowhere, only result_distances_ and internal_seg_offsets_ are filled with data.

    After the querynode has collected the SearchResult of every segment, it merges them, retrieves the other output field data via internal_seg_offsets_, and writes that data into row_data_ in row-based format (internal/query_node/query_collection.go::reduceSearchResultsAndFillData).

  3. The querynode reorganizes the SearchResult data once more and stores it in the structure milvus.Hits.

    // internal/proto/milvus.proto
    message Hits {
     repeated int64 IDs = 1;
     repeated bytes row_data = 2;
     repeated float scores = 3;
    }
  4. The data in milvus.Hits is converted by the function translateHits into the column-based schemapb.SearchResultData (internal/query_node/query_collection.go::translateHits). A simplified sketch of this row-to-column translation is shown after this list.

    // internal/proto/schema.proto
    message SearchResultData {
     int64 num_queries = 1;
     int64 top_k = 2;
     repeated FieldData fields_data = 3;
     repeated float scores = 4;
     IDs ids = 5;
     repeated int64 topks = 6;
    }
  5. The schemapb.SearchResultData is serialized, wrapped into internalpb.SearchResults, put into a msgstream.SearchResultMsg, and sent through the Pulsar channel (internal/query_node/query_collection.go::search).

    // internal/proto/internal.proto
    message SearchResults {
     common.MsgBase base = 1;
     common.Status status = 2;
     string result_channelID = 3;
     string metric_type = 4;
     repeated bytes hits = 5; // search result data

     // schema.SearchResultsData inside
     bytes sliced_blob = 9;
     int64 sliced_num_count = 10;
     int64 sliced_offset = 11;

     repeated int64 sealed_segmentIDs_searched = 6;
     repeated string channelIDs_searched = 7;
     repeated int64 global_sealed_segmentIDs = 8;
    }
  6. The Proxy collects from the Pulsar channel all the msgstream.SearchResultMsg sent by the querynodes, deserializes them into schemapb.SearchResultData, merges the results once more, puts the merged data into milvuspb.SearchResults, and returns it to the SDK over gRPC (internal/proxy/task.go::SearchTask::PostExecute).

    // internal/proto/milvus.proto
    message SearchResults {
     common.Status status = 1;
     schema.SearchResultData results = 2;
    }
  7. The SDK receives the milvuspb.SearchResults and returns it to the caller.
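
The row-to-column translation in step 4 can be sketched the same way. The sketch below assumes each row blob in milvus.Hits.row_data encodes a hypothetical int64 field followed by a float32 field, little-endian, and decodes the blobs back into per-field columns; the actual translateHits must handle every field type defined in the collection schema.

    // Simplified sketch of the row-to-column translation (hypothetical field set).
    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    // rowsToColumns decodes per-row blobs back into per-field columns. Each blob
    // is assumed to hold a hypothetical int64 "age" followed by a float32 "weight",
    // both little-endian.
    func rowsToColumns(rows [][]byte) ([]int64, []float32, error) {
        ages := make([]int64, 0, len(rows))
        weights := make([]float32, 0, len(rows))
        for _, row := range rows {
            var (
                age    int64
                weight float32
            )
            r := bytes.NewReader(row)
            if err := binary.Read(r, binary.LittleEndian, &age); err != nil {
                return nil, nil, err
            }
            if err := binary.Read(r, binary.LittleEndian, &weight); err != nil {
                return nil, nil, err
            }
            ages = append(ages, age)
            weights = append(weights, weight)
        }
        return ages, weights, nil
    }

    func main() {
        // Build two row blobs, mimicking the row_data field of milvus.Hits.
        var rows [][]byte
        for i := 0; i < 2; i++ {
            var buf bytes.Buffer
            binary.Write(&buf, binary.LittleEndian, int64(20+i))
            binary.Write(&buf, binary.LittleEndian, float32(i)*0.5)
            rows = append(rows, buf.Bytes())
        }
        ages, weights, err := rowsToColumns(rows)
        if err != nil {
            panic(err)
        }
        // Column-based output, analogous to fields_data in schemapb.SearchResultData.
        fmt.Println(ages, weights)
    }

This is essentially the inverse of the column-to-row conversion the Proxy performs on the insert path (see the sketch after the insert data flow).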

Public Interfaces(optional)

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed.

Design Details(required)

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan(optional)

- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?


Test Plan(required)

Describe in a few sentences how the MEP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives(optional)

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

References(optional)

Briefly list all references

