mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-15 20:06:19 +08:00
remove isForPublish from LookupTopicBrokers
also adds a return parameter: whether the topic exists or not
This commit is contained in:
@@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
|
||||
}
|
||||
|
||||
ret := &mq_pb.ConfigureTopicResponse{}
|
||||
ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
|
||||
ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
|
||||
|
||||
for _, bpa := range ret.BrokerPartitionAssignments {
|
||||
fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker)
|
||||
|
@@ -35,7 +35,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
|
||||
|
||||
ret := &mq_pb.LookupTopicBrokersResponse{}
|
||||
ret.Topic = request.Topic
|
||||
ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, -1)
|
||||
ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, false, -1)
|
||||
return ret, err
|
||||
}
|
||||
|
||||
|
@@ -38,7 +38,6 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
|
||||
Namespace: p.namespace,
|
||||
Name: p.topic,
|
||||
},
|
||||
IsForPublish: true,
|
||||
})
|
||||
glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp)
|
||||
if p.config.CreateTopic && err != nil {
|
||||
@@ -58,7 +57,6 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
|
||||
Namespace: p.namespace,
|
||||
Name: p.topic,
|
||||
},
|
||||
IsForPublish: true,
|
||||
})
|
||||
glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp)
|
||||
}
|
||||
|
@@ -10,7 +10,7 @@ var (
|
||||
ErrNoBroker = errors.New("no broker")
|
||||
)
|
||||
|
||||
func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
|
||||
func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) {
|
||||
if partitionCount == 0 {
|
||||
partitionCount = 6
|
||||
}
|
||||
@@ -37,7 +37,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
|
||||
}
|
||||
if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
|
||||
glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments)
|
||||
return assignments, nil
|
||||
return assignments, true, nil
|
||||
}
|
||||
|
||||
// find the topic partitions on the filer
|
||||
@@ -48,7 +48,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
|
||||
// return error not found
|
||||
// t := topic.FromPbTopic(request.Topic)
|
||||
if balancer.Brokers.IsEmpty() {
|
||||
return nil, ErrNoBroker
|
||||
return nil, alreadyExists, ErrNoBroker
|
||||
}
|
||||
assignments = AllocateTopicPartitions(balancer.Brokers, partitionCount)
|
||||
balancer.OnPartitionChange(topic, assignments)
|
||||
|
Reference in New Issue
Block a user