Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

Current state: Accepted



Keywords: Kafka

Released: with Milvus 2.1 



The log broker is a pub-sub system within Milvus. It is responsible for streaming data persistence, event notification, recovery, etc. Currently, Milvus cluster mode uses Pulsar as the log broker, and standalone mode uses RocksDB.

Apache Kafka is a distributed event store and stream-processing platform, and it is a popular solution for data streaming needs.  Many community users expect Milvus to support Kafka because they have already used it in the production environment.

Image Source


 Milvus supports Kafka as a message stream. In cluster mode, a configuration option decides whether Pulsar or Kafka is used. We provide the function KafkaEnable() to determine whether Kafka is used. If you don't want to use Kafka, you need to comment out its configuration; the same applies to Pulsar and RocksMQ. If the configurations of Pulsar, Kafka, and RocksMQ are all readable, RocksMQ is used in standalone mode and Pulsar in cluster mode.

  address: localhost
  port: 6650
  maxMessageSize: 5242880
#  brokers:
#	- localhost:9092
#  port: 9092

Design Details

  • add Kafka and ZooKeeper dev docker
  • optimize mq_factory configuration initialization
  • remove reader
  • implement msg_stream with kafka 


version: '3.5'

    image: 'bitnami/zookeeper:3.6.3'
      - '2181:2181'
    image: 'bitnami/kafka:3.1.0'
      - '9092:9092'
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - zookeeper

    name: milvus_dev

Kafka Client SDK

  • Sarama 
  • confluent-kafka-go

We tried using sarama and confluent-kafka-go in our development and found that there was basically no difference in the producer. But there is a big difference when using consumer group. 

With Sarama, using a consumer group requires implementing a Sarama interface. This makes consumption very difficult to control and makes seeking hard.

With confluent-kafka-go, consuming messages through a consumer group is just a function call, so it is very simple to use. This function also allows you to directly set the offset from which to start consumption.

Interface Implementation

package kafka

import (


type Consumer struct {
	c          *kafka.Consumer
	msgChannel chan mqwrapper.Message
	closeFlag  bool
	skipMsg    bool
	topicName  string
	groupID    string
	closeCh    chan struct{}
	chanOnce   sync.Once
	closeOnce  sync.Once
	wg         sync.WaitGroup

func newKafkaConsumer(consumer *kafka.Consumer, topicName string, groupID string) *Consumer {
	msgChannel := make(chan mqwrapper.Message, 1)
	closeCh := make(chan struct{})
	skipMsg := false

	kafkaConsumer := &Consumer{c: consumer,
		msgChannel: msgChannel,
		skipMsg:    skipMsg,
		topicName:  topicName,
		groupID:    groupID,
		closeCh:    closeCh,

	return kafkaConsumer

func (kc *Consumer) Subscription() string {
	return kc.groupID

// Chan provides a channel to read consumed message.
// There are some illustrations need to clarify,
// 1.confluent-kafka-go recommend us to use function-based consumer,
// channel-based consumer API had already deprecated, see more details
// 2.This API of other MQ return channel directly, but it depends on
// readMessage firstly which means it be always triggered within select-case
// invocation. However, it still works well, because it covers all messages
// consume situation: start from the earliest or latest position to keep consume;
// start from a seek position to specified end position.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
	if kc.skipMsg {
		msg := kc.readMessage()
		if msg != nil {
			kc.skipMsg = false

	msg := kc.readMessage()

	if msg != nil {
		kc.msgChannel <- &kafkaMessage{msg: msg}
	} else {
		kc.msgChannel <- nil

	return kc.msgChannel

func (kc *Consumer) readMessage() *kafka.Message {
	msg, err := kc.c.ReadMessage(1 * time.Second)
	if err != nil {
		if err.(kafka.Error).Code() != kafka.ErrTimedOut {
			log.Error("read msg failed", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID), zap.Error(err))
		return nil
	return msg

func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
	offset := kafka.Offset(id.(*kafkaID).messageID)
	log.Debug("kafka consumer seek ", zap.String("topic name", kc.topicName),
		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))

	//There is need to invoke Unassign before Assign or seek twice will fail
	//on the same topic and partition.
	err := kc.c.Unassign()
	if err != nil {
		log.Error("kafka consumer unassign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err

	err = kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topicName, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
	if err != nil {
		log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err

	// If seek timeout is not 0 the call twice will return error state RD_KAFKA_RESP_ERR__STATE.
	// if the timeout is 0 it will initiate the seek  but return immediately without any error reporting
	kc.skipMsg = !inclusive
	return kc.c.Seek(kafka.TopicPartition{
		Topic:     &kc.topicName,
		Partition: mqwrapper.DefaultPartitionIdx,
		Offset:    offset}, 0)

func (kc *Consumer) Ack(message mqwrapper.Message) {

func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
	_, high, err := kc.c.QueryWatermarkOffsets(kc.topicName, mqwrapper.DefaultPartitionIdx, -1)
	if err != nil {
		return nil, err

	// Current high value is next offset of the latest message ID, in order to keep
	// semantics consistency with the latest message ID, the high value need to move forward.
	if high > 0 {
		high = high - 1

	return &kafkaID{messageID: high}, nil

func (kc *Consumer) Close() {
	log.Debug("close kafka consumer", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID))
	kc.closeOnce.Do(func() {
package kafka

import (


type kafkaProducer struct {
	p            *kafka.Producer
	topic        string
	deliveryChan chan kafka.Event
	closeOnce    sync.Once

func (kp *kafkaProducer) Topic() string {
	return kp.topic

func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
	err := kp.p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
		Value:          message.Payload,
	}, kp.deliveryChan)

	if err != nil {
		return nil, err

	e := <-kp.deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return nil, m.TopicPartition.Error

	return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil

func (kp *kafkaProducer) Close() {
	kp.closeOnce.Do(func() {


  • standalone
    • docker
  • Cluster
    • Helm Chart
    • Operator

Test Plan

  • pass the unit tests
  • performance testing

  • No labels