mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 08:46:54 +08:00
* chore(deps): bump gocloud.dev from 0.40.0 to 0.41.0 Bumps [gocloud.dev](https://github.com/google/go-cloud) from 0.40.0 to 0.41.0. - [Release notes](https://github.com/google/go-cloud/releases) - [Commits](https://github.com/google/go-cloud/compare/v0.40.0...v0.41.0) --- updated-dependencies: - dependency-name: gocloud.dev dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * fix error * fix printing errors * Update go.mod --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: chrislu <chris.lu@gmail.com>
93 lines
3.1 KiB
Go
93 lines
3.1 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// SubscriberToSubCoordinator coordinates the subscribers
|
|
func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error {
|
|
if !b.isLockOwner() {
|
|
return status.Errorf(codes.Unavailable, "not current broker balancer")
|
|
}
|
|
req, err := stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var cgi *sub_coordinator.ConsumerGroupInstance
|
|
var cg *sub_coordinator.ConsumerGroup
|
|
// process init message
|
|
initMessage := req.GetInit()
|
|
if initMessage != nil {
|
|
cg, cgi, err = b.SubCoordinator.AddSubscriber(initMessage)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "failed to add subscriber: %v", err)
|
|
}
|
|
glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
|
|
} else {
|
|
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
|
|
}
|
|
defer func() {
|
|
b.SubCoordinator.RemoveSubscriber(initMessage)
|
|
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
|
|
}()
|
|
|
|
ctx := stream.Context()
|
|
|
|
go func() {
|
|
// process ack messages
|
|
for {
|
|
req, err := stream.Recv()
|
|
if err != nil {
|
|
glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
|
|
}
|
|
|
|
if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil {
|
|
glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment)
|
|
cg.AckUnAssignment(cgi, ackUnAssignment)
|
|
}
|
|
if ackAssignment := req.GetAckAssignment(); ackAssignment != nil {
|
|
glog.V(0).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment)
|
|
cg.AckAssignment(cgi, ackAssignment)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
err := ctx.Err()
|
|
if err == context.Canceled {
|
|
// Client disconnected
|
|
return
|
|
}
|
|
return
|
|
default:
|
|
// Continue processing the request
|
|
}
|
|
}
|
|
}()
|
|
|
|
// send commands to subscriber
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
err := ctx.Err()
|
|
if err == context.Canceled {
|
|
// Client disconnected
|
|
return err
|
|
}
|
|
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
|
|
return err
|
|
case message := <-cgi.ResponseChan:
|
|
glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message)
|
|
if err := stream.Send(message); err != nil {
|
|
glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
|
|
}
|
|
}
|
|
}
|
|
}
|