...
Milvus supports Kafka as a message stream. A configuration option decides whether Pulsar or Kafka is used in cluster mode, and the function KafkaEnable() is provided to turn Kafka on. If you don't want to use Kafka, comment out its configuration; the same applies to Pulsar and RocksMQ. If the configurations of Pulsar, Kafka, and RocksMQ are all readable, RocksMQ is used in standalone mode and Pulsar in cluster mode.
Code Block | ||||
---|---|---|---|---|
| ||||
pulsar:
  address: localhost
  port: 6650
  maxMessageSize: 5242880
#kafka:
#  brokers:
#    - localhost:9092
#  port: 9092 |
Design Details
- add Kafka and ZooKeeper dev Docker containers
- optimize mq_factory configuration initialization
- remove reader
- implement msg_stream with Kafka
Configuration
To enable Kafka, configure it in the milvus.yaml file; the same applies to Pulsar. When both Kafka and Pulsar are configured, Pulsar takes priority.
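The selection rules described so far can be sketched as a small function. This is only an illustration under stated assumptions: `MQConfig` and `SelectMQ` are hypothetical names, not Milvus APIs, and the fall-through to Pulsar or Kafka when standalone mode has no RocksMQ section is an assumption that the text does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// MQConfig is a hypothetical stand-in for the parsed milvus.yaml sections.
// A field is true when the corresponding section is present, that is,
// not commented out.
type MQConfig struct {
	RocksMQ bool
	Pulsar  bool
	Kafka   bool
}

// SelectMQ returns the name of the message queue to use.
// Rules taken from the text: RocksMQ in standalone mode; otherwise Pulsar
// takes priority over Kafka when both are configured.
// Assumption: standalone mode without a RocksMQ section falls through to
// the same Pulsar-then-Kafka order.
func SelectMQ(cfg MQConfig, standalone bool) (string, error) {
	if standalone && cfg.RocksMQ {
		return "rocksmq", nil
	}
	if cfg.Pulsar {
		return "pulsar", nil
	}
	if cfg.Kafka {
		return "kafka", nil
	}
	return "", errors.New("no message queue configured")
}

func main() {
	// Only the Kafka section is left uncommented, cluster mode.
	name, err := SelectMQ(MQConfig{Kafka: true}, false)
	fmt.Println(name, err) // kafka <nil>
}
```

Under these rules, enabling Kafka in cluster mode means commenting out the Pulsar section, which matches the configuration advice above.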
Kafka Client SDK
We tried both Sarama and confluent-kafka-go during development and found basically no difference on the producer side. There is, however, a big difference when using consumer groups.
With Sarama, using a consumer group requires implementing Sarama interfaces, which makes the consumer very difficult to control and hard to seek.
With confluent-kafka-go, consuming messages through a consumer group takes just a function call. It is very simple to use, and the function lets you directly set the offset from which to start consumption.
Interface Implementation
Note: in order to provide a unified MQ interface, we will remove the series of Reader APIs from the MsgStream interface. Kafka does not actually support a Reader API, so reader behavior is also implemented with the Consumer API.
Implement Kafka consumer based on the following interface:
Code Block | ||
---|---|---|
| ||
package kafka

import (
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
	"go.uber.org/zap"
)

type Consumer struct {
	c          *kafka.Consumer
	msgChannel chan mqwrapper.Message
	closeFlag  bool
	skipMsg    bool
	topicName  string
	groupID    string
	closeCh    chan struct{}
	chanOnce   sync.Once
	closeOnce  sync.Once
	wg         sync.WaitGroup
}

func newKafkaConsumer(consumer *kafka.Consumer, topicName string, groupID string) *Consumer {
	msgChannel := make(chan mqwrapper.Message, 1)
	closeCh := make(chan struct{})
	skipMsg := false

	kafkaConsumer := &Consumer{c: consumer,
		msgChannel: msgChannel,
		skipMsg:    skipMsg,
		topicName:  topicName,
		groupID:    groupID,
		closeCh:    closeCh,
	}

	return kafkaConsumer
}

func (kc *Consumer) Subscription() string {
	return kc.groupID
}

// Chan provides a channel to read consumed messages.
// Two points need clarification:
// 1. confluent-kafka-go recommends the function-based consumer; the
// channel-based consumer API is already deprecated. See
// https://github.com/confluentinc/confluent-kafka-go for details.
//
// 2. For other MQs this API returns the channel directly, but here it first
// depends on readMessage, which means it is always triggered within a
// select-case invocation. It still works well, because it covers all message
// consumption situations: start from the earliest or latest position and keep
// consuming, or start from a seek position and consume to a specified end position.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
	if kc.skipMsg {
		msg := kc.readMessage()
		if msg != nil {
			kc.skipMsg = false
		}
	}

	msg := kc.readMessage()
	if msg != nil {
		kc.msgChannel <- &kafkaMessage{msg: msg}
	} else {
		kc.msgChannel <- nil
	}

	return kc.msgChannel
}

func (kc *Consumer) readMessage() *kafka.Message {
	msg, err := kc.c.ReadMessage(1 * time.Second)
	if err != nil {
		if err.(kafka.Error).Code() != kafka.ErrTimedOut {
			log.Error("read msg failed", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID), zap.Error(err))
		}
		return nil
	}
	return msg
}

func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
	offset := kafka.Offset(id.(*kafkaID).messageID)
	log.Debug("kafka consumer seek ", zap.String("topic name", kc.topicName),
		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))

	// Unassign must be invoked before Assign, or seeking twice on the same
	// topic and partition will fail.
	err := kc.c.Unassign()
	if err != nil {
		log.Error("kafka consumer unassign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err
	}

	err = kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topicName, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
	if err != nil {
		log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topicName), zap.Any("Msg offset", offset), zap.Error(err))
		return err
	}

	// If the seek timeout is not 0, calling Seek twice returns the error state RD_KAFKA_RESP_ERR__STATE.
	// If the timeout is 0, the seek is initiated but returns immediately without any error reporting.
	kc.skipMsg = !inclusive
	return kc.c.Seek(kafka.TopicPartition{
		Topic:     &kc.topicName,
		Partition: mqwrapper.DefaultPartitionIdx,
		Offset:    offset}, 0)
}

func (kc *Consumer) Ack(message mqwrapper.Message) {
	kc.c.Commit()
}

func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
	_, high, err := kc.c.QueryWatermarkOffsets(kc.topicName, mqwrapper.DefaultPartitionIdx, -1)
	if err != nil {
		return nil, err
	}

	// The current high value is the next offset after the latest message ID.
	// To stay semantically consistent with the latest message ID, the high
	// value is decremented by one.
	if high > 0 {
		high = high - 1
	}

	return &kafkaID{messageID: high}, nil
}

func (kc *Consumer) Close() {
	log.Debug("close kafka consumer", zap.Any("topic", kc.topicName), zap.String("groupID", kc.groupID))
	kc.closeOnce.Do(func() {
		kc.c.Unsubscribe()
		kc.c.Close()
		close(kc.closeCh)
	})
} |
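The inclusive/exclusive seek and the latest-message-ID adjustment in the consumer can be illustrated without a broker. The sketch below is a simplified model, not the Milvus implementation: `partitionLog` and `LatestMsgID` are hypothetical names, and an in-memory slice stands in for a single Kafka partition, with the slice index playing the role of the offset.

```go
package main

import "fmt"

// partitionLog is a hypothetical in-memory stand-in for one Kafka partition:
// the slice index of a message is its offset.
type partitionLog struct {
	msgs []string
	next int64 // offset of the next message to deliver
}

// Seek positions the reader at offset. When inclusive is false, the message
// at offset itself is skipped, which is the effect the skipMsg flag has in
// the consumer.
func (p *partitionLog) Seek(offset int64, inclusive bool) error {
	if offset < 0 || offset >= int64(len(p.msgs)) {
		return fmt.Errorf("offset %d out of range [0, %d)", offset, len(p.msgs))
	}
	p.next = offset
	if !inclusive {
		p.next++
	}
	return nil
}

// Read returns the next message and its offset, or ok=false at the end of the log.
func (p *partitionLog) Read() (offset int64, msg string, ok bool) {
	if p.next >= int64(len(p.msgs)) {
		return 0, "", false
	}
	offset, msg = p.next, p.msgs[p.next]
	p.next++
	return offset, msg, true
}

// LatestMsgID converts a high watermark (the offset the next produced message
// will receive) into the offset of the latest existing message.
func LatestMsgID(high int64) int64 {
	if high > 0 {
		return high - 1
	}
	return high
}

func main() {
	pl := &partitionLog{msgs: []string{"a", "b", "c"}}

	// Exclusive seek to offset 1: consumption resumes at offset 2.
	if err := pl.Seek(1, false); err != nil {
		panic(err)
	}
	off, msg, _ := pl.Read()
	fmt.Println(off, msg) // 2 c

	// The high watermark of a three-message log is 3; the latest ID is 2.
	fmt.Println(LatestMsgID(int64(len(pl.msgs)))) // 2
}
```

An exclusive seek is what a caller needs when resuming after a message that has already been processed, while an inclusive seek replays the message at the given position.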
Code Block | ||
---|---|---|
| ||
package kafka

import (
	"context"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)

type kafkaProducer struct {
	p            *kafka.Producer
	topic        string
	deliveryChan chan kafka.Event
	closeOnce    sync.Once
}

func (kp *kafkaProducer) Topic() string {
	return kp.topic
}

func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
	err := kp.p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
		Value:          message.Payload,
	}, kp.deliveryChan)
	if err != nil {
		return nil, err
	}

	e := <-kp.deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return nil, m.TopicPartition.Error
	}

	kp.p.Flush(5000)
	return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
}

func (kp *kafkaProducer) Close() {
	kp.closeOnce.Do(func() {
		kp.p.Close()
		close(kp.deliveryChan)
	})
} |
// Consumer is the interface that provides operations of a consumer
type Consumer interface {
	// returns the subscription for the consumer
	Subscription() string
	// Message channel
	Chan() <-chan Message
	// Seek to the uniqueID position
	Seek(MessageID, bool) error //nolint:govet
	// Ack make sure that msg is received
	Ack(Message)
	// Close consumer
	Close()
	// GetLatestMsgID return the latest message ID
	GetLatestMsgID() (MessageID, error)
}
|
Implement Kafka producer based on the following interface:
Code Block | ||
---|---|---|
| ||
// Producer is the interface that provides operations of producer
type Producer interface {
	// return the topic which producer is publishing to
	//Topic() string
	// publish a message
	Send(ctx context.Context, message *ProducerMessage) (MessageID, error)
	Close()
} |
In addition, MQ clients must be configurable, so the SetParams API will be replaced by the Init API.
Code Block | ||
---|---|---|
| ||
type Factory interface {
	// SetParams(params map[string]interface{}) error // replaced by Init
	Init(params *paramtable.ComponentParam) error
} |
Deployments
- standalone
docker
Code Block language yml title dev/docker-compose.yml
version: '3.5'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3.6.3'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:3.1.0'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=0
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_MAX_PARTITION_FETCH_BYTES=5242880
      - KAFKA_CFG_MAX_REQUEST_SIZE=5242880
      - KAFKA_CFG_MESSAGE_MAX_BYTES=5242880
      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=5242880
      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5242880
    depends_on:
      - zookeeper

networks:
  default:
    name: milvus_dev
- Cluster
- Helm Chart
- Operator
Test Plan
- pass the unit tests (UT)
- performance testing
- chaos testing