Read write directory object (#7003)

* read directory object

* address comments

* address comments

* name should not have "/" prefix

* fix compilation

* refactor
This commit is contained in:
Chris Lu 2025-07-20 13:28:17 -07:00 committed by GitHub
parent 41b5bac063
commit 85036936d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 45 additions and 46 deletions

View File

@ -153,6 +153,28 @@ func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Req
}
}
// handleDirectoryObjectRequest is a helper function that handles directory object requests
// for both GET and HEAD operations, eliminating code duplication
func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string) bool {
// Check if this is a directory object and handle it directly
if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
glog.Errorf("%s: error checking directory object %s/%s: %v", handlerName, bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return true // Request was handled (with error)
} else if dirEntry != nil {
glog.V(2).Infof("%s: directory object %s/%s found, serving content", handlerName, bucket, object)
s3a.serveDirectoryContent(w, r, dirEntry)
return true // Request was handled successfully
} else if isDirectoryObject {
// Directory object but doesn't exist
glog.V(2).Infof("%s: directory object %s/%s not found", handlerName, bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return true // Request was handled (with not found)
}
return false // Not a directory object, continue with normal processing
}
func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool) (listEntry ListEntry) {
storageClass := "STANDARD"
if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
@ -196,20 +218,9 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
// Check if this is a directory object and handle it directly
if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
glog.Errorf("GetObjectHandler: error checking directory object %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} else if dirEntry != nil {
glog.V(2).Infof("GetObjectHandler: directory object %s/%s found, serving content", bucket, object)
s3a.serveDirectoryContent(w, r, dirEntry)
return
} else if isDirectoryObject {
// Directory object but doesn't exist
glog.V(2).Infof("GetObjectHandler: directory object %s/%s not found", bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
// Handle directory objects with shared logic
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") {
return // Directory object request was handled
}
// Check for specific version ID in query parameters
@ -304,20 +315,9 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
// Check if this is a directory object and handle it directly
if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
glog.Errorf("HeadObjectHandler: error checking directory object %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} else if dirEntry != nil {
glog.V(2).Infof("HeadObjectHandler: directory object %s/%s found, serving content", bucket, object)
s3a.serveDirectoryContent(w, r, dirEntry)
return
} else if isDirectoryObject {
// Directory object but doesn't exist
glog.V(2).Infof("HeadObjectHandler: directory object %s/%s not found", bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
// Handle directory objects with shared logic
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") {
return // Directory object request was handled
}
// Check for specific version ID in query parameters

View File

@ -353,17 +353,16 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
// when a new "null" version becomes the latest during suspended versioning
func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
bucketDir := s3a.option.BucketsPath + "/" + bucket
cleanObject := strings.TrimPrefix(object, "/")
versionsObjectPath := cleanObject + ".versions"
versionsObjectPath := object + ".versions"
versionsDir := bucketDir + "/" + versionsObjectPath
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s/%s", bucket, cleanObject)
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s%s", bucket, object)
// Check if .versions directory exists
_, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
// No .versions directory exists, nothing to update
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s/%s", bucket, cleanObject)
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s%s", bucket, object)
return nil
}
@ -413,7 +412,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
return fmt.Errorf("failed to update .versions directory metadata: %v", err)
}
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s/%s", bucket, cleanObject)
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s%s", bucket, object)
}
return nil

View File

@ -696,8 +696,7 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
bucketDir := s3a.option.BucketsPath + "/" + bucket
cleanObject := strings.TrimPrefix(object, "/")
versionsObjectPath := cleanObject + ".versions"
versionsObjectPath := object + ".versions"
// Get the .versions directory entry to read latest version metadata
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
@ -705,14 +704,14 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
// .versions directory doesn't exist - this can happen for objects that existed
// before versioning was enabled on the bucket. Fall back to checking for a
// regular (non-versioned) object file.
glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s%s, checking for pre-versioning object", bucket, object)
regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
regularEntry, regularErr := s3a.getEntry(bucketDir, object)
if regularErr != nil {
return nil, fmt.Errorf("failed to get %s/%s .versions directory and no regular object found: %w", bucket, cleanObject, err)
return nil, fmt.Errorf("failed to get %s%s .versions directory and no regular object found: %w", bucket, object, err)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, cleanObject)
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, object)
return regularEntry, nil
}
@ -720,14 +719,14 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
if versionsEntry.Extended == nil {
// No metadata means all versioned objects have been deleted.
// Fall back to checking for a pre-versioning object.
glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, cleanObject)
glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s%s, checking for pre-versioning object", bucket, object)
regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
regularEntry, regularErr := s3a.getEntry(bucketDir, object)
if regularErr != nil {
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s (no Extended metadata case)", bucket, cleanObject)
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s (no Extended metadata case)", bucket, object)
return regularEntry, nil
}
@ -739,12 +738,12 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
// Fall back to checking for a pre-versioning object.
glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
regularEntry, regularErr := s3a.getEntry(bucketDir, object)
if regularErr != nil {
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s after version deletion", bucket, cleanObject)
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s after version deletion", bucket, object)
return regularEntry, nil
}

View File

@ -8,6 +8,7 @@ import (
type FullPath string
func NewFullPath(dir, name string) FullPath {
name = strings.TrimSuffix(name, "/")
return FullPath(dir).Child(name)
}