This commit is contained in:
chrislu
2022-07-10 12:11:37 -07:00
parent 8060fdcac5
commit 94b8c42b2c
9 changed files with 97 additions and 183 deletions

View File

@@ -17,7 +17,7 @@ const (
type FilerGroup string
type Filers struct {
filers map[pb.ServerAddress]*ClusterNode
members map[pb.ServerAddress]*ClusterNode
leaders *Leaders
}
type Leaders struct {
@@ -60,7 +60,7 @@ func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool)
filers, found := cluster.filerGroup2filers[filerGroup]
if !found && createIfNotFound {
filers = &Filers{
filers: make(map[pb.ServerAddress]*ClusterNode),
members: make(map[pb.ServerAddress]*ClusterNode),
leaders: &Leaders{},
}
cluster.filerGroup2filers[filerGroup] = filers
@@ -75,11 +75,11 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCente
cluster.filersLock.Lock()
defer cluster.filersLock.Unlock()
filers := cluster.getFilers(filerGroup, true)
if existingNode, found := filers.filers[address]; found {
if existingNode, found := filers.members[address]; found {
existingNode.counter++
return nil
}
filers.filers[address] = &ClusterNode{
filers.members[address] = &ClusterNode{
Address: address,
Version: version,
counter: 1,
@@ -87,7 +87,7 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCente
DataCenter: dataCenter,
Rack: rack,
}
return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address)
return ensureFilerLeaders(filers, true, filerGroup, nodeType, address)
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
@@ -151,13 +151,13 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter
if filers == nil {
return nil
}
if existingNode, found := filers.filers[address]; !found {
if existingNode, found := filers.members[address]; !found {
return nil
} else {
existingNode.counter--
if existingNode.counter <= 0 {
delete(filers.filers, address)
return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address)
delete(filers.members, address)
return ensureFilerLeaders(filers, false, filerGroup, nodeType, address)
}
}
case BrokerType:
@@ -215,7 +215,7 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string)
if filers == nil {
return
}
for _, node := range filers.filers {
for _, node := range filers.members {
nodes = append(nodes, node)
}
case BrokerType:
@@ -241,7 +241,7 @@ func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddr
return filers.leaders.isOneLeader(address)
}
func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
func ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
if isAdd {
if filers.leaders.addLeaderIfVacant(address) {
// has added the address as one leader
@@ -282,7 +282,7 @@ func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGrou
var shortestDuration int64 = math.MaxInt64
now := time.Now()
var candidateAddress pb.ServerAddress
for _, node := range filers.filers {
for _, node := range filers.members {
if filers.leaders.isOneLeader(node.Address) {
continue
}