adjust mq.proto

This commit is contained in:
chrislu
2023-09-30 21:47:57 -07:00
parent 02cb799481
commit ebd6f96d35
5 changed files with 299 additions and 375 deletions

View File

@@ -11,8 +11,8 @@ import (
func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetConsumer().Topic),
topic.FromPbPartition(req.GetConsumer().Partition))
localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetInit().Topic),
topic.FromPbPartition(req.GetInit().Partition))
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{
@@ -24,7 +24,7 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream
return nil
}
clientName := fmt.Sprintf("%s/%s-%s", req.GetConsumer().ConsumerGroup, req.GetConsumer().ConsumerId, req.GetConsumer().ClientId)
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData()

View File

@@ -21,21 +21,21 @@ func (sub *TopicSubscriber) Subscribe() error {
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
Consumer: &mq_pb.SubscribeRequest_Consumer{
ConsumerGroup: sub.SubscriberConfig.GroupId,
ConsumerId: sub.SubscriberConfig.GroupInstanceId,
},
Cursor: &mq_pb.SubscribeRequest_Cursor{
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
Message: &mq_pb.SubscribeRequest_Init{
Init: &mq_pb.SubscribeRequest_InitMessage{
ConsumerGroup: sub.SubscriberConfig.GroupId,
ConsumerId: sub.SubscriberConfig.GroupInstanceId,
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
Partition: &mq_pb.Partition{
RingSize: brokerPartitionAssignment.Partition.RingSize,
RangeStart: brokerPartitionAssignment.Partition.RangeStart,
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
},
Partition: &mq_pb.Partition{
RingSize: brokerPartitionAssignment.Partition.RingSize,
RangeStart: brokerPartitionAssignment.Partition.RangeStart,
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
},
})
if err != nil {