mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-15 20:06:19 +08:00
dynamically connect to a filer
This commit is contained in:
@@ -1,72 +0,0 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/cluster"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
)
|
||||
|
||||
func (broker *MessageQueueBroker) checkFilers() {
|
||||
|
||||
// contact a filer about masters
|
||||
var masters []pb.ServerAddress
|
||||
found := false
|
||||
for !found {
|
||||
for _, filer := range broker.option.Filers {
|
||||
err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, m := range resp.Masters {
|
||||
masters = append(masters, pb.ServerAddress(m))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
glog.V(0).Infof("received master list: %s", masters)
|
||||
|
||||
// contact each masters for filers
|
||||
var filers []pb.ServerAddress
|
||||
found = false
|
||||
for !found {
|
||||
for _, master := range masters {
|
||||
err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
|
||||
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
||||
ClientType: cluster.FilerType,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, clusterNode := range resp.ClusterNodes {
|
||||
filers = append(filers, pb.ServerAddress(clusterNode.Address))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
glog.V(0).Infof("failed to list filers: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
glog.V(0).Infof("received filer list: %s", filers)
|
||||
|
||||
broker.option.Filers = filers
|
||||
|
||||
}
|
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||
"google.golang.org/grpc"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -16,7 +17,6 @@ type MessageQueueBrokerOption struct {
|
||||
FilerGroup string
|
||||
DataCenter string
|
||||
Rack string
|
||||
Filers []pb.ServerAddress
|
||||
DefaultReplication string
|
||||
MaxMB int
|
||||
Ip string
|
||||
@@ -29,6 +29,8 @@ type MessageQueueBroker struct {
|
||||
option *MessageQueueBrokerOption
|
||||
grpcDialOption grpc.DialOption
|
||||
MasterClient *wdclient.MasterClient
|
||||
filers map[pb.ServerAddress]struct{}
|
||||
currentFiler pb.ServerAddress
|
||||
}
|
||||
|
||||
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
|
||||
@@ -37,15 +39,47 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
|
||||
option: option,
|
||||
grpcDialOption: grpcDialOption,
|
||||
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
|
||||
filers: make(map[pb.ServerAddress]struct{}),
|
||||
}
|
||||
|
||||
mqBroker.checkFilers()
|
||||
mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate
|
||||
|
||||
go mqBroker.MasterClient.KeepConnectedToMaster()
|
||||
|
||||
existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
|
||||
for _, newNode := range existingNodes {
|
||||
mqBroker.OnBrokerUpdate(newNode, time.Now())
|
||||
}
|
||||
|
||||
return mqBroker, nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
|
||||
if update.NodeType != cluster.FilerType {
|
||||
return
|
||||
}
|
||||
|
||||
address := pb.ServerAddress(update.Address)
|
||||
if update.IsAdd {
|
||||
broker.filers[address] = struct{}{}
|
||||
if broker.currentFiler == "" {
|
||||
broker.currentFiler = address
|
||||
}
|
||||
} else {
|
||||
delete(broker.filers, address)
|
||||
if broker.currentFiler == address {
|
||||
for filer, _ := range broker.filers {
|
||||
broker.currentFiler = filer
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
|
||||
return broker.currentFiler
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||
|
||||
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
|
||||
|
Reference in New Issue
Block a user