big refactoring

This commit is contained in:
Chris Lu
2019-06-23 03:08:27 -07:00
parent 6c01fb6d2d
commit f16375621f
7 changed files with 144 additions and 130 deletions

View File

@@ -70,7 +70,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
return err
}
@@ -157,8 +157,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if err := stream.Send(&master_pb.HeartbeatResponse{
Leader: newLeader,
MetricsAddress: ms.metricsAddress,
MetricsIntervalSeconds: uint32(ms.metricsIntervalSec),
MetricsAddress: ms.option.MetricsAddress,
MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
}); err != nil {
return err
}

View File

@@ -50,7 +50,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if req.Replication == "" {
req.Replication = ms.defaultReplicaPlacement
req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
if err != nil {
@@ -65,7 +65,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
Prealloacte: ms.preallocate,
Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
DataNode: req.DataNode,
@@ -105,7 +105,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
}
if req.Replication == "" {
req.Replication = ms.defaultReplicaPlacement
req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
if err != nil {
@@ -136,7 +136,7 @@ func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeLis
resp := &master_pb.VolumeListResponse{
TopologyInfo: ms.Topo.ToTopologyInfo(),
VolumeSizeLimitMb: uint64(ms.volumeSizeLimitMB),
VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
}
return resp, nil

View File

@@ -25,17 +25,25 @@ import (
"github.com/spf13/viper"
)
type MasterOption struct {
Port int
MetaFolder string
VolumeSizeLimitMB uint
VolumePreallocate bool
PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
WhiteList []string
DisableHttp bool
MetricsAddress string
MetricsIntervalSec int
}
type MasterServer struct {
port int
metaFolder string
volumeSizeLimitMB uint
preallocate int64
pulseSeconds int
defaultReplicaPlacement string
garbageThreshold float64
guard *security.Guard
metricsAddress string
metricsIntervalSec int
option *MasterOption
guard *security.Guard
preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -50,17 +58,7 @@ type MasterServer struct {
grpcDialOpiton grpc.DialOption
}
func NewMasterServer(r *mux.Router, port int, metaFolder string,
volumeSizeLimitMB uint,
preallocate bool,
pulseSeconds int,
defaultReplicaPlacement string,
garbageThreshold float64,
whiteList []string,
disableHttp bool,
metricsAddress string,
metricsIntervalSec int,
) *MasterServer {
func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -72,30 +70,24 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
var preallocateSize int64
if preallocate {
preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
ms := &MasterServer{
port: port,
volumeSizeLimitMB: volumeSizeLimitMB,
preallocate: preallocateSize,
pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
metricsAddress: metricsAddress,
metricsIntervalSec: metricsIntervalSec,
option: option,
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
ms.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
if !disableHttp {
if !ms.option.DisableHttp {
handleStaticResources2(r)
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
@@ -113,7 +105,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
}
ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate)
ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
ms.startAdminScripts()
@@ -185,7 +177,7 @@ func (ms *MasterServer) startAdminScripts() {
scriptLines := strings.Split(adminScripts, "\n")
masterAddress := "localhost:" + strconv.Itoa(ms.port)
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")

View File

@@ -47,7 +47,7 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request)
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
gcString := r.FormValue("garbageThreshold")
gcThreshold := ms.garbageThreshold
gcThreshold := ms.option.GarbageThreshold
if gcString != "" {
var err error
gcThreshold, err = strconv.ParseFloat(gcString, 32)
@@ -57,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate)
ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -119,7 +119,7 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
if r.Host != "" {
return r.Host
}
return "localhost:" + strconv.Itoa(ms.port)
return "localhost:" + strconv.Itoa(ms.option.Port)
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
@@ -142,7 +142,7 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
replicationString = ms.defaultReplicaPlacement
replicationString = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
if err != nil {
@@ -152,7 +152,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
preallocate := ms.preallocate
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {