From the data perspective, Milvus has two main data flows: 1) the insert data flow and 2) the search data flow.

Insert Data Flow

The insert data flow includes the following steps:

  1. pymilvus creates a data insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param):

    // grpc-proto/milvus.proto
    message InsertRequest {
     common.MsgBase base = 1;
     string db_name = 2;
     string collection_name = 3;
     string partition_name = 4;
     repeated schema.FieldData fields_data = 5;    // fields' data
     repeated uint32 hash_keys = 6;
     uint32 num_rows = 7;
    }

    Data is inserted into fields_data by column; schemapb.FieldData is defined as follows:


    // grpc-proto/schema.proto

    message ScalarField {
     oneof data {
       BoolArray bool_data = 1;
       IntArray int_data = 2;
       LongArray long_data = 3;
       FloatArray float_data = 4;
       DoubleArray double_data = 5;
       StringArray string_data = 6;
       BytesArray bytes_data = 7;
     }
    }

    message VectorField {
     int64 dim = 1;
     oneof data {
       FloatArray float_vector = 2;
       bytes binary_vector = 3;
     }
    }

    message FieldData {
     DataType type = 1;
     string field_name = 2;
     oneof field {
       ScalarField scalars = 3;
       VectorField vectors = 4;
     }
     int64 field_id = 5;
    }
  2. milvuspb.InsertRequest is serialized and sent via gRPC.

  3. Proxy receives the milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert).

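  4. When the InsertTask is executed, the column-based data in InsertTask.req is converted to row-based data and stored in internalpb.InsertRequest, a separate message request used for passing data inside Milvus (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData).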

    internalpb.InsertRequest is defined as follows:

    // internal/proto/internal.proto
    message InsertRequest {
     common.MsgBase base = 1;
     string db_name = 2;
     string collection_name = 3;
     string partition_name = 4;
     int64 dbID = 5;
     int64 collectionID = 6;
     int64 partitionID = 7;
     int64 segmentID = 8;
     string channelID = 9;
     repeated uint64 timestamps = 10;
     repeated int64 rowIDs = 11;
     repeated common.Blob row_data = 12;  // row-based data
    }

    In addition, a rowID and a timestamp are added for each row.

  5. Proxy sends the InsertMsg containing the internalpb.InsertRequest to a Pulsar channel.

  6. Datanode receives the InsertMsg from the Pulsar channel, converts the data back to column-based storage, and keeps it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate):

    type InsertData struct {
    Data  map[FieldID]FieldData // field id to field data
    Infos []BlobInfo
    }
  7. InsertData is persisted to MinIO in Parquet format (internal/datanode/flow_graph_insert_buffer_node.go::flushSegment).
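Steps 4 and 6 transpose the same data twice: column-based in the SDK request, row-based on the Pulsar channel, and column-based again in the datanode. The Go code below is a minimal sketch of that round trip under simplifying assumptions: every field is a fixed-width float32 column, and a row is just the little-endian concatenation of its field values. The Column type and the columnsToRows/rowsToColumns functions are hypothetical illustrations, not the actual Milvus code or its blob encoding.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// Column is a hypothetical, simplified stand-in for schemapb.FieldData:
// a fixed-width column of float32 values with Dim values per row
// (Dim == 1 for a scalar field, Dim > 1 for a float vector field).
type Column struct {
	FieldID int64
	Dim     int
	Values  []float32
}

// columnsToRows illustrates the column-to-row step in the proxy: each row
// becomes one little-endian byte slice holding that row's values of every
// column, in column order (the role common.Blob plays in row_data).
func columnsToRows(cols []Column, numRows int) [][]byte {
	width := 0
	for _, c := range cols {
		width += 4 * c.Dim // 4 bytes per float32
	}
	rows := make([][]byte, numRows)
	for r := 0; r < numRows; r++ {
		buf := make([]byte, width)
		off := 0
		for _, c := range cols {
			for _, v := range c.Values[r*c.Dim : (r+1)*c.Dim] {
				binary.LittleEndian.PutUint32(buf[off:], math.Float32bits(v))
				off += 4
			}
		}
		rows[r] = buf
	}
	return rows
}

// rowsToColumns illustrates the reverse step in the datanode: it walks each
// row blob with the same column layout and rebuilds one slice per field ID,
// similar in spirit to InsertData.Data.
func rowsToColumns(rows [][]byte, layout []Column) map[int64][]float32 {
	out := make(map[int64][]float32, len(layout))
	for _, row := range rows {
		off := 0
		for _, c := range layout {
			for i := 0; i < c.Dim; i++ {
				bits := binary.LittleEndian.Uint32(row[off : off+4])
				out[c.FieldID] = append(out[c.FieldID], math.Float32frombits(bits))
				off += 4
			}
		}
	}
	return out
}

func main() {
	cols := []Column{
		{FieldID: 100, Dim: 1, Values: []float32{1, 2}},               // scalar field, 2 rows
		{FieldID: 101, Dim: 2, Values: []float32{0.1, 0.2, 0.3, 0.4}}, // 2-dim vector field, 2 rows
	}
	rows := columnsToRows(cols, 2)
	fmt.Println(len(rows), len(rows[0])) // 2 12
	back := rowsToColumns(rows, cols)
	fmt.Println(back[100], back[101]) // [1 2] [0.1 0.2 0.3 0.4]
}
```

Because a row blob carries no field boundaries of its own, the decoding side must know the schema (field IDs and widths) to split it; in this sketch that knowledge is the layout argument.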

Search Data Flow

  1. After receiving a LoadSegments request, querynode loads all binlog files of the segment from MinIO into memory and places them in the in-memory structure Blob (internal/querynode/segment_loader.go::loadSegmentFieldsData):

    type Blob struct {
    Key   string // binlog file path
    Value []byte // binlog file data
    }

    Normally querynode loads only scalar data and skips vector data, unless no index has been built on the vector field.

    The data in Blob is deserialized to extract the raw data, which is stored in the InsertData structure.

  2. querynode executes the search request by calling the knowhere search engine through CGO and obtains a SearchResult (internal/query_node/query_collection.go::search):

    // internal/core/src/common/Types.h
    struct SearchResult {
    ...
    public:
       int64_t num_queries_;
       int64_t topk_;
       std::vector<float> result_distances_;

    public:
       void* segment_;
       std::vector<int64_t> internal_seg_offsets_;
       std::vector<int64_t> result_offsets_;
       std::vector<std::vector<char>> row_data_;
    };

    In the SearchResult returned by knowhere, only result_distances_ and internal_seg_offsets_ are filled in.

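    After collecting the SearchResult returned by every segment, querynode merges the results, uses internal_seg_offsets_ to fetch the data of the other output fields, and writes it into row_data_ in row-based format (internal/query_node/query_collection.go::reduceSearchResultsAndFillData).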

  3. querynode reorganizes the SearchResult data once more and stores it in the structure milvus.Hits:

    // internal/proto/milvus.proto
    message Hits {
     repeated int64 IDs = 1;
     repeated bytes row_data = 2;
     repeated float scores = 3;
    }
  4. The data in milvus.Hits is converted by the function translateHits into column-based data, schemapb.SearchResultData (internal/query_node/query_collection.go::translateHits):

    // internal/proto/schema.proto
    message SearchResultData {
     int64 num_queries = 1;
     int64 top_k = 2;
     repeated FieldData fields_data = 3;
     repeated float scores = 4;
     IDs ids = 5;
     repeated int64 topks = 6;
    }
  5. schemapb.SearchResultData is serialized, wrapped in internalpb.SearchResults, placed in a msgstream.SearchResultMsg, and sent through a Pulsar channel (internal/query_node/query_collection.go::search):

    // internal/proto/internal.proto
    message SearchResults {
     common.MsgBase base = 1;
     common.Status status = 2;
     string result_channelID = 3;
     string metric_type = 4;
     repeated bytes hits = 5; // search result data

     // schema.SearchResultsData inside
     bytes sliced_blob = 9;
     int64 sliced_num_count = 10;
     int64 sliced_offset = 11;

     repeated int64 sealed_segmentIDs_searched = 6;
     repeated string channelIDs_searched = 7;
     repeated int64 global_sealed_segmentIDs = 8;
    }
  6. proxy collects the msgstream.SearchResultMsg sent by all querynodes from the Pulsar channel, deserializes them into schemapb.SearchResultData, merges them once more, puts the data into milvuspb.SearchResults, and returns it to the SDK via gRPC (internal/proxy/task.go::SearchTask::PostExecute):

    // internal/proto/milvus.proto
    message SearchResults {
     common.Status status = 1;
     schema.SearchResultData results = 2;
    }
  7. The SDK receives milvuspb.SearchResults and returns it to the caller.
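The reduce done by querynode in step 2 (and again by proxy in step 6) combines several result lists, each already sorted, into one global top-k, then uses the segment offsets to fetch output fields. The Go code below is a minimal sketch of that idea for a single query, assuming that a smaller distance ranks higher (as with the L2 metric). The Hit type, mergeTopK and fillOutputField are hypothetical names for illustration; this is not the actual reduceSearchResultsAndFillData implementation.

```go
package main

import "fmt"

// Hit is a hypothetical, simplified view of one entry of a per-segment
// SearchResult for a single query: Offset plays the role of an element of
// internal_seg_offsets_ and Distance the role of result_distances_.
type Hit struct {
	SegmentID int64
	Offset    int64
	Distance  float32
}

// mergeTopK performs a k-way merge of per-segment hit lists, each already
// sorted by ascending distance (assumption: smaller is better, as with L2),
// and returns at most k hits overall.
func mergeTopK(perSegment [][]Hit, k int) []Hit {
	cursor := make([]int, len(perSegment)) // next unread position per segment
	out := make([]Hit, 0, k)
	for len(out) < k {
		best := -1
		for s, hits := range perSegment {
			if cursor[s] >= len(hits) {
				continue // this segment is exhausted
			}
			if best == -1 || hits[cursor[s]].Distance < perSegment[best][cursor[best]].Distance {
				best = s
			}
		}
		if best == -1 {
			break // every segment is exhausted
		}
		out = append(out, perSegment[best][cursor[best]])
		cursor[best]++
	}
	return out
}

// fillOutputField uses each hit's segment offset to look up the value of one
// output field, mirroring how row_data_ is filled after the merge.
// store maps a segment ID to that segment's column of the output field.
func fillOutputField(hits []Hit, store map[int64][]string) []string {
	vals := make([]string, len(hits))
	for i, h := range hits {
		vals[i] = store[h.SegmentID][h.Offset]
	}
	return vals
}

func main() {
	seg1 := []Hit{{1, 0, 0.1}, {1, 3, 0.5}}
	seg2 := []Hit{{2, 1, 0.2}, {2, 0, 0.9}}
	top := mergeTopK([][]Hit{seg1, seg2}, 3)
	store := map[int64][]string{1: {"a", "b", "c", "d"}, 2: {"x", "y"}}
	fmt.Println(fillOutputField(top, store)) // [a y d]
}
```

A k-way merge fits here because each input list is already sorted, so each output position only has to compare the current head of every segment's list. For a metric where a larger score is better (such as inner product), the comparison would be reversed.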

Public Interfaces(optional)
