...

confluent-kafka-go consumes messages with a consumer group through a single function call, which makes it very simple to use. This function also allows you to directly set the offset from which consumption starts, as the sketch below illustrates.
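For orientation, here is a minimal sketch of the function-based consumption loop in confluent-kafka-go. The broker address, topic, group ID, and start offset are assumptions for illustration only.

Code Block
titleconsumer_group_example.go
package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Config values here are illustrative assumptions.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Assign a concrete starting offset instead of relying on the
	// committed group offset.
	topic := "example-topic"
	if err := c.Assign([]kafka.TopicPartition{
		{Topic: &topic, Partition: 0, Offset: kafka.Offset(42)},
	}); err != nil {
		panic(err)
	}

	// Function-based consumption: one ReadMessage call per message.
	for i := 0; i < 10; i++ {
		msg, err := c.ReadMessage(time.Second)
		if err != nil {
			if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
				continue
			}
			panic(err)
		}
		fmt.Printf("offset %v: %s\n", msg.TopicPartition.Offset, msg.Value)
	}
}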

Interface Implementation

Code Block
titlekafka_consumer.go
package kafka

import (
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)

type Consumer struct {
	c          *kafka.Consumer
	msgChannel chan mqwrapper.Message
	closeFlag  bool
	skipMsg    bool
	topicName  string
	groupID    string
	closeCh    chan struct{}
	chanOnce   sync.Once
	closeOnce  sync.Once
	wg         sync.WaitGroup
}

func newKafkaConsumer(consumer *kafka.Consumer, topicName string, groupID string) *Consumer {
	msgChannel := make(chan mqwrapper.Message, 1)
	closeCh := make(chan struct{})
	skipMsg := false

	kafkaConsumer := &Consumer{
		c:          consumer,
		msgChannel: msgChannel,
		skipMsg:    skipMsg,
		topicName:  topicName,
		groupID:    groupID,
		closeCh:    closeCh,
	}

	return kafkaConsumer
}

func (kc *Consumer) Subscription() string {
	return kc.groupID
}

// Chan provides a channel to read consumed messages.
// Two points need clarification:
// 1. confluent-kafka-go recommends the function-based consumer; the
// channel-based consumer API is already deprecated, see
// https://github.com/confluentinc/confluent-kafka-go for details.
//
// 2. Other MQ implementations of this API return the channel directly,
// but here the channel depends on readMessage being called first, which
// means Chan is always triggered within a select-case invocation. It still
// works well, because it covers all consumption scenarios: consuming
// continuously from the earliest or latest position, and consuming from a
// seek position up to a specified end position.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
	if kc.skipMsg {
		msg := kc.readMessage()
		if msg != nil {
			kc.skipMsg = false
		}
	}

	msg := kc.readMessage()

	if msg != nil {
		kc.msgChannel <- &kafkaMessage{msg: msg}
	} else {
		kc.msgChannel <- nil
	}

	return kc.msgChannel
}

func (kc *Consumer) readMessage() *kafka.Message {
	msg, err := kc.c.ReadMessage(1 * time.Second)
	if err != nil {
		if err.(kafka.Error).Code() != kafka.ErrTimedOut {
			log.Error("read msg failed", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID), zap.Error(err))
		}
		return nil
	}
	return msg
}

func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
	offset := kafka.Offset(id.(*kafkaID).messageID)
	log.Debug("kafka consumer seek ", zap.String("topic name", kc.topicName),
		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))

	// Unassign must be invoked before Assign, otherwise seeking twice on
	// the same topic and partition will fail.
	err := kc.c.Unassign()
	if err != nil {
		log.Error("kafka consumer unassign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err
	}

	err = kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topicName, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
	if err != nil {
		log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err
	}

	// If the seek timeout is not 0, calling Seek twice returns the error
	// state RD_KAFKA_RESP_ERR__STATE. If the timeout is 0, Seek initiates
	// the seek but returns immediately without reporting any error.
	kc.skipMsg = !inclusive
	return kc.c.Seek(kafka.TopicPartition{
		Topic:     &kc.topicName,
		Partition: mqwrapper.DefaultPartitionIdx,
		Offset:    offset}, 0)
}

func (kc *Consumer) Ack(message mqwrapper.Message) {
	kc.c.Commit()
}

func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
	_, high, err := kc.c.QueryWatermarkOffsets(kc.topicName, mqwrapper.DefaultPartitionIdx, -1)
	if err != nil {
		return nil, err
	}

	// The high watermark is the offset after the latest message, so it has
	// to be moved back by one to stay consistent with the semantics of the
	// latest message ID.
	if high > 0 {
		high = high - 1
	}

	return &kafkaID{messageID: high}, nil
}

func (kc *Consumer) Close() {
	log.Debug("close kafka consumer", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID))
	kc.closeOnce.Do(func() {
		kc.c.Unsubscribe()
		kc.c.Close()
		close(kc.closeCh)
	})
}
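
A hedged usage sketch of the wrapper above. The broker address, topic, group ID, and seek offset are assumptions for illustration; newKafkaConsumer and kafkaID are the package internals shown in the block.

Code Block
titlekafka_consumer_usage.go
package kafka

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// consumeExample drives the Consumer wrapper defined above. The broker
// address, topic, group ID, and start offset are illustrative assumptions.
func consumeExample() error {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		return err
	}

	kc := newKafkaConsumer(c, "example-topic", "example-group")
	defer kc.Close()

	// Exclusive seek: the message at offset 100 itself is skipped, because
	// skipMsg is set and Chan discards one message before delivering.
	if err := kc.Seek(&kafkaID{messageID: 100}, false); err != nil {
		return err
	}

	// Chan reads one message per invocation, so it is polled repeatedly
	// rather than ranged over once. A nil message means a read timeout.
	for i := 0; i < 10; i++ {
		if msg := <-kc.Chan(); msg != nil {
			fmt.Printf("consumed %d bytes\n", len(msg.Payload()))
			kc.Ack(msg)
		}
	}
	return nil
}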


Code Block
titlekafka_producer.go
package kafka

import (
	"context"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)

type kafkaProducer struct {
	p            *kafka.Producer
	topic        string
	deliveryChan chan kafka.Event
	closeOnce    sync.Once
}

func (kp *kafkaProducer) Topic() string {
	return kp.topic
}

func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
	err := kp.p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
		Value:          message.Payload,
	}, kp.deliveryChan)

	if err != nil {
		return nil, err
	}

	e := <-kp.deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return nil, m.TopicPartition.Error
	}

	kp.p.Flush(5000)
	return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
}

func (kp *kafkaProducer) Close() {
	kp.closeOnce.Do(func() {
		kp.p.Close()
		close(kp.deliveryChan)
	})
}
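
A matching usage sketch for the producer side. The broker address and topic name are illustrative assumptions; kafkaProducer and kafkaID are the types defined above.

Code Block
titlekafka_producer_usage.go
package kafka

import (
	"context"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)

// produceExample shows the synchronous send path: Produce enqueues the
// message and Send blocks on deliveryChan until the broker acks it.
// The broker address and topic name are illustrative assumptions.
func produceExample() error {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		return err
	}

	kp := &kafkaProducer{
		p:            p,
		topic:        "example-topic",
		deliveryChan: make(chan kafka.Event, 1),
	}
	defer kp.Close()

	id, err := kp.Send(context.Background(), &mqwrapper.ProducerMessage{
		Payload: []byte("hello"),
	})
	if err != nil {
		return err
	}
	fmt.Printf("produced at offset %d\n", id.(*kafkaID).messageID)
	return nil
}

Because Send already blocks on the per-message delivery report from deliveryChan, each call is synchronous; the trailing Flush(5000) acts as an extra safety net rather than the primary delivery guarantee.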



Deployments

  • Standalone
    • Docker
  • Cluster
    • Helm Chart
    • Operator

...