mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 18:58:50 +08:00
broadcast messages of new and deleted volumes
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/topology"
|
||||
"google.golang.org/grpc/peer"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
|
||||
@@ -16,8 +17,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||
|
||||
defer func() {
|
||||
if dn != nil {
|
||||
|
||||
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
|
||||
t.UnRegisterDataNode(dn)
|
||||
|
||||
message := &master_pb.VolumeLocation{
|
||||
Url: dn.Url(),
|
||||
PublicUrl: dn.PublicUrl,
|
||||
}
|
||||
for _, v := range dn.GetVolumes() {
|
||||
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
|
||||
}
|
||||
|
||||
if len(message.DeletedVids) > 0 {
|
||||
ms.clientChansLock.RLock()
|
||||
for _, ch := range ms.clientChans {
|
||||
ch <- message
|
||||
}
|
||||
ms.clientChansLock.RUnlock()
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -49,7 +68,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||
}
|
||||
}
|
||||
|
||||
t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
|
||||
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
|
||||
|
||||
message := &master_pb.VolumeLocation{
|
||||
Url: dn.Url(),
|
||||
PublicUrl: dn.PublicUrl,
|
||||
}
|
||||
for _, v := range newVolumes {
|
||||
message.NewVids = append(message.NewVids, uint32(v.Id))
|
||||
}
|
||||
for _, v := range deletedVolumes {
|
||||
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
|
||||
}
|
||||
|
||||
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
|
||||
ms.clientChansLock.RLock()
|
||||
for _, ch := range ms.clientChans {
|
||||
ch <- message
|
||||
}
|
||||
ms.clientChansLock.RUnlock()
|
||||
}
|
||||
|
||||
} else {
|
||||
return err
|
||||
@@ -69,13 +107,63 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||
|
||||
// KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up.
|
||||
func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
req, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// remember client address
|
||||
ctx := stream.Context()
|
||||
// fmt.Printf("FromContext %+v\n", ctx)
|
||||
pr, ok := peer.FromContext(ctx)
|
||||
if !ok {
|
||||
glog.Error("failed to get peer from ctx")
|
||||
return fmt.Errorf("failed to get peer from ctx")
|
||||
}
|
||||
if pr.Addr == net.Addr(nil) {
|
||||
glog.Error("failed to get peer address")
|
||||
return fmt.Errorf("failed to get peer address")
|
||||
}
|
||||
|
||||
clientName := req.Name + pr.Addr.String()
|
||||
glog.V(0).Infof("+ client %v", clientName)
|
||||
|
||||
messageChan := make(chan *master_pb.VolumeLocation)
|
||||
stopChan := make(chan bool)
|
||||
|
||||
ms.clientChansLock.Lock()
|
||||
ms.clientChans[clientName] = messageChan
|
||||
ms.clientChansLock.Unlock()
|
||||
|
||||
defer func() {
|
||||
glog.V(0).Infof("- client %v", clientName)
|
||||
ms.clientChansLock.Lock()
|
||||
delete(ms.clientChans, clientName)
|
||||
ms.clientChansLock.Unlock()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err != nil {
|
||||
glog.V(2).Infof("- client %v: %v", clientName, err)
|
||||
stopChan <- true
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := stream.Send(&master_pb.Empty{}); err != nil {
|
||||
return err
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case message := <-messageChan:
|
||||
if err := stream.Send(message); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-stopChan:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/topology"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
)
|
||||
|
||||
type MasterServer struct {
|
||||
@@ -31,6 +32,10 @@ type MasterServer struct {
|
||||
vgLock sync.Mutex
|
||||
|
||||
bounedLeaderChan chan int
|
||||
|
||||
// notifying clients
|
||||
clientChansLock sync.RWMutex
|
||||
clientChans map[string]chan *master_pb.VolumeLocation
|
||||
}
|
||||
|
||||
func NewMasterServer(r *mux.Router, port int, metaFolder string,
|
||||
@@ -54,6 +59,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
|
||||
pulseSeconds: pulseSeconds,
|
||||
defaultReplicaPlacement: defaultReplicaPlacement,
|
||||
garbageThreshold: garbageThreshold,
|
||||
clientChans: make(map[string]chan *master_pb.VolumeLocation),
|
||||
}
|
||||
ms.bounedLeaderChan = make(chan int, 16)
|
||||
seq := sequence.NewMemorySequencer()
|
||||
|
Reference in New Issue
Block a user