master able to list all master clients by type

This commit is contained in:
Chris Lu
2020-03-01 22:13:47 -08:00
parent 0ca68a2a6d
commit 6a8484b4ae
14 changed files with 353 additions and 206 deletions

View File

@@ -1,8 +1,10 @@
package weed_server
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/chrislusf/raft"
@@ -181,35 +183,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return ms.informNewLeader(stream)
}
// remember client address
ctx := stream.Context()
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
glog.Error("failed to get peer from ctx")
return fmt.Errorf("failed to get peer from ctx")
}
if pr.Addr == net.Addr(nil) {
glog.Error("failed to get peer address")
return fmt.Errorf("failed to get peer address")
}
peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
clientName := req.Name + pr.Addr.String()
glog.V(0).Infof("+ client %v", clientName)
messageChan := make(chan *master_pb.VolumeLocation)
stopChan := make(chan bool)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
ms.clientChansLock.Unlock()
clientName, messageChan := ms.addClient(req.Name, peerAddress)
defer func() {
glog.V(0).Infof("- client %v", clientName)
ms.clientChansLock.Lock()
delete(ms.clientChans, clientName)
ms.clientChansLock.Unlock()
}()
defer ms.deleteClient(clientName)
for _, message := range ms.Topo.ToVolumeLocations() {
if err := stream.Send(message); err != nil {
@@ -261,3 +241,57 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
}
return nil
}
func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
clientName = clientType + "@" + clientAddress
glog.V(0).Infof("+ client %v", clientName)
messageChan = make(chan *master_pb.VolumeLocation)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
ms.clientChansLock.Unlock()
return
}
func (ms *MasterServer) deleteClient(clientName string) {
glog.V(0).Infof("- client %v", clientName)
ms.clientChansLock.Lock()
delete(ms.clientChans, clientName)
ms.clientChansLock.Unlock()
}
func findClientAddress(ctx context.Context, grpcPort uint32) string {
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
glog.Error("failed to get peer from ctx")
return ""
}
if pr.Addr == net.Addr(nil) {
glog.Error("failed to get peer address")
return ""
}
if grpcPort == 0 {
return pr.Addr.String()
}
if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
externalIP := tcpAddr.IP
return fmt.Sprintf("%s:%d", externalIP, grpcPort)
}
return pr.Addr.String()
}
func (ms *MasterServer ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
resp := &master_pb.ListMasterClientsResponse{}
ms.clientChansLock.RLock()
defer ms.clientChansLock.RUnlock()
for k := range ms.clientChans {
if strings.HasPrefix(k, req.ClientType+"@") {
resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
}
}
return resp, nil
}