This commit is contained in:
chrislu
2024-05-20 11:05:18 -07:00
parent 5038577f7e
commit 3622196881
15 changed files with 58 additions and 56 deletions

View File

@@ -75,8 +75,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Init{ Message: &mq_pb.SubscribeFollowMeRequest_Init{
Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{ Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
Topic: req.GetInit().Topic, Topic: req.GetInit().Topic,
Partition: req.GetInit().GetPartitionOffset().Partition, Partition: req.GetInit().GetPartitionOffset().Partition,
ConsumerGroup: req.GetInit().ConsumerGroup, ConsumerGroup: req.GetInit().ConsumerGroup,
}, },
}, },
@@ -186,7 +186,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
return return
} }
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil{ if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
startPosition = log_buffer.NewMessagePosition(storedOffset, -2) startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return return
} }

View File

@@ -12,7 +12,6 @@ import (
"time" "time"
) )
func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) { func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
var req *mq_pb.SubscribeFollowMeRequest var req *mq_pb.SubscribeFollowMeRequest
req, err = stream.Recv() req, err = stream.Recv()
@@ -72,7 +71,7 @@ func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.Subscrib
err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName) data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
if err != nil { if err != nil {
return err return err
} }
if len(data) != 8 { if len(data) != 8 {
return fmt.Errorf("no offset found") return fmt.Errorf("no offset found")

View File

@@ -47,7 +47,7 @@ type MessageQueueBroker struct {
lockAsBalancer *cluster.LiveLock lockAsBalancer *cluster.LiveLock
Coordinator *sub_coordinator.Coordinator Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex accessLock sync.Mutex
fca *sub_coordinator.FilerClientAccessor fca *sub_coordinator.FilerClientAccessor
} }
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -65,7 +65,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
Coordinator: coordinator, Coordinator: coordinator,
} }
fca := &sub_coordinator.FilerClientAccessor{ fca := &sub_coordinator.FilerClientAccessor{
GetFiler: mqBroker.GetFiler, GetFiler: mqBroker.GetFiler,
GetGrpcDialOption: mqBroker.GetGrpcDialOption, GetGrpcDialOption: mqBroker.GetGrpcDialOption,
} }
mqBroker.fca = fca mqBroker.fca = fca
@@ -130,7 +130,6 @@ func (b *MessageQueueBroker) GetGrpcDialOption() grpc.DialOption {
return b.grpcDialOption return b.grpcDialOption
} }
func (b *MessageQueueBroker) GetFiler() pb.ServerAddress { func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
return b.currentFiler return b.currentFiler
} }

View File

@@ -44,7 +44,7 @@ func main() {
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
counter := 0 counter := 0
subscriber.SetEachMessageFunc(func(key, value []byte) (error) { subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++ counter++
println(string(key), "=>", string(value), counter) println(string(key), "=>", string(value), counter)
return nil return nil

View File

@@ -63,7 +63,7 @@ func main() {
} }
processorConfig := sub_client.ProcessorConfiguration{ processorConfig := sub_client.ProcessorConfiguration{
MaxPartitionCount: 3, MaxPartitionCount: 3,
PerPartitionConcurrency: 1, PerPartitionConcurrency: 1,
} }
@@ -71,7 +71,7 @@ func main() {
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
counter := 0 counter := 0
subscriber.SetEachMessageFunc(func(key, value []byte) (error) { subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++ counter++
record := &schema_pb.RecordValue{} record := &schema_pb.RecordValue{}
proto.Unmarshal(value, record) proto.Unmarshal(value, record)

View File

@@ -142,11 +142,11 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if err = publishClient.Send(&mq_pb.PublishMessageRequest{ if err = publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{ Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{ Init: &mq_pb.PublishMessageRequest_InitMessage{
Topic: p.config.Topic.ToPbTopic(), Topic: p.config.Topic.ToPbTopic(),
Partition: job.Partition, Partition: job.Partition,
AckInterval: 128, AckInterval: 128,
FollowerBroker: job.FollowerBroker, FollowerBroker: job.FollowerBroker,
PublisherName: p.config.PublisherName, PublisherName: p.config.PublisherName,
}, },
}, },
}); err != nil { }); err != nil {

View File

@@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId, ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(), Topic: sub.ContentConfig.Topic.ToPbTopic(),
MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount, MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount,
}, },
}, },
}); err != nil { }); err != nil {
@@ -105,12 +105,12 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
Partition: assigned.Partition, Partition: assigned.Partition,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
}, },
Filter: sub.ContentConfig.Filter, Filter: sub.ContentConfig.Filter,
FollowerBroker: assigned.FollowerBroker, FollowerBroker: assigned.FollowerBroker,
Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, Concurrency: sub.ProcessorConfig.PerPartitionConcurrency,
}, },
}, },
});err != nil { }); err != nil {
glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
} }
@@ -120,16 +120,16 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
defer sub.OnCompletionFunc() defer sub.OnCompletionFunc()
} }
partitionOffsetChan:= make(chan int64, 1024) partitionOffsetChan := make(chan int64, 1024)
defer func() { defer func() {
close(partitionOffsetChan) close(partitionOffsetChan)
}() }()
concurrentPartitionLimit := int(sub.ProcessorConfig.MaxPartitionCount) perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency)
if concurrentPartitionLimit <= 0 { if perPartitionConcurrency <= 0 {
concurrentPartitionLimit = 1 perPartitionConcurrency = 1
} }
executors := util.NewLimitedConcurrentExecutor(concurrentPartitionLimit) executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency)
go func() { go func() {
for ack := range partitionOffsetChan { for ack := range partitionOffsetChan {
@@ -162,7 +162,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil { if processErr == nil {
partitionOffsetChan <- m.Data.TsNs partitionOffsetChan <- m.Data.TsNs
}else{ } else {
lastErr = processErr lastErr = processErr
} }
}) })

View File

@@ -9,7 +9,6 @@ import (
) )
type ProcessorState struct { type ProcessorState struct {
} }
// Subscribe subscribes to a topic's specified partitions. // Subscribe subscribes to a topic's specified partitions.

View File

@@ -22,7 +22,7 @@ type ContentConfiguration struct {
} }
type ProcessorConfiguration struct { type ProcessorConfiguration struct {
MaxPartitionCount int32 // how many partitions to process concurrently MaxPartitionCount int32 // how many partitions to process concurrently
PerPartitionConcurrency int32 // how many messages to process concurrently per partition PerPartitionConcurrency int32 // how many messages to process concurrently per partition
} }
@@ -30,16 +30,16 @@ type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func() type OnCompletionFunc func()
type TopicSubscriber struct { type TopicSubscriber struct {
SubscriberConfig *SubscriberConfiguration SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration ContentConfig *ContentConfiguration
ProcessorConfig *ProcessorConfiguration ProcessorConfig *ProcessorConfiguration
brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string bootstrapBrokers []string
waitForMoreMessage bool waitForMoreMessage bool
activeProcessors map[topic.Partition]*ProcessorState activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex activeProcessorsLock sync.Mutex
} }
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {

View File

@@ -12,8 +12,8 @@ import (
type ConsumerGroupInstance struct { type ConsumerGroupInstance struct {
InstanceId string InstanceId string
// the consumer group instance may not have an active partition // the consumer group instance may not have an active partition
Partitions []*topic.Partition Partitions []*topic.Partition
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
MaxPartitionCount int32 MaxPartitionCount int32
} }
type ConsumerGroup struct { type ConsumerGroup struct {
@@ -43,10 +43,10 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
} }
} }
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) { func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(true, "add consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds) cg.onConsumerGroupInstanceChange(true, "add consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
} }
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) { func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(false, "remove consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds) cg.onConsumerGroupInstanceChange(false, "remove consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
} }
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(isAdd bool, reason string, maxPartitionCount, rebalanceSeconds int32) { func (cg *ConsumerGroup) onConsumerGroupInstanceChange(isAdd bool, reason string, maxPartitionCount, rebalanceSeconds int32) {

View File

@@ -17,8 +17,8 @@ type TopicConsumerGroups struct {
type Coordinator struct { type Coordinator struct {
// map topic name to consumer groups // map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.Balancer balancer *pub_balancer.Balancer
FilerClientAccessor *FilerClientAccessor FilerClientAccessor *FilerClientAccessor
} }

View File

@@ -14,8 +14,8 @@ import (
) )
type FilerClientAccessor struct { type FilerClientAccessor struct {
GetFiler func() pb.ServerAddress GetFiler func() pb.ServerAddress
GetGrpcDialOption func()grpc.DialOption GetGrpcDialOption func() grpc.DialOption
} }
func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {

View File

@@ -6,15 +6,15 @@ import (
) )
type InflightMessageTracker struct { type InflightMessageTracker struct {
messages map[string]int64 messages map[string]int64
mu sync.Mutex mu sync.Mutex
timestamps *RingBuffer timestamps *RingBuffer
} }
func NewInflightMessageTracker(capacity int) *InflightMessageTracker { func NewInflightMessageTracker(capacity int) *InflightMessageTracker {
return &InflightMessageTracker{ return &InflightMessageTracker{
messages: make(map[string]int64), messages: make(map[string]int64),
timestamps: NewRingBuffer(capacity), timestamps: NewRingBuffer(capacity),
} }
} }
@@ -26,6 +26,7 @@ func (imt *InflightMessageTracker) InflightMessage(key []byte, tsNs int64) {
imt.messages[string(key)] = tsNs imt.messages[string(key)] = tsNs
imt.timestamps.Add(tsNs) imt.timestamps.Add(tsNs)
} }
// IsMessageAcknowledged returns true if the message has been acknowledged. // IsMessageAcknowledged returns true if the message has been acknowledged.
// If the message is older than the oldest inflight messages, returns false. // If the message is older than the oldest inflight messages, returns false.
// returns false if the message is inflight. // returns false if the message is inflight.
@@ -47,6 +48,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64)
return true return true
} }
// AcknowledgeMessage acknowledges the message with the key and timestamp. // AcknowledgeMessage acknowledges the message with the key and timestamp.
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool { func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
imt.mu.Lock() imt.mu.Lock()
@@ -71,12 +73,14 @@ type RingBuffer struct {
head int head int
size int size int
} }
// NewRingBuffer creates a new RingBuffer of the given capacity. // NewRingBuffer creates a new RingBuffer of the given capacity.
func NewRingBuffer(capacity int) *RingBuffer { func NewRingBuffer(capacity int) *RingBuffer {
return &RingBuffer{ return &RingBuffer{
buffer: make([]int64, capacity), buffer: make([]int64, capacity),
} }
} }
// Add adds a new timestamp to the ring buffer. // Add adds a new timestamp to the ring buffer.
func (rb *RingBuffer) Add(timestamp int64) { func (rb *RingBuffer) Add(timestamp int64) {
rb.buffer[rb.head] = timestamp rb.buffer[rb.head] = timestamp
@@ -85,6 +89,7 @@ func (rb *RingBuffer) Add(timestamp int64) {
rb.size++ rb.size++
} }
} }
// Remove removes the specified timestamp from the ring buffer. // Remove removes the specified timestamp from the ring buffer.
func (rb *RingBuffer) Remove(timestamp int64) { func (rb *RingBuffer) Remove(timestamp int64) {
// Perform binary search // Perform binary search

View File

@@ -82,10 +82,10 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions)) newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
for _, partition := range partitions { for _, partition := range partitions {
newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{ newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
RangeStart: partition.RangeStart, RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop, RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs, UnixTimeNs: partition.UnixTimeNs,
Broker: partition.AssignedBroker, Broker: partition.AssignedBroker,
FollowerBroker: partition.FollowerBroker, FollowerBroker: partition.FollowerBroker,
}) })
} }

View File

@@ -32,7 +32,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1, MaxPartitionCount: 1,
}, },
}, },
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{ {
@@ -61,7 +61,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1, MaxPartitionCount: 1,
}, },
}, },
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{ {
@@ -90,7 +90,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1, MaxPartitionCount: 1,
}, },
}, },
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{ {
@@ -128,7 +128,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1, MaxPartitionCount: 1,
}, },
}, },
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{ {