mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-02-09 09:17:28 +08:00
refactoring
This commit is contained in:
@@ -13,7 +13,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MasterClient struct {
|
type MasterClient struct {
|
||||||
name string
|
clientType string
|
||||||
grpcPort uint32
|
grpcPort uint32
|
||||||
currentMaster string
|
currentMaster string
|
||||||
masters []string
|
masters []string
|
||||||
@@ -22,9 +22,9 @@ type MasterClient struct {
|
|||||||
vidMap
|
vidMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMasterClient(grpcDialOption grpc.DialOption, clientName string, clientGrpcPort uint32, masters []string) *MasterClient {
|
func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientGrpcPort uint32, masters []string) *MasterClient {
|
||||||
return &MasterClient{
|
return &MasterClient{
|
||||||
name: clientName,
|
clientType: clientType,
|
||||||
grpcPort: clientGrpcPort,
|
grpcPort: clientGrpcPort,
|
||||||
masters: masters,
|
masters: masters,
|
||||||
grpcDialOption: grpcDialOption,
|
grpcDialOption: grpcDialOption,
|
||||||
@@ -43,7 +43,7 @@ func (mc *MasterClient) WaitUntilConnected() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MasterClient) KeepConnectedToMaster() {
|
func (mc *MasterClient) KeepConnectedToMaster() {
|
||||||
glog.V(1).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
|
glog.V(1).Infof("%s bootstraps with masters %v", mc.clientType, mc.masters)
|
||||||
for {
|
for {
|
||||||
mc.tryAllMasters()
|
mc.tryAllMasters()
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
@@ -65,27 +65,27 @@ func (mc *MasterClient) tryAllMasters() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
|
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
|
||||||
glog.V(1).Infof("%s Connecting to master %v", mc.name, master)
|
glog.V(1).Infof("%s Connecting to master %v", mc.clientType, master)
|
||||||
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
stream, err := client.KeepConnected(context.Background())
|
stream, err := client.KeepConnected(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.name, master, err)
|
glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.clientType, master, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.name, GrpcPort: mc.grpcPort}); err != nil {
|
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, GrpcPort: mc.grpcPort}); err != nil {
|
||||||
glog.V(0).Infof("%s failed to send to %s: %v", mc.name, master, err)
|
glog.V(0).Infof("%s failed to send to %s: %v", mc.clientType, master, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("%s Connected to %v", mc.name, master)
|
glog.V(1).Infof("%s Connected to %v", mc.clientType, master)
|
||||||
mc.currentMaster = master
|
mc.currentMaster = master
|
||||||
|
|
||||||
for {
|
for {
|
||||||
volumeLocation, err := stream.Recv()
|
volumeLocation, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s failed to receive from %s: %v", mc.name, master, err)
|
glog.V(0).Infof("%s failed to receive from %s: %v", mc.clientType, master, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,18 +102,18 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
|
|||||||
PublicUrl: volumeLocation.PublicUrl,
|
PublicUrl: volumeLocation.PublicUrl,
|
||||||
}
|
}
|
||||||
for _, newVid := range volumeLocation.NewVids {
|
for _, newVid := range volumeLocation.NewVids {
|
||||||
glog.V(1).Infof("%s: %s adds volume %d", mc.name, loc.Url, newVid)
|
glog.V(1).Infof("%s: %s adds volume %d", mc.clientType, loc.Url, newVid)
|
||||||
mc.addLocation(newVid, loc)
|
mc.addLocation(newVid, loc)
|
||||||
}
|
}
|
||||||
for _, deletedVid := range volumeLocation.DeletedVids {
|
for _, deletedVid := range volumeLocation.DeletedVids {
|
||||||
glog.V(1).Infof("%s: %s removes volume %d", mc.name, loc.Url, deletedVid)
|
glog.V(1).Infof("%s: %s removes volume %d", mc.clientType, loc.Url, deletedVid)
|
||||||
mc.deleteLocation(deletedVid, loc)
|
mc.deleteLocation(deletedVid, loc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
if gprcErr != nil {
|
if gprcErr != nil {
|
||||||
glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, gprcErr)
|
glog.V(0).Infof("%s failed to connect with master %v: %v", mc.clientType, master, gprcErr)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user