stop partitionOffsetChan if closed

This commit is contained in:
chrislu
2024-05-30 00:27:44 -07:00
parent af19256dbf
commit 23a72db1df

View File

@@ -67,6 +67,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.CloseSend()
return
case ack := <-partitionOffsetChan:
case ack, ok := <-partitionOffsetChan:
if !ok {
subscribeClient.CloseSend()
return
}
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{