refactoring

This commit is contained in:
Chris Lu
2020-11-15 16:58:48 -08:00
parent 500bcab953
commit 95c0de285d
12 changed files with 242 additions and 256 deletions

View File

@@ -7,7 +7,6 @@ import (
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -30,31 +29,13 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
func (fs *FilerServer) assignNewFileInfo(so *filer.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
DataCenter: so.DataCenter,
Rack: so.Rack,
}
var altRequest *operation.VolumeAssignRequest
if so.DataCenter != "" || so.Rack != "" {
altRequest = &operation.VolumeAssignRequest{
Count: 1,
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
DataCenter: "",
Rack: "",
}
}
ar, altRequest := so.ToAssignRequests(1)
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
@@ -76,32 +57,13 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
query := r.URL.Query()
collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
rack := query.Get("rack")
if dataCenter == "" {
rack = fs.option.Rack
}
ttlString := r.URL.Query().Get("ttl")
// read ttl in seconds
ttl, err := needle.ReadTTL(ttlString)
ttlSeconds := int32(0)
if err == nil {
ttlSeconds = int32(ttl.Minutes()) * 60
}
so := &filer.StorageOption{
Replication: replication,
Collection: collection,
DataCenter: dataCenter,
Rack: rack,
TtlSeconds: ttlSeconds,
Fsync: fsync,
}
so := fs.detectStorageOption0(r.RequestURI,
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
query.Get("dataCenter"),
query.Get("rack"),
)
fs.autoChunk(ctx, w, r, so)
@@ -141,21 +103,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
// default
collection = fs.option.Collection
replication = fs.option.DefaultReplication
// get default collection settings
if qCollection != "" {
collection = qCollection
}
if qReplication != "" {
replication = qReplication
}
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) (*operation.StorageOption) {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
// required by buckets folder
bucketDefaultReplication := ""
bucketDefaultReplication, fsync := "", false
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
t := strings.Index(bucketAndObjectKey, "/")
@@ -171,5 +124,32 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
replication = bucketDefaultReplication
}
return
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
if ttlSeconds == 0 {
ttl, err := needle.ReadTTL(rule.GetTtl())
if err != nil {
glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
}
ttlSeconds = int32(ttl.Minutes())*60
}
return &operation.StorageOption{
Replication: util.Nvl(replication, rule.Replication),
Collection: util.Nvl(collection, rule.Collection),
DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, fs.option.Rack),
TtlSeconds: ttlSeconds,
Fsync: fsync || rule.Fsync,
}
}
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) (*operation.StorageOption) {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
}