...

  1. pymilvus creates a data insert request of type milvuspb.InsertRequest (client/prepare.py::bulk_insert_param)

    // grpc-proto/milvus.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      repeated schema.FieldData fields_data = 5;    // fields' data
      repeated uint32 hash_keys = 6;
      uint32 num_rows = 7;
    }

    Data is inserted into fields_data column by column. schemapb.FieldData is defined as follows:


    // grpc-proto/schema.proto
    message ScalarField {
      oneof data {
        BoolArray bool_data = 1;
        IntArray int_data = 2;
        LongArray long_data = 3;
        FloatArray float_data = 4;
        DoubleArray double_data = 5;
        StringArray string_data = 6;
        BytesArray bytes_data = 7;
      }
    }

    message VectorField {
      int64 dim = 1;
      oneof data {
        FloatArray float_vector = 2;
        bytes binary_vector = 3;
      }
    }

    message FieldData {
      DataType type = 1;
      string field_name = 2;
      oneof field {
        ScalarField scalars = 3;
        VectorField vectors = 4;
      }
      int64 field_id = 5;
    }

  2. milvuspb.InsertRequest is serialized and sent over gRPC

  3. The Proxy component receives the milvuspb.InsertRequest, creates an InsertTask for it, and adds the task to the execution queue (internal/proxy/impl.go::Insert)

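  4. The InsertTask is executed: the column-based data in InsertTask.req is converted to row-based data and stored in internalpb.InsertRequest, a separate request message used for passing data around inside Milvus (internal/proxy/task.go::transferColumnBasedRequestToRowBasedData)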

    internalpb.InsertRequest is defined as follows:

    // internal/proto/internal.proto
    message InsertRequest {
      common.MsgBase base = 1;
      string db_name = 2;
      string collection_name = 3;
      string partition_name = 4;
      int64 dbID = 5;
      int64 collectionID = 6;
      int64 partitionID = 7;
      int64 segmentID = 8;
      string channelID = 9;
      repeated uint64 timestamps = 10;
      repeated int64 rowIDs = 11;
      repeated common.Blob row_data = 12;  // row-based data
    }

    A rowID and a timestamp are also added to each row of data.

  5. Proxy sends an InsertMsg containing the internalpb.InsertRequest to the Pulsar channel

  6. Datanode receives the InsertMsg from the Pulsar channel, restores the data to column-based storage, and keeps it in the in-memory structure InsertData (internal/datanode/flow_graph_insert_buffer_node.go::insertBufferNode::Operate)

    type InsertData struct {
        Data  map[FieldID]FieldData // field id to field data
        Infos []BlobInfo
    }

  7. InsertData is persisted to MinIO in Parquet format (internal/datanode/flow_graph_insert_buffer_node.go::flushSegment)
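
The column-to-row conversion in step 4 and the row-to-column restoration in step 6 can be sketched in a few lines of Python. This is only an illustration of the data reshaping, not the Milvus implementation: RowBasedRequest, columns_to_rows, and rows_to_columns are hypothetical names, rows are kept as plain tuples rather than serialized common.Blob values, and the consecutive rowIDs and single shared timestamp are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class RowBasedRequest:
    """Hypothetical stand-in for the row-based internalpb.InsertRequest."""
    row_ids: List[int]
    timestamps: List[int]
    row_data: List[Tuple[Any, ...]]  # the real message stores serialized blobs


def columns_to_rows(columns: Dict[str, List[Any]],
                    first_row_id: int,
                    timestamp: int) -> RowBasedRequest:
    """Step 4 (sketch): transpose columns into rows and tag every row."""
    names = list(columns)
    lengths = {len(columns[name]) for name in names}
    if len(lengths) != 1:
        raise ValueError("all columns must hold the same number of rows")
    num_rows = lengths.pop()
    rows = [tuple(columns[name][i] for name in names) for i in range(num_rows)]
    return RowBasedRequest(
        # Assumption: consecutive IDs and one timestamp for the whole batch.
        row_ids=[first_row_id + i for i in range(num_rows)],
        timestamps=[timestamp] * num_rows,
        row_data=rows,
    )


def rows_to_columns(request: RowBasedRequest,
                    names: List[str]) -> Dict[str, List[Any]]:
    """Step 6 (sketch): rebuild one list per field from the row data."""
    restored: Dict[str, List[Any]] = {name: [] for name in names}
    for row in request.row_data:
        for name, value in zip(names, row):
            restored[name].append(value)
    return restored


columns = {
    "id": [1, 2, 3],
    "embedding": [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]],
}
request = columns_to_rows(columns, first_row_id=100, timestamp=42)
restored = rows_to_columns(request, list(columns))
```

After the round trip, restored holds the same per-field lists as columns, while request carries one rowID and one timestamp per row, mirroring the rowIDs and timestamps fields of internalpb.InsertRequest.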

...