mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-15 00:46:51 +08:00
1. refactoring, merge "replication" logic into "topology" package
2. when growing volumes, additional preferred "rack" and "dataNode" paraemters are also provided. Previously only "dataCenter" paraemter is provided.
This commit is contained in:
@@ -2,10 +2,10 @@ package weed_server
|
||||
|
||||
import (
|
||||
"code.google.com/p/weed-fs/go/glog"
|
||||
"code.google.com/p/weed-fs/go/replication"
|
||||
"code.google.com/p/weed-fs/go/sequence"
|
||||
"code.google.com/p/weed-fs/go/topology"
|
||||
"code.google.com/p/weed-fs/go/util"
|
||||
"errors"
|
||||
"github.com/goraft/raft"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
@@ -25,7 +25,7 @@ type MasterServer struct {
|
||||
whiteList []string
|
||||
|
||||
Topo *topology.Topology
|
||||
vg *replication.VolumeGrowth
|
||||
vg *topology.VolumeGrowth
|
||||
vgLock sync.Mutex
|
||||
|
||||
bounedLeaderChan chan int
|
||||
@@ -53,7 +53,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
|
||||
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
|
||||
glog.Fatalf("cannot create topology:%s", e)
|
||||
}
|
||||
ms.vg = replication.NewDefaultVolumeGrowth()
|
||||
ms.vg = topology.NewDefaultVolumeGrowth()
|
||||
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
|
||||
|
||||
r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler)))
|
||||
@@ -94,11 +94,11 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if ms.Topo.IsLeader() {
|
||||
f(w, r)
|
||||
} else {
|
||||
} else if ms.Topo.RaftServer.Leader() != "" {
|
||||
ms.bounedLeaderChan <- 1
|
||||
defer func() { <-ms.bounedLeaderChan }()
|
||||
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
|
||||
if err != nil || ms.Topo.RaftServer.Leader() == "" {
|
||||
if err != nil {
|
||||
writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL http://" + ms.Topo.RaftServer.Leader() + " Parse Error " + err.Error()})
|
||||
return
|
||||
}
|
||||
@@ -106,6 +106,9 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
|
||||
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
|
||||
proxy.Transport = util.Transport
|
||||
proxy.ServeHTTP(w, r)
|
||||
} else {
|
||||
//drop it to the floor
|
||||
writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+"does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package weed_server
|
||||
import (
|
||||
"code.google.com/p/weed-fs/go/stats"
|
||||
"code.google.com/p/weed-fs/go/storage"
|
||||
"code.google.com/p/weed-fs/go/topology"
|
||||
"code.google.com/p/weed-fs/go/util"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -39,24 +40,19 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
|
||||
stats.AssignRequest()
|
||||
c, e := strconv.Atoi(r.FormValue("count"))
|
||||
requestedCount, e := strconv.Atoi(r.FormValue("count"))
|
||||
if e != nil {
|
||||
c = 1
|
||||
requestedCount = 1
|
||||
}
|
||||
replication := r.FormValue("replication")
|
||||
if replication == "" {
|
||||
replication = ms.defaultReplicaPlacement
|
||||
}
|
||||
collection := r.FormValue("collection")
|
||||
dataCenter := r.FormValue("dataCenter")
|
||||
replicaPlacement, err := storage.NewReplicaPlacementFromString(replication)
|
||||
|
||||
option, err := ms.getVolumeGrowOption(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusNotAcceptable)
|
||||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
|
||||
if !ms.Topo.HasWriableVolume(option) {
|
||||
if ms.Topo.FreeSpace() <= 0 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
|
||||
@@ -64,15 +60,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
|
||||
} else {
|
||||
ms.vgLock.Lock()
|
||||
defer ms.vgLock.Unlock()
|
||||
if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
|
||||
if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.Topo); err != nil {
|
||||
if !ms.Topo.HasWriableVolume(option) {
|
||||
if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
|
||||
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fid, count, dn, err := ms.Topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
|
||||
fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
|
||||
if err == nil {
|
||||
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
|
||||
} else {
|
||||
@@ -138,13 +134,18 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
|
||||
count := 0
|
||||
replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
|
||||
option, err := ms.getVolumeGrowOption(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusNotAcceptable)
|
||||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
|
||||
if ms.Topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
|
||||
err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
|
||||
if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
|
||||
err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
|
||||
} else {
|
||||
count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.Topo)
|
||||
count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
|
||||
}
|
||||
} else {
|
||||
err = errors.New("parameter count is not found")
|
||||
@@ -189,3 +190,27 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
|
||||
submitForClientHandler(w, r, ms.Topo.RaftServer.Leader())
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
|
||||
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
|
||||
return vl.GetActiveVolumeCount(option) > 0
|
||||
}
|
||||
|
||||
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
|
||||
replicationString := r.FormValue("replication")
|
||||
if replicationString == "" {
|
||||
replicationString = ms.defaultReplicaPlacement
|
||||
}
|
||||
replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
volumeGrowOption := &topology.VolumeGrowOption{
|
||||
Collection: r.FormValue("collection"),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
DataCenter: r.FormValue("dataCenter"),
|
||||
Rack: r.FormValue("rack"),
|
||||
DataNode: r.FormValue("dataNode"),
|
||||
}
|
||||
return volumeGrowOption, nil
|
||||
}
|
||||
|
@@ -3,9 +3,9 @@ package weed_server
|
||||
import (
|
||||
"code.google.com/p/weed-fs/go/glog"
|
||||
"code.google.com/p/weed-fs/go/operation"
|
||||
"code.google.com/p/weed-fs/go/replication"
|
||||
"code.google.com/p/weed-fs/go/stats"
|
||||
"code.google.com/p/weed-fs/go/storage"
|
||||
"code.google.com/p/weed-fs/go/topology"
|
||||
"code.google.com/p/weed-fs/go/util"
|
||||
"mime"
|
||||
"net/http"
|
||||
@@ -214,7 +214,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||
writeJsonError(w, r, ne)
|
||||
return
|
||||
}
|
||||
ret, errorStatus := replication.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
|
||||
ret, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
|
||||
if errorStatus == "" {
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
} else {
|
||||
@@ -251,7 +251,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
n.Size = 0
|
||||
ret := replication.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
|
||||
ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
|
||||
|
||||
if ret != 0 {
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
|
Reference in New Issue
Block a user