weed mount can request to prioritize to write to a data center

This commit is contained in:
Chris Lu
2018-07-14 13:36:28 -07:00
parent a51aa4c586
commit 842dab07b4
10 changed files with 102 additions and 78 deletions

View File

@@ -175,14 +175,19 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
var altRequest *operation.VolumeAssignRequest
dataCenter := req.DataCenter
if dataCenter!=""{
dataCenter = fs.option.DataCenter
}
assignRequest := &operation.VolumeAssignRequest{
Count: uint64(req.Count),
Replication: req.Replication,
Collection: req.Collection,
Ttl: ttlStr,
DataCenter: fs.option.DataCenter,
DataCenter: dataCenter,
}
if fs.option.DataCenter != "" {
if dataCenter != "" {
altRequest = &operation.VolumeAssignRequest{
Count: uint64(req.Count),
Replication: req.Replication,

View File

@@ -41,16 +41,16 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques
return
}
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: r.URL.Query().Get("ttl"),
DataCenter: fs.option.DataCenter,
DataCenter: dataCenter,
}
var altRequest *operation.VolumeAssignRequest
if fs.option.DataCenter != "" {
if dataCenter != "" {
altRequest = &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
@@ -82,8 +82,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if collection == "" {
collection = fs.option.Collection
}
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked {
if autoChunked := fs.autoChunk(w, r, replication, collection, dataCenter); autoChunked {
return
}
@@ -91,12 +95,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
var err error
if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection)
fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
if err != nil {
return
}
} else {
fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection)
fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
if err != nil {
return
}

View File

@@ -16,7 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool {
func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool {
if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false
@@ -52,7 +52,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return false
}
reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection)
reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection, dataCenter)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
@@ -61,7 +61,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return true
}
func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) {
func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
@@ -104,7 +104,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
writtenChunks = writtenChunks + 1
fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection)
fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
if assignErr != nil {
return nil, assignErr
}

View File

@@ -73,7 +73,7 @@ func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) {
return
}
func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
/*
Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
@@ -104,7 +104,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
path := r.URL.Path
if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" {
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection)
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
}
return
}

View File

@@ -10,7 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
//Default handle way for http multipart
if r.Method == "PUT" {
buf, _ := ioutil.ReadAll(r.Body)
@@ -33,7 +33,7 @@ func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Re
}
fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path)
} else {
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection)
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
}
return
}