filer: add path-specific option to enforce readonly

This commit is contained in:
Chris Lu
2021-06-04 01:03:41 -07:00
parent f15d7a57f5
commit ab606dec2a
8 changed files with 171 additions and 115 deletions

View File

@@ -259,14 +259,14 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
garbage = append(garbage, coveredChunks...)
if newEntry.Attributes != nil {
so := fs.detectStorageOption(fullpath,
so, _ := fs.detectStorageOption(fullpath,
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
newEntry.Attributes.TtlSec,
newEntry.Attributes.DiskType,
"",
"",
)
) // ignore readonly error for capacity needed to manifestize
chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
if err != nil {
// not good, but should be ok
@@ -307,7 +307,11 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
so, err := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
if err != nil {
glog.Warningf("detectStorageOption: %v", err)
return &filer_pb.AppendToEntryResponse{}, err
}
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
if err != nil {
// not good, but should be ok
@@ -333,7 +337,11 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
}
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))

View File

@@ -2,6 +2,7 @@ package weed_server
import (
"context"
"errors"
"net/http"
"os"
"strings"
@@ -19,6 +20,8 @@ import (
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
ErrReadOnly = errors.New("read only")
)
type FilerPostResult struct {
@@ -57,7 +60,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
ctx := context.Background()
query := r.URL.Query()
so := fs.detectStorageOption0(r.RequestURI,
so, err := fs.detectStorageOption0(r.RequestURI,
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
@@ -65,6 +68,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
query.Get("dataCenter"),
query.Get("rack"),
)
if err != nil {
if err == ErrReadOnly {
w.WriteHeader(http.StatusInsufficientStorage)
} else {
glog.V(1).Infoln("post", r.RequestURI, ":", err.Error())
w.WriteHeader(http.StatusInternalServerError)
}
return
}
fs.autoChunk(ctx, w, r, contentLength, so)
util.CloseRequest(r)
@@ -105,7 +117,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack string) (*operation.StorageOption, error) {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@@ -121,6 +133,10 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
if rule.ReadOnly {
return nil, ErrReadOnly
}
if ttlSeconds == 0 {
ttl, err := needle.ReadTTL(rule.GetTtl())
if err != nil {
@@ -138,10 +154,10 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: fsync || rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}
}, nil
}
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {