S3 API: fix DeleteMultipleObjectsHandler

fix https://github.com/chrislusf/seaweedfs/issues/1241
This commit is contained in:
Chris Lu
2020-03-20 14:17:31 -07:00
parent 165b0d22a4
commit c4bea45099
10 changed files with 173 additions and 320 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/gorilla/mux"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -165,38 +166,32 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
return
}
var index int
var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError
s3a.streamRemove(deleteObjects.Quiet, func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool) {
if index >= len(deleteObjects.Objects) {
finished = true
return
}
object := deleteObjects.Objects[index]
s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
lastSeparator := strings.LastIndex(object.ObjectName, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive = "/", object.ObjectName, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
entryName = object.ObjectName[lastSeparator+1:]
parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
for _, object := range deleteObjects.Objects {
lastSeparator := strings.LastIndex(object.ObjectName, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "/", object.ObjectName, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
entryName = object.ObjectName[lastSeparator+1:]
parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
}
parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
deletedObjects = append(deletedObjects, object)
} else {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.ObjectName,
})
}
}
parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
return
}, func(err string) {
object := deleteObjects.Objects[index]
if err == "" {
deletedObjects = append(deletedObjects, object)
} else {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err,
Key: object.ObjectName,
})
}
index++
return nil
})
deleteResp := DeleteObjectsResponse{}