auto updated filer peer list

This commit is contained in:
Chris Lu
2021-11-06 14:23:35 -07:00
parent 84bb8e7365
commit e0fc2898e9
8 changed files with 211 additions and 137 deletions

View File

@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"sync"
@@ -18,9 +19,13 @@ import (
)
type MetaAggregator struct {
filers []pb.ServerAddress
grpcDialOption grpc.DialOption
MetaLogBuffer *log_buffer.LogBuffer
filer *Filer
self pb.ServerAddress
isLeader bool
grpcDialOption grpc.DialOption
MetaLogBuffer *log_buffer.LogBuffer
peerStatues map[pb.ServerAddress]struct{}
peerStatuesLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
@@ -28,10 +33,12 @@ type MetaAggregator struct {
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
t := &MetaAggregator{
filers: filers,
filer: filer,
self: self,
grpcDialOption: grpcDialOption,
peerStatues: make(map[pb.ServerAddress]struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@@ -40,10 +47,35 @@ func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption
return t
}
func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self pb.ServerAddress) {
for _, filer := range ma.filers {
go ma.subscribeToOneFiler(f, self, filer)
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
if update.NodeType != "filer" {
return
}
address := pb.ServerAddress(update.Address)
if update.IsAdd {
// every filer should subscribe to a new filer
ma.setActive(address, true)
go ma.subscribeToOneFiler(ma.filer, ma.self, address)
} else {
ma.setActive(address, false)
}
}
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
if isActive {
ma.peerStatues[address] = struct{}{}
} else {
delete(ma.peerStatues, address)
}
}
func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
_, isActive = ma.peerStatues[address]
return
}
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {
@@ -149,6 +181,10 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
}
})
if !ma.isActive(peer) {
glog.V(0).Infof("stop subscribing remote %s meta change", peer)
return
}
if err != nil {
glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
time.Sleep(1733 * time.Millisecond)