Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Insert data flow
  2. Search data flow

Insert Data Flow

BLUE - Column-based data structure,  ORANGE - Row-based data structure, RED DASHED LINE - Data format conversion


Insert data flow includes following steps:

  1.  pymilvus creates a data insert request with type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)

    Code Block
    // grpc-proto/milvus.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      repeated schema.FieldData fields_data = 5;	// fields' data
      repeated uint32 hash_keys = 6;
      uint32 num_rows = 7;
    }

    Data is inserted into fields_data by column, schemapb.FieldData is defined as following:

    Code Block
    // grpc-proto/schema.proto
    message ScalarField {
      oneof data {
        BoolArray bool_data = 1;
        IntArray int_data = 2;
        LongArray long_data = 3;
        FloatArray float_data = 4;
        DoubleArray double_data = 5;
        StringArray string_data = 6;
        BytesArray bytes_data = 7;
      }
    }
    ​
    message VectorField {
      int64 dim = 1;
      oneof data {
        FloatArray float_vector = 2;
        bytes binary_vector = 3;
      }
    }
    ​
    message FieldData {
      DataType type = 1;
      string field_name = 2;
      oneof field {
        ScalarField scalars = 3;
        VectorField vectors = 4;
      }
      int64 field_id = 5;
    }
  2.  milvuspb.InsertRequest is serialized and send via gRPC

  3.  Proxy receives milvuspb.InsertRequest, creates InsertTask for it, and adds this task into execution queue (internal/proxy/impl.go::Insert)

  4.  InsertTask is executed, the column-based data stored in InsertTask.req is converted to row-based format, and saved into another internal message with type internalpb.InsertRequest (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData)

    Code Block
    // internal/proto/internal.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      int64 dbID = 5;
      int64 collectionID = 6;
      int64 partitionID = 7;
      int64 segmentID = 8;
      string channelID = 9;
      repeated uint64 timestamps = 10;
      repeated int64 rowIDs = 11;
      repeated common.Blob row_data = 12;  // row-based data
    }

     rowID and timestamp are added for each row data

  5.  Proxy encapsulates internalpb.InsertRequest into InsertMsg, and send it to pulsar channel

  6.  Datanode receives InsertMsg from pulsar channel, restore data to column-based into structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)

    Code Block
    type InsertData struct {
        Data  map[FieldID]FieldData // field id to field data
        Infos []BlobInfo
    }
  7.  InsertData is written into Minio with parquet format (internal/datanode/flow_graph_insert_buffer_node.go::flushSegment)

Search Data Flow

BLUE - Column-based data structure,  ORANGE - Row-based data structure, RED DASHED LINE - Data format conversion 


Search data flow includes following steps:

  1. querynode reads segment's binlog files from Minio, and saves them into structure Blob (internal/querynode/segment_loader.go::loadSegmentFieldsData)

    Code Block
    type Blob struct {
        Key   string    // binlog file path
        Value []byte    // binlog file data
    }

    The data in Blob is deserialized, raw-data in it is saved into structure InsertData

  2.  querynode invokes search engine to get SearchResult (internal/query_node/query_collection.go::search)

    Code Block
    languagecpp
    // internal/core/src/common/Types.h
    struct SearchResult {
     ...
     public:
        int64_t num_queries_;
        int64_t topk_;
        std::vector<float> result_distances_;
    ​
     public:
        void* segment_;
        std::vector<int64_t> internal_seg_offsets_;
        std::vector<int64_t> result_offsets_;
        std::vector<std::vector<char>> row_data_;
    };


    At this time, only "result_distances_" and "internal_seg_offsets_" of "SearchResult" are filled into data.

    querynode reduces all SearchResult returned by segment, fetches all other fields' data, and saves them into "row_data_" with row-based format. (internal/query_node/query_collection.go::reduceSearchResultsAndFillData)

  3. querynode organizes SearchResult again, and save them into structure milvus.Hits

    Code Block
    // internal/proto/milvus.proto
    message Hits {
      repeated int64 IDs = 1;
      repeated bytes row_data = 2;
      repeated float scores = 3;
    }
  4.  Row-based data saved in milvus.Hits is converted to column-based data, and saved into schemapb.SearchResultData (internal/query_node/query_collection.go::translateHits)

    Code Block
    // internal/proto/schema.proto
    message SearchResultData {
      int64 num_queries = 1;
      int64 top_k = 2;
      repeated FieldData fields_data = 3;
      repeated float scores = 4;
      IDs ids = 5;
      repeated int64 topks = 6;
    }
  5.  schemapb.SearchResultData is serialized, encapsulated as internalpb.SearchResults, saved into SearchResultMsg, and send into pulsar channel (internal/query_node/query_collection.go::search)

    Code Block
    // internal/proto/internal.proto
    message SearchResults {
      common.MsgBase base = 1;
      common.Status status = 2;
      string result_channelID = 3;
      string metric_type = 4;
      repeated bytes hits = 5;  // search result data
    ​
      // schema.SearchResultsData inside
      bytes sliced_blob = 9;
      int64 sliced_num_count = 10;
      int64 sliced_offset = 11;
    ​
      repeated int64 sealed_segmentIDs_searched = 6;
      repeated string channelIDs_searched = 7;
      repeated int64 global_sealed_segmentIDs = 8;
    }
  6.  Proxy collects all SearchResultMsg from querynodes, gets schemapb.SearchResultData by deserialization, then gets milvuspb.SearchResults by reducing, finally send back to SDK visa gRPC. (internal/proxy/task.go::SearchTask::PostExecute)

    Code Block
    // internal/proto/milvus.proto
    message SearchResults {
      common.Status status = 1;
      schema.SearchResultData results = 2;
    }
  7.  SDK receives milvuspb.SearchResult

...