Files
seaweedfs/weed/mq/broker/broker_grpc_sub.go

264 lines
8.5 KiB
Go
Raw Permalink Normal View History

2023-08-27 13:13:14 -07:00
package broker
import (
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
"context"
2025-02-25 19:54:03 -08:00
"errors"
2023-08-27 17:50:59 -07:00
"fmt"
"io"
"time"
2023-08-27 13:13:14 -07:00
"github.com/seaweedfs/seaweedfs/weed/glog"
2024-05-20 09:33:37 -07:00
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
2023-08-27 13:13:14 -07:00
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
2023-08-27 13:13:14 -07:00
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
2024-01-08 00:03:08 -08:00
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
2023-08-27 13:13:14 -07:00
)
2024-05-16 20:28:19 -07:00
func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
2023-08-27 13:13:14 -07:00
2024-05-16 20:28:19 -07:00
req, err := stream.Recv()
if err != nil {
return err
}
if req.GetInit() == nil {
glog.Errorf("missing init message")
return fmt.Errorf("missing init message")
}
2024-05-16 20:28:19 -07:00
ctx := stream.Context()
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
t := topic.FromPbTopic(req.GetInit().Topic)
2024-01-05 15:24:14 -08:00
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
2024-01-11 23:08:02 -08:00
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
2024-03-24 13:04:59 -07:00
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
if getOrGenErr != nil {
return getOrGenErr
2023-08-27 13:13:14 -07:00
}
subscriber := topic.NewLocalSubscriber()
localTopicPartition.Subscribers.AddSubscriber(clientName, subscriber)
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
sleepIntervalCount := 0
2024-01-15 00:20:12 -08:00
var counter int64
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
defer func() {
isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
2024-01-15 00:20:12 -08:00
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
2024-04-01 15:02:52 -07:00
b.localTopicManager.RemoveLocalPartition(t, partition)
}
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
}()
startPosition := b.getRequestPosition(req.GetInit())
imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize))
// connect to the follower
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
if req.GetInit().FollowerBroker != "" {
follower := req.GetInit().FollowerBroker
if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
return fmt.Errorf("fail to dial %s: %v", follower, err)
} else {
defer func() {
println("closing SubscribeFollowMe connection", follower)
if subscribeFollowMeStream != nil {
subscribeFollowMeStream.CloseSend()
}
2024-05-30 09:15:50 -07:00
// followerGrpcConnection.Close()
}()
followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
} else {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Init{
Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
2024-05-20 11:05:18 -07:00
Topic: req.GetInit().Topic,
Partition: req.GetInit().GetPartitionOffset().Partition,
ConsumerGroup: req.GetInit().ConsumerGroup,
},
},
}); err != nil {
return fmt.Errorf("fail to send init to %s: %v", follower, err)
}
}
}
glog.V(0).Infof("follower %s connected", follower)
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
}
2024-05-16 20:28:19 -07:00
go func() {
var lastOffset int64
2024-05-16 20:28:19 -07:00
for {
ack, err := stream.Recv()
if err != nil {
if err == io.EOF {
2024-05-30 09:15:50 -07:00
// the client has called CloseSend(). This is to ack the close.
stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
IsEndOfStream: true,
},
}})
2024-05-16 20:28:19 -07:00
break
}
2024-05-29 23:33:37 -07:00
glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
2024-05-16 20:28:19 -07:00
break
}
2024-05-29 23:33:37 -07:00
if ack.GetAck().Key == nil {
// skip ack for control messages
continue
}
2024-05-20 09:33:37 -07:00
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
2024-05-29 23:33:37 -07:00
currentLastOffset := imt.GetOldestAckedTimestamp()
// Update acknowledged offset and last seen time for this subscriber when it sends an ack
subscriber.UpdateAckedOffset(currentLastOffset)
2024-05-30 09:49:08 -07:00
// fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
2024-05-20 09:33:37 -07:00
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
2024-05-30 09:15:23 -07:00
TsNs: currentLastOffset,
},
},
}); err != nil {
glog.Errorf("Error sending ack to follower: %v", err)
break
}
2024-05-20 09:33:37 -07:00
lastOffset = currentLastOffset
2024-05-30 09:49:08 -07:00
// fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
}
}
if lastOffset > 0 {
2024-05-30 09:15:50 -07:00
glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
2024-05-29 23:33:37 -07:00
glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
}
2024-05-29 23:33:37 -07:00
}
if subscribeFollowMeStream != nil {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Close{
Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
},
}); err != nil {
if err != io.EOF {
glog.Errorf("Error sending close to follower: %v", err)
}
}
2024-05-16 20:28:19 -07:00
}
}()
2024-01-15 00:20:12 -08:00
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
if !isConnected {
return false
}
sleepIntervalCount++
2024-03-10 14:34:28 -07:00
if sleepIntervalCount > 32 {
sleepIntervalCount = 32
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
}
2024-03-10 14:34:28 -07:00
time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
2025-02-25 19:54:03 -08:00
if errors.Is(err, context.Canceled) {
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
// Client disconnected
return false
}
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
// Continue processing the request
}
return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
// reset the sleep interval count
sleepIntervalCount = 0
2023-09-01 00:36:51 -07:00
for imt.IsInflight(logEntry.Key) {
time.Sleep(137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return false, nil
}
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return false, nil
default:
// Continue processing the request
}
}
2024-05-29 23:33:37 -07:00
if logEntry.Key != nil {
imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
}
2024-05-20 12:27:45 -07:00
2024-01-05 15:14:25 -08:00
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
2023-08-27 13:13:14 -07:00
Data: &mq_pb.DataMessage{
2024-03-07 10:53:30 -08:00
Key: logEntry.Key,
Value: logEntry.Data,
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
2023-12-11 12:05:54 -08:00
TsNs: logEntry.TsNs,
2023-08-27 13:13:14 -07:00
},
2024-03-10 14:34:28 -07:00
}}); err != nil {
glog.Errorf("Error sending data: %v", err)
return false, err
}
// Update received offset and last seen time for this subscriber
subscriber.UpdateReceivedOffset(logEntry.TsNs)
2024-03-10 14:34:28 -07:00
counter++
return false, nil
})
}
func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
if initMessage == nil {
return
}
offset := initMessage.GetPartitionOffset()
offsetType := initMessage.OffsetType
// reset to earliest or latest
if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
startPosition = log_buffer.NewMessagePosition(1, -3)
return
}
if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
return
}
// use the exact timestamp
if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
2024-03-10 14:34:28 -07:00
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
return
}
// try to resume
2024-05-20 11:05:18 -07:00
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
2024-05-30 09:15:50 -07:00
glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
2024-03-10 14:34:28 -07:00
}
if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST {
startPosition = log_buffer.NewMessagePosition(1, -5)
} else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST {
startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6)
2024-03-10 14:34:28 -07:00
}
return
}