diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index be29efa1c..5a134b3c2 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -24,7 +24,6 @@ type PublishClient struct { type TopicPublisher struct { namespace string topic string - partition2Broker *interval.SearchTree[*PublishClient, int32] partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32] grpcDialOption grpc.DialOption sync.Mutex // protects grpc @@ -36,9 +35,6 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) return &TopicPublisher{ namespace: namespace, topic: topic, - partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int { - return int(a - b) - }), partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { return int(a - b) }), @@ -49,11 +45,6 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) func (p *TopicPublisher) Shutdown() error { - if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found { - for _, client := range clients { - client.CloseSend() - } - } if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { for _, inputBuffer := range inputBuffers { inputBuffer.CloseInput()