mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-22 06:37:24 +08:00
Merge pull request #1145 from stlpmo-jn/volume_graceful_stop
let volume server graceful stop
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
@@ -10,7 +11,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
"github.com/chrislusf/seaweedfs/weed/util/httpdown"
|
||||
"github.com/spf13/viper"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
@@ -94,7 +97,7 @@ func runVolume(cmd *Command, args []string) bool {
|
||||
|
||||
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
|
||||
|
||||
//Set multiple folders and each folder's max volume count limit'
|
||||
// Set multiple folders and each folder's max volume count limit'
|
||||
v.folders = strings.Split(volumeFolders, ",")
|
||||
maxCountStrings := strings.Split(maxVolumeCounts, ",")
|
||||
for _, maxString := range maxCountStrings {
|
||||
@@ -113,7 +116,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||
}
|
||||
}
|
||||
|
||||
//security related white list configuration
|
||||
// security related white list configuration
|
||||
if volumeWhiteListOption != "" {
|
||||
v.whiteList = strings.Split(volumeWhiteListOption, ",")
|
||||
}
|
||||
@@ -128,11 +131,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||
if *v.publicUrl == "" {
|
||||
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
|
||||
}
|
||||
isSeperatedPublicPort := *v.publicPort != *v.port
|
||||
|
||||
volumeMux := http.NewServeMux()
|
||||
publicVolumeMux := volumeMux
|
||||
if isSeperatedPublicPort {
|
||||
if v.isSeparatedPublicPort() {
|
||||
publicVolumeMux = http.NewServeMux()
|
||||
}
|
||||
|
||||
@@ -158,51 +160,131 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||
*v.compactionMBPerSecond,
|
||||
)
|
||||
|
||||
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
|
||||
glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
|
||||
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
|
||||
if e != nil {
|
||||
glog.Fatalf("Volume server listener error:%v", e)
|
||||
}
|
||||
if isSeperatedPublicPort {
|
||||
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
|
||||
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
|
||||
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
|
||||
if e != nil {
|
||||
glog.Fatalf("Volume server listener error:%v", e)
|
||||
// starting grpc server
|
||||
grpcS := v.startGrpcService(volumeServer)
|
||||
|
||||
// starting public http server
|
||||
var publicHttpDown httpdown.Server
|
||||
if v.isSeparatedPublicPort() {
|
||||
publicHttpDown = v.startPublicHttpService(publicVolumeMux)
|
||||
if nil == publicHttpDown {
|
||||
glog.Fatalf("start public http service failed")
|
||||
}
|
||||
go func() {
|
||||
if e := http.Serve(publicListener, publicVolumeMux); e != nil {
|
||||
glog.Fatalf("Volume server fail to serve public: %v", e)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// starting the cluster http server
|
||||
clusterHttpServer := v.startClusterHttpService(volumeMux)
|
||||
|
||||
stopChain := make(chan struct{})
|
||||
util.OnInterrupt(func() {
|
||||
fmt.Println("volume server has be killed")
|
||||
var startTime time.Time
|
||||
|
||||
// firstly, stop the public http service to prevent from receiving new user request
|
||||
if nil != publicHttpDown {
|
||||
startTime = time.Now()
|
||||
if err := publicHttpDown.Stop(); err != nil {
|
||||
glog.Warningf("stop the public http server failed, %v", err)
|
||||
}
|
||||
delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
|
||||
glog.V(0).Infof("stop public http server, elapsed %dms", delta)
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
if err := clusterHttpServer.Stop(); err != nil {
|
||||
glog.Warningf("stop the cluster http server failed, %v", err)
|
||||
}
|
||||
delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
|
||||
glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
|
||||
|
||||
startTime = time.Now()
|
||||
grpcS.GracefulStop()
|
||||
delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
|
||||
glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
|
||||
|
||||
startTime = time.Now()
|
||||
volumeServer.Shutdown()
|
||||
delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
|
||||
glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
|
||||
|
||||
pprof.StopCPUProfile()
|
||||
|
||||
close(stopChain) // notify exit
|
||||
})
|
||||
|
||||
// starting grpc server
|
||||
select {
|
||||
case <-stopChain:
|
||||
}
|
||||
glog.Warningf("the volume server exit.")
|
||||
}
|
||||
|
||||
// check whether configure the public port
|
||||
func (v VolumeServerOptions) isSeparatedPublicPort() bool {
|
||||
return *v.publicPort != *v.port
|
||||
}
|
||||
|
||||
func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
|
||||
grpcPort := *v.port + 10000
|
||||
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
|
||||
if err != nil {
|
||||
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
|
||||
}
|
||||
grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume"))
|
||||
volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
|
||||
volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
|
||||
reflection.Register(grpcS)
|
||||
go grpcS.Serve(grpcL)
|
||||
go func() {
|
||||
if err := grpcS.Serve(grpcL); err != nil {
|
||||
glog.Fatalf("start gRPC service failed, %s", err)
|
||||
}
|
||||
}()
|
||||
return grpcS
|
||||
}
|
||||
|
||||
if viper.GetString("https.volume.key") != "" {
|
||||
if e := http.ServeTLS(listener, volumeMux,
|
||||
viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil {
|
||||
glog.Fatalf("Volume server fail to serve: %v", e)
|
||||
}
|
||||
} else {
|
||||
if e := http.Serve(listener, volumeMux); e != nil {
|
||||
glog.Fatalf("Volume server fail to serve: %v", e)
|
||||
}
|
||||
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
|
||||
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
|
||||
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
|
||||
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
|
||||
if e != nil {
|
||||
glog.Fatalf("Volume server listener error:%v", e)
|
||||
}
|
||||
|
||||
pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute}
|
||||
publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener)
|
||||
go func() {
|
||||
if err := publicHttpDown.Wait(); err != nil {
|
||||
glog.Errorf("public http down wait failed, %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return publicHttpDown
|
||||
}
|
||||
|
||||
func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server {
|
||||
var (
|
||||
certFile, keyFile string
|
||||
)
|
||||
if viper.GetString("https.volume.key") != "" {
|
||||
certFile = viper.GetString("https.volume.cert")
|
||||
keyFile = viper.GetString("https.volume.key")
|
||||
}
|
||||
|
||||
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
|
||||
glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
|
||||
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
|
||||
if e != nil {
|
||||
glog.Fatalf("Volume server listener error:%v", e)
|
||||
}
|
||||
|
||||
httpDown := httpdown.HTTP{
|
||||
KillTimeout: 5 * time.Minute,
|
||||
StopTimeout: 5 * time.Minute,
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile}
|
||||
clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
|
||||
go func() {
|
||||
if e := clusterHttpServer.Wait(); e != nil {
|
||||
glog.Fatalf("Volume server fail to serve: %v", e)
|
||||
}
|
||||
}()
|
||||
return clusterHttpServer
|
||||
}
|
||||
|
Reference in New Issue
Block a user