...

Code Block
title: kafka_consumer.go
package kafka

import (
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
	"go.uber.org/zap"
)

type Consumer struct {
	c          *kafka.Consumer
	msgChannel chan mqwrapper.Message // buffered channel returned by Chan()
	closeFlag  bool
	skipMsg    bool // set by a non-inclusive Seek; the next message read is dropped
	topicName  string
	groupID    string
	closeCh    chan struct{} // closed exactly once by Close()
	chanOnce   sync.Once
	closeOnce  sync.Once
	wg         sync.WaitGroup
}

func newKafkaConsumer(consumer *kafka.Consumer, topicName string, groupID string) *Consumer {
	msgChannel := make(chan mqwrapper.Message, 1)
	closeCh := make(chan struct{})
	skipMsg := false

	kafkaConsumer := &Consumer{
		c:          consumer,
		msgChannel: msgChannel,
		skipMsg:    skipMsg,
		topicName:  topicName,
		groupID:    groupID,
		closeCh:    closeCh,
	}

	return kafkaConsumer
}

func (kc *Consumer) Subscription() string {
	return kc.groupID
}

// Chan provides a channel from which consumed messages can be read.
// Two points need clarification:
// 1. confluent-kafka-go recommends using the function-based consumer;
// the channel-based consumer API has been deprecated, see
// https://github.com/confluentinc/confluent-kafka-go for details.
//
// 2. In other MQ implementations this API returns a channel directly, but
// here it depends on readMessage being called first, which means Chan must
// always be invoked from within a select-case loop (see the usage sketch
// after this code block). It still works well, because it covers all
// consumption scenarios: consuming continuously from the earliest or latest
// position, and consuming from a seek position to a specified end position.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
	if kc.skipMsg {
		msg := kc.readMessage()
		if msg != nil {
			kc.skipMsg = false
		}
	}

	msg := kc.readMessage()

	if msg != nil {
		kc.msgChannel <- &kafkaMessage{msg: msg}
	} else {
		kc.msgChannel <- nil
	}

	return kc.msgChannel
}

func (kc *Consumer) readMessage() *kafka.Message {
	msg, err := kc.c.ReadMessage(1 * time.Second)
	if err != nil {
		if err.(kafka.Error).Code() != kafka.ErrTimedOut {
			log.Error("read msg failed", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID), zap.Error(err))
		}
		return nil
	}
	return msg
}

func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
	offset := kafka.Offset(id.(*kafkaID).messageID)
	log.Debug("kafka consumer seek ", zap.String("topic name", kc.topicName),
		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))

	// Unassign must be invoked before Assign, otherwise seeking twice on the
	// same topic and partition will fail.
	err := kc.c.Unassign()
	if err != nil {
		log.Error("kafka consumer unassign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err
	}

	err = kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topicName, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
	if err != nil {
		log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err
	}

	// If the seek timeout is not 0, calling Seek twice returns the error state
	// RD_KAFKA_RESP_ERR__STATE; if the timeout is 0, Seek initiates the
	// operation but returns immediately without any error reporting.
	kc.skipMsg = !inclusive
	return kc.c.Seek(kafka.TopicPartition{
		Topic:     &kc.topicName,
		Partition: mqwrapper.DefaultPartitionIdx,
		Offset:    offset}, 0)
}

// Ack commits the consumed offsets. The message argument is not used:
// Commit commits the current offsets for the whole consumer.
func (kc *Consumer) Ack(message mqwrapper.Message) {
	kc.c.Commit()
}

func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
	_, high, err := kc.c.QueryWatermarkOffsets(kc.topicName, mqwrapper.DefaultPartitionIdx, -1)
	if err != nil {
		return nil, err
	}

	// The high watermark is the offset of the next message to be produced,
	// i.e. one past the latest message ID. To keep semantic consistency with
	// the latest message ID, the high value must be moved back by one.
	if high > 0 {
		high = high - 1
	}

	return &kafkaID{messageID: high}, nil
}

func (kc *Consumer) Close() {
	log.Debug("close kafka consumer", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID))
	kc.closeOnce.Do(func() {
		kc.c.Unsubscribe()
		kc.c.Close()
		close(kc.closeCh)
	})
}
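
For illustration, here is a minimal usage sketch, not part of the file above, showing how Chan() is meant to be driven from a select-case loop and how Seek positions the consumer. The consumeLoop and seekTo helpers and the stop channel are hypothetical names; the sketch assumes it lives in the same kafka package so it can reference Consumer, kafkaID, and mqwrapper directly.

Code Block
title: consumer_usage_sketch.go
// consumeLoop is a hypothetical helper that polls Chan() from a select-case
// loop, as the Chan() doc comment above prescribes.
func consumeLoop(kc *Consumer, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			kc.Close()
			return
		case msg := <-kc.Chan():
			// Chan() blocks for up to the 1-second ReadMessage timeout before
			// its channel becomes readable; a nil message means the read timed out.
			if msg == nil {
				continue
			}
			// Process the message here, then commit its offset.
			kc.Ack(msg)
		}
	}
}

// seekTo is a hypothetical helper showing Seek usage: position the consumer
// at the given offset before consuming. With inclusive=false the message at
// the seek position itself is dropped via the skipMsg flag.
func seekTo(kc *Consumer, offset int64, inclusive bool) error {
	return kc.Seek(&kafkaID{messageID: offset}, inclusive)
}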


Code Block
title: kafka_producer.go
package kafka

import (
	"context"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)

type kafkaProducer struct {
	p            *kafka.Producer
	topic        string
	deliveryChan chan kafka.Event // receives per-message delivery reports from Produce
	closeOnce    sync.Once
}

func (kp *kafkaProducer) Topic() string {
	return kp.topic
}

// Send produces a message synchronously: it waits for the delivery report on
// deliveryChan and returns the assigned offset as the message ID.
func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
	err := kp.p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
		Value:          message.Payload,
	}, kp.deliveryChan)

	if err != nil {
		return nil, err
	}

	e := <-kp.deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return nil, m.TopicPartition.Error
	}

	kp.p.Flush(5000)
	return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
}

func (kp *kafkaProducer) Close() {
	kp.closeOnce.Do(func() {
		kp.p.Close()
		close(kp.deliveryChan)
	})
}
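
For illustration, a minimal sketch, not part of the file above, of how this producer might be wired up and used. The newProducerSketch and sendExample helpers, the broker address, the topic name, and the delivery-channel capacity are all illustrative assumptions; the sketch assumes it lives in the same kafka package.

Code Block
title: producer_usage_sketch.go
// newProducerSketch is a hypothetical constructor showing the wiring Send
// relies on: a confluent-kafka-go producer plus a delivery channel used for
// synchronous acknowledgements.
func newProducerSketch(topic string) (*kafkaProducer, error) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		return nil, err
	}
	return &kafkaProducer{
		p:            p,
		topic:        topic,
		deliveryChan: make(chan kafka.Event, 128),
	}, nil
}

// sendExample produces a single message and returns its offset-based ID.
func sendExample() (mqwrapper.MessageID, error) {
	kp, err := newProducerSketch("demo-topic")
	if err != nil {
		return nil, err
	}
	defer kp.Close()

	// Send blocks until the broker's delivery report arrives on deliveryChan.
	return kp.Send(context.Background(), &mqwrapper.ProducerMessage{Payload: []byte("hello")})
}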

...