From c196d03951a75d3b8976f556cb0400e5b522edeb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 21 Jul 2025 00:23:22 -0700 Subject: [PATCH] fix listing object versions (#7006) * fix listing object versions * Update s3api_object_versioning.go * Update s3_directory_versioning_test.go * check previous skipped tests * fix test_versioning_stack_delete_merkers * address test_bucket_list_return_data_versioning * Update s3_directory_versioning_test.go * fix test_versioning_concurrent_multi_object_delete * fix test_versioning_obj_suspend_versions test * fix empty owner * fix listing versioned objects * default owner * fix path --- .github/workflows/s3tests.yml | 3 +- .../s3_directory_versioning_test.go | 776 ++++++++++++++++++ weed/s3api/s3api_object_handlers.go | 28 +- weed/s3api/s3api_object_handlers_acl.go | 148 +++- weed/s3api/s3api_object_handlers_delete.go | 116 ++- weed/s3api/s3api_object_handlers_list.go | 93 ++- weed/s3api/s3api_object_handlers_put.go | 3 + weed/s3api/s3api_object_versioning.go | 102 ++- 8 files changed, 1190 insertions(+), 79 deletions(-) create mode 100644 test/s3/versioning/s3_directory_versioning_test.go diff --git a/.github/workflows/s3tests.yml b/.github/workflows/s3tests.yml index ea8c72794..b47e9d21a 100644 --- a/.github/workflows/s3tests.yml +++ b/.github/workflows/s3tests.yml @@ -399,7 +399,8 @@ jobs: echo "S3 connection test failed, retrying... ($i/10)" sleep 2 done - tox -- s3tests_boto3/functional/test_s3.py -k "object_lock or (versioning and not test_versioning_obj_suspend_versions and not test_bucket_list_return_data_versioning and not test_versioning_concurrent_multi_object_delete)" --tb=short + # tox -- s3tests_boto3/functional/test_s3.py -k "object_lock or (versioning and not test_versioning_obj_suspend_versions and not test_bucket_list_return_data_versioning and not test_versioning_concurrent_multi_object_delete)" --tb=short + tox -- s3tests_boto3/functional/test_s3.py -k "object_lock or versioning" --tb=short kill -9 $pid || true # Clean up data directory rm -rf "$WEED_DATA_DIR" || true diff --git a/test/s3/versioning/s3_directory_versioning_test.go b/test/s3/versioning/s3_directory_versioning_test.go new file mode 100644 index 000000000..7874bc055 --- /dev/null +++ b/test/s3/versioning/s3_directory_versioning_test.go @@ -0,0 +1,776 @@ +package s3api + +import ( + "context" + "fmt" + "strings" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestListObjectVersionsIncludesDirectories tests that directories are included in list-object-versions response +// This ensures compatibility with Minio and AWS S3 behavior +func TestListObjectVersionsIncludesDirectories(t *testing.T) { + bucketName := "test-versioning-directories" + + client := setupS3Client(t) + + // Create bucket + _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // Clean up + defer func() { + cleanupBucket(t, client, bucketName) + }() + + // Enable versioning + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusEnabled, + }, + }) + require.NoError(t, err) + + // First create explicit directory objects (keys ending with "/") + // These are the directories that should appear in list-object-versions + explicitDirectories := []string{ + "Veeam/", + "Veeam/Archive/", + "Veeam/Archive/vbr/", + "Veeam/Backup/", + "Veeam/Backup/vbr/", + "Veeam/Backup/vbr/Clients/", + } + + // Create explicit directory objects + for _, dirKey := range explicitDirectories { + _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dirKey), + Body: strings.NewReader(""), // Empty content for directories + }) + require.NoError(t, err, "Failed to create directory object %s", dirKey) + } + + // Now create some test files + testFiles := []string{ + "Veeam/test-file.txt", + "Veeam/Archive/test-file2.txt", + "Veeam/Archive/vbr/test-file3.txt", + "Veeam/Backup/test-file4.txt", + "Veeam/Backup/vbr/test-file5.txt", + "Veeam/Backup/vbr/Clients/test-file6.txt", + } + + // Upload test files + for _, objectKey := range testFiles { + _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("test content"), + }) + require.NoError(t, err, "Failed to create file %s", objectKey) + } + + // List object versions + listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // Extract all keys from versions + var allKeys []string + for _, version := range listResp.Versions { + allKeys = append(allKeys, *version.Key) + } + + // Expected directories that should be included (with trailing slash) + expectedDirectories := []string{ + "Veeam/", + "Veeam/Archive/", + "Veeam/Archive/vbr/", + "Veeam/Backup/", + "Veeam/Backup/vbr/", + "Veeam/Backup/vbr/Clients/", + } + + // Verify that directories are included in the response + t.Logf("Found %d total versions", len(listResp.Versions)) + t.Logf("All keys: %v", allKeys) + + for _, expectedDir := range expectedDirectories { + found := false + for _, version := range listResp.Versions { + if *version.Key == expectedDir { + found = true + // Verify directory properties + assert.Equal(t, "null", *version.VersionId, "Directory %s should have VersionId 'null'", expectedDir) + assert.Equal(t, int64(0), *version.Size, "Directory %s should have size 0", expectedDir) + assert.True(t, *version.IsLatest, "Directory %s should be marked as latest", expectedDir) + assert.Equal(t, "\"d41d8cd98f00b204e9800998ecf8427e\"", *version.ETag, "Directory %s should have MD5 of empty string as ETag", expectedDir) + assert.Equal(t, types.ObjectStorageClassStandard, version.StorageClass, "Directory %s should have STANDARD storage class", expectedDir) + break + } + } + assert.True(t, found, "Directory %s should be included in list-object-versions response", expectedDir) + } + + // Also verify that actual files are included + for _, objectKey := range testFiles { + found := false + for _, version := range listResp.Versions { + if *version.Key == objectKey { + found = true + assert.NotEqual(t, "null", *version.VersionId, "File %s should have a real version ID", objectKey) + assert.Greater(t, *version.Size, int64(0), "File %s should have size > 0", objectKey) + break + } + } + assert.True(t, found, "File %s should be included in list-object-versions response", objectKey) + } + + // Count directories vs files + directoryCount := 0 + fileCount := 0 + for _, version := range listResp.Versions { + if strings.HasSuffix(*version.Key, "/") && *version.Size == 0 && *version.VersionId == "null" { + directoryCount++ + } else { + fileCount++ + } + } + + t.Logf("Found %d directories and %d files", directoryCount, fileCount) + assert.Equal(t, len(expectedDirectories), directoryCount, "Should find exactly %d directories", len(expectedDirectories)) + assert.Equal(t, len(testFiles), fileCount, "Should find exactly %d files", len(testFiles)) +} + +// TestListObjectVersionsDeleteMarkers tests that delete markers are properly separated from versions +// This test verifies the fix for the issue where delete markers were incorrectly categorized as versions +func TestListObjectVersionsDeleteMarkers(t *testing.T) { + bucketName := "test-delete-markers" + + client := setupS3Client(t) + + // Create bucket + _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // Clean up + defer func() { + cleanupBucket(t, client, bucketName) + }() + + // Enable versioning + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusEnabled, + }, + }) + require.NoError(t, err) + + objectKey := "test1/a" + + // 1. Create one version of the file + _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("test content"), + }) + require.NoError(t, err) + + // 2. Delete the object 3 times to create 3 delete markers + for i := 0; i < 3; i++ { + _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + } + + // 3. List object versions and verify the response structure + listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // 4. Verify that we have exactly 1 version and 3 delete markers + assert.Len(t, listResp.Versions, 1, "Should have exactly 1 file version") + assert.Len(t, listResp.DeleteMarkers, 3, "Should have exactly 3 delete markers") + + // 5. Verify the version is for our test file + version := listResp.Versions[0] + assert.Equal(t, objectKey, *version.Key, "Version should be for our test file") + assert.NotEqual(t, "null", *version.VersionId, "File version should have a real version ID") + assert.Greater(t, *version.Size, int64(0), "File version should have size > 0") + + // 6. Verify all delete markers are for our test file + for i, deleteMarker := range listResp.DeleteMarkers { + assert.Equal(t, objectKey, *deleteMarker.Key, "Delete marker %d should be for our test file", i) + assert.NotEqual(t, "null", *deleteMarker.VersionId, "Delete marker %d should have a real version ID", i) + } + + t.Logf("Successfully verified: 1 version + 3 delete markers for object %s", objectKey) +} + +// TestVersionedObjectAcl tests that ACL operations work correctly on objects in versioned buckets +// This test verifies the fix for the NoSuchKey error when getting ACLs for objects in versioned buckets +func TestVersionedObjectAcl(t *testing.T) { + bucketName := "test-versioned-acl" + + client := setupS3Client(t) + + // Create bucket + _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // Clean up + defer func() { + cleanupBucket(t, client, bucketName) + }() + + // Enable versioning + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusEnabled, + }, + }) + require.NoError(t, err) + + objectKey := "test-acl-object" + + // Create an object in the versioned bucket + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("test content for ACL"), + }) + require.NoError(t, err) + require.NotNil(t, putResp.VersionId, "Object should have a version ID") + + // Test 1: Get ACL for the object (without specifying version ID - should get latest version) + getAclResp, err := client.GetObjectAcl(context.TODO(), &s3.GetObjectAclInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Should be able to get ACL for object in versioned bucket") + require.NotNil(t, getAclResp.Owner, "ACL response should have owner information") + + // Test 2: Get ACL for specific version ID + getAclVersionResp, err := client.GetObjectAcl(context.TODO(), &s3.GetObjectAclInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + VersionId: putResp.VersionId, + }) + require.NoError(t, err, "Should be able to get ACL for specific version") + require.NotNil(t, getAclVersionResp.Owner, "Versioned ACL response should have owner information") + + // Test 3: Verify both ACL responses are the same (same object, same version) + assert.Equal(t, getAclResp.Owner.ID, getAclVersionResp.Owner.ID, "Owner ID should match for latest and specific version") + + // Test 4: Create another version of the same object + putResp2, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("updated content for ACL"), + }) + require.NoError(t, err) + require.NotNil(t, putResp2.VersionId, "Second object version should have a version ID") + require.NotEqual(t, putResp.VersionId, putResp2.VersionId, "Version IDs should be different") + + // Test 5: Get ACL for latest version (should be the second version) + getAclLatestResp, err := client.GetObjectAcl(context.TODO(), &s3.GetObjectAclInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Should be able to get ACL for latest version after update") + require.NotNil(t, getAclLatestResp.Owner, "Latest ACL response should have owner information") + + // Test 6: Get ACL for the first version specifically + getAclFirstResp, err := client.GetObjectAcl(context.TODO(), &s3.GetObjectAclInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + VersionId: putResp.VersionId, + }) + require.NoError(t, err, "Should be able to get ACL for first version specifically") + require.NotNil(t, getAclFirstResp.Owner, "First version ACL response should have owner information") + + // Test 7: Verify we can put ACL on versioned objects + _, err = client.PutObjectAcl(context.TODO(), &s3.PutObjectAclInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + ACL: types.ObjectCannedACLPrivate, + }) + require.NoError(t, err, "Should be able to put ACL on versioned object") + + t.Logf("Successfully verified ACL operations on versioned object %s with versions %s and %s", + objectKey, *putResp.VersionId, *putResp2.VersionId) +} + +// TestConcurrentMultiObjectDelete tests that concurrent delete operations work correctly without race conditions +// This test verifies the fix for the race condition in deleteSpecificObjectVersion +func TestConcurrentMultiObjectDelete(t *testing.T) { + bucketName := "test-concurrent-delete" + numObjects := 5 + numThreads := 5 + + client := setupS3Client(t) + + // Create bucket + _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // Clean up + defer func() { + cleanupBucket(t, client, bucketName) + }() + + // Enable versioning + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusEnabled, + }, + }) + require.NoError(t, err) + + // Create objects + var objectKeys []string + var versionIds []string + + for i := 0; i < numObjects; i++ { + objectKey := fmt.Sprintf("key_%d", i) + objectKeys = append(objectKeys, objectKey) + + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader(fmt.Sprintf("content for key_%d", i)), + }) + require.NoError(t, err) + require.NotNil(t, putResp.VersionId) + versionIds = append(versionIds, *putResp.VersionId) + } + + // Verify objects were created + listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, listResp.Versions, numObjects, "Should have created %d objects", numObjects) + + // Create delete objects request + var objectsToDelete []types.ObjectIdentifier + for i, objectKey := range objectKeys { + objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{ + Key: aws.String(objectKey), + VersionId: aws.String(versionIds[i]), + }) + } + + // Run concurrent delete operations + results := make([]*s3.DeleteObjectsOutput, numThreads) + var wg sync.WaitGroup + + for i := 0; i < numThreads; i++ { + wg.Add(1) + go func(threadIdx int) { + defer wg.Done() + deleteResp, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{ + Bucket: aws.String(bucketName), + Delete: &types.Delete{ + Objects: objectsToDelete, + Quiet: aws.Bool(false), + }, + }) + if err != nil { + t.Errorf("Thread %d: delete objects failed: %v", threadIdx, err) + return + } + results[threadIdx] = deleteResp + }(i) + } + + wg.Wait() + + // Verify results + for i, result := range results { + require.NotNil(t, result, "Thread %d should have a result", i) + assert.Len(t, result.Deleted, numObjects, "Thread %d should have deleted all %d objects", i, numObjects) + + if len(result.Errors) > 0 { + for _, deleteError := range result.Errors { + t.Errorf("Thread %d delete error: %s - %s (Key: %s, VersionId: %s)", + i, *deleteError.Code, *deleteError.Message, *deleteError.Key, + func() string { + if deleteError.VersionId != nil { + return *deleteError.VersionId + } else { + return "nil" + } + }()) + } + } + assert.Empty(t, result.Errors, "Thread %d should have no delete errors", i) + } + + // Verify objects are deleted (bucket should be empty) + finalListResp, err := client.ListObjects(context.TODO(), &s3.ListObjectsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Nil(t, finalListResp.Contents, "Bucket should be empty after all deletions") + + t.Logf("Successfully verified concurrent deletion of %d objects from %d threads", numObjects, numThreads) +} + +// TestSuspendedVersioningDeleteBehavior tests that delete operations during suspended versioning +// actually delete the "null" version object rather than creating delete markers +func TestSuspendedVersioningDeleteBehavior(t *testing.T) { + bucketName := "test-suspended-versioning-delete" + objectKey := "testobj" + + client := setupS3Client(t) + + // Create bucket + _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + + // Clean up + defer func() { + cleanupBucket(t, client, bucketName) + }() + + // Enable versioning and create some versions + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusEnabled, + }, + }) + require.NoError(t, err) + + // Create 3 versions + var versionIds []string + for i := 0; i < 3; i++ { + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader(fmt.Sprintf("content version %d", i+1)), + }) + require.NoError(t, err) + require.NotNil(t, putResp.VersionId) + versionIds = append(versionIds, *putResp.VersionId) + } + + // Verify 3 versions exist + listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, listResp.Versions, 3, "Should have 3 versions initially") + + // Suspend versioning + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusSuspended, + }, + }) + require.NoError(t, err) + + // Create a new object during suspended versioning (this should be a "null" version) + _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("null version content"), + }) + require.NoError(t, err) + + // Verify we still have 3 versions + 1 null version = 4 total + listResp, err = client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, listResp.Versions, 4, "Should have 3 versions + 1 null version") + + // Find the null version + var nullVersionFound bool + for _, version := range listResp.Versions { + if *version.VersionId == "null" { + nullVersionFound = true + assert.True(t, *version.IsLatest, "Null version should be marked as latest during suspended versioning") + break + } + } + assert.True(t, nullVersionFound, "Should have found a null version") + + // Delete the object during suspended versioning (should actually delete the null version) + _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + // No VersionId specified - should delete the "null" version during suspended versioning + }) + require.NoError(t, err) + + // Verify the null version was actually deleted (not a delete marker created) + listResp, err = client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, listResp.Versions, 3, "Should be back to 3 versions after deleting null version") + assert.Empty(t, listResp.DeleteMarkers, "Should have no delete markers during suspended versioning delete") + + // Verify null version is gone + nullVersionFound = false + for _, version := range listResp.Versions { + if *version.VersionId == "null" { + nullVersionFound = true + break + } + } + assert.False(t, nullVersionFound, "Null version should be deleted, not present") + + // Create another null version and delete it multiple times to test idempotency + _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("another null version"), + }) + require.NoError(t, err) + + // Delete it twice to test idempotency + for i := 0; i < 2; i++ { + _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Delete should be idempotent - iteration %d", i+1) + } + + // Re-enable versioning + _, err = client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusEnabled, + }, + }) + require.NoError(t, err) + + // Create a new version with versioning enabled + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("new version after re-enabling"), + }) + require.NoError(t, err) + require.NotNil(t, putResp.VersionId) + + // Now delete without version ID (should create delete marker) + deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, "true", deleteResp.DeleteMarker, "Should create delete marker when versioning is enabled") + + // Verify final state + listResp, err = client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, listResp.Versions, 4, "Should have 3 original versions + 1 new version") + assert.Len(t, listResp.DeleteMarkers, 1, "Should have 1 delete marker") + + t.Logf("Successfully verified suspended versioning delete behavior") +} + +// TestVersionedObjectListBehavior tests that list operations show logical object names for versioned objects +// and that owner information is properly extracted from S3 metadata +func TestVersionedObjectListBehavior(t *testing.T) { + bucketName := "test-versioned-list" + objectKey := "testfile" + + client := setupS3Client(t) + + // Create bucket with object lock enabled (which enables versioning) + _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + ObjectLockEnabledForBucket: aws.Bool(true), + }) + require.NoError(t, err) + + // Clean up + defer func() { + cleanupBucket(t, client, bucketName) + }() + + // Verify versioning is enabled + versioningResp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Equal(t, types.BucketVersioningStatusEnabled, versioningResp.Status, "Bucket versioning should be enabled") + + // Create a versioned object + content := "test content for versioned object" + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader(content), + }) + require.NoError(t, err) + require.NotNil(t, putResp.VersionId) + + versionId := *putResp.VersionId + t.Logf("Created versioned object with version ID: %s", versionId) + + // Test list-objects operation - should show logical object name, not internal versioned path + listResp, err := client.ListObjects(context.TODO(), &s3.ListObjectsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + require.Len(t, listResp.Contents, 1, "Should list exactly one object") + + listedObject := listResp.Contents[0] + + // Verify the object key is the logical name, not the internal versioned path + assert.Equal(t, objectKey, *listedObject.Key, "Should show logical object name, not internal versioned path") + assert.NotContains(t, *listedObject.Key, ".versions", "Object key should not contain .versions") + assert.NotContains(t, *listedObject.Key, versionId, "Object key should not contain version ID") + + // Verify object properties + assert.Equal(t, int64(len(content)), listedObject.Size, "Object size should match") + assert.NotNil(t, listedObject.ETag, "Object should have ETag") + assert.NotNil(t, listedObject.LastModified, "Object should have LastModified") + + // Verify owner information is present (even if anonymous) + require.NotNil(t, listedObject.Owner, "Object should have Owner information") + assert.NotEmpty(t, listedObject.Owner.ID, "Owner ID should not be empty") + assert.NotEmpty(t, listedObject.Owner.DisplayName, "Owner DisplayName should not be empty") + + t.Logf("Listed object: Key=%s, Size=%d, Owner.ID=%s, Owner.DisplayName=%s", + *listedObject.Key, listedObject.Size, *listedObject.Owner.ID, *listedObject.Owner.DisplayName) + + // Test list-objects-v2 operation as well + listV2Resp, err := client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{ + Bucket: aws.String(bucketName), + FetchOwner: aws.Bool(true), // Explicitly request owner information + }) + require.NoError(t, err) + require.Len(t, listV2Resp.Contents, 1, "ListObjectsV2 should also list exactly one object") + + listedObjectV2 := listV2Resp.Contents[0] + assert.Equal(t, objectKey, *listedObjectV2.Key, "ListObjectsV2 should also show logical object name") + assert.NotNil(t, listedObjectV2.Owner, "ListObjectsV2 should include owner when FetchOwner=true") + + // Create another version to ensure multiple versions don't appear in regular list + _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader("updated content"), + }) + require.NoError(t, err) + + // List again - should still show only one logical object (the latest version) + listRespAfterUpdate, err := client.ListObjects(context.TODO(), &s3.ListObjectsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, listRespAfterUpdate.Contents, 1, "Should still list exactly one object after creating second version") + + // Compare with list-object-versions which should show both versions + versionsResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + assert.Len(t, versionsResp.Versions, 2, "list-object-versions should show both versions") + + t.Logf("Successfully verified versioned object list behavior") +} + +// Helper function to setup S3 client +func setupS3Client(t *testing.T) *s3.Client { + // S3TestConfig holds configuration for S3 tests + type S3TestConfig struct { + Endpoint string + AccessKey string + SecretKey string + Region string + BucketPrefix string + UseSSL bool + SkipVerifySSL bool + } + + // Default test configuration - should match s3tests.conf + defaultConfig := &S3TestConfig{ + Endpoint: "http://localhost:8333", // Default SeaweedFS S3 port + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + Region: "us-east-1", + BucketPrefix: "test-versioning-", + UseSSL: false, + SkipVerifySSL: true, + } + + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(defaultConfig.Region), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + defaultConfig.AccessKey, + defaultConfig.SecretKey, + "", + )), + config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc( + func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: defaultConfig.Endpoint, + SigningRegion: defaultConfig.Region, + HostnameImmutable: true, + }, nil + })), + ) + require.NoError(t, err) + + return s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true // Important for SeaweedFS + }) +} + +// Helper function to clean up bucket +func cleanupBucket(t *testing.T, client *s3.Client, bucketName string) { + // First, delete all objects and versions + err := deleteAllObjectVersions(t, client, bucketName) + if err != nil { + t.Logf("Warning: failed to delete all object versions: %v", err) + } + + // Then delete the bucket + _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Logf("Warning: failed to delete bucket %s: %v", bucketName, err) + } +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 77863acfe..bfaeb568b 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -198,9 +198,33 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu StorageClass: StorageClass(storageClass), } if fetchOwner { + // Extract owner from S3 metadata (Extended attributes) instead of file system attributes + var ownerID, displayName string + if entry.Extended != nil { + if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists { + ownerID = string(ownerBytes) + } + } + + // Fallback to anonymous if no S3 owner found + if ownerID == "" { + ownerID = s3_constants.AccountAnonymousId + displayName = "anonymous" + } else { + // Try to resolve display name from IAM system + displayName = "unknown" + // Note: IAM resolution would require access to the S3ApiServer instance + // For now, use a simple fallback or could be enhanced later + } + + // Additional fallback to file system username if available and no display name resolved + if displayName == "unknown" && entry.Attributes.UserName != "" { + displayName = entry.Attributes.UserName + } + listEntry.Owner = CanonicalUser{ - ID: fmt.Sprintf("%x", entry.Attributes.Uid), - DisplayName: entry.Attributes.UserName, + ID: ownerID, + DisplayName: displayName, } } return listEntry diff --git a/weed/s3api/s3api_object_handlers_acl.go b/weed/s3api/s3api_object_handlers_acl.go index 7185f9896..1386b6cba 100644 --- a/weed/s3api/s3api_object_handlers_acl.go +++ b/weed/s3api/s3api_object_handlers_acl.go @@ -24,18 +24,63 @@ func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Reque return } - // Check if object exists and get its metadata - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err := s3a.getEntry(bucketDir, object) + // Check for specific version ID in query parameters + versionId := r.URL.Query().Get("versionId") + + // Check if versioning is configured for the bucket (Enabled or Suspended) + versioningConfigured, err := s3a.isVersioningConfigured(bucket) if err != nil { - if errors.Is(err, filer_pb.ErrNotFound) { - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } - glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err) + glog.Errorf("GetObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } + + var entry *filer_pb.Entry + + if versioningConfigured { + // Handle versioned object ACL retrieval - use same logic as GetObjectHandler + if versionId != "" { + // Request for specific version + glog.V(2).Infof("GetObjectAclHandler: requesting ACL for specific version %s of %s%s", versionId, bucket, object) + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + } else { + // Request for latest version + glog.V(2).Infof("GetObjectAclHandler: requesting ACL for latest version of %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + } + + if err != nil { + glog.Errorf("GetObjectAclHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + + // Check if this is a delete marker + if entry.Extended != nil { + if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } + } else { + // Handle regular (non-versioned) object ACL retrieval + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err = s3a.getEntry(bucketDir, object) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + } + if entry == nil { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return @@ -123,18 +168,63 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque return } - // Check if object exists and get its metadata - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err := s3a.getEntry(bucketDir, object) + // Check for specific version ID in query parameters + versionId := r.URL.Query().Get("versionId") + + // Check if versioning is configured for the bucket (Enabled or Suspended) + versioningConfigured, err := s3a.isVersioningConfigured(bucket) if err != nil { - if errors.Is(err, filer_pb.ErrNotFound) { - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } - glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err) + glog.Errorf("PutObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } + + var entry *filer_pb.Entry + + if versioningConfigured { + // Handle versioned object ACL modification - use same logic as GetObjectHandler + if versionId != "" { + // Request for specific version + glog.V(2).Infof("PutObjectAclHandler: modifying ACL for specific version %s of %s%s", versionId, bucket, object) + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + } else { + // Request for latest version + glog.V(2).Infof("PutObjectAclHandler: modifying ACL for latest version of %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + } + + if err != nil { + glog.Errorf("PutObjectAclHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + + // Check if this is a delete marker + if entry.Extended != nil { + if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } + } else { + // Handle regular (non-versioned) object ACL modification + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err = s3a.getEntry(bucketDir, object) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + } + if entry == nil { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return @@ -208,14 +298,44 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque // Store ACL in object metadata if errCode := AssembleEntryWithAcp(entry, objectOwner, grants); errCode != s3err.ErrNone { glog.Errorf("PutObjectAclHandler: failed to assemble entry with ACP: %v", errCode) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + s3err.WriteErrorResponse(w, r, errCode) return } + // Calculate the correct directory for ACL update + var updateDirectory string + + if versioningConfigured { + if versionId != "" && versionId != "null" { + // Versioned object - update the specific version file in .versions directory + updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions" + } else { + // Latest version in versioned bucket - could be null version or versioned object + // Extract version ID from the entry to determine where it's stored + var actualVersionId string + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + actualVersionId = string(versionIdBytes) + } + } + + if actualVersionId == "null" || actualVersionId == "" { + // Null version (pre-versioning object) - stored as regular file + updateDirectory = s3a.option.BucketsPath + "/" + bucket + } else { + // Versioned object - stored in .versions directory + updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions" + } + } + } else { + // Non-versioned object - stored as regular file + updateDirectory = s3a.option.BucketsPath + "/" + bucket + } + // Update the object with new ACL metadata err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: bucketDir, + Directory: updateDirectory, Entry: entry, } diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 8cb5c04fe..3a2544710 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -32,8 +32,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque // Check for specific version ID in query parameters versionId := r.URL.Query().Get("versionId") - // Check if versioning is configured for the bucket (Enabled or Suspended) - versioningConfigured, err := s3a.isVersioningConfigured(bucket) + // Get detailed versioning state for proper handling of suspended vs enabled versioning + versioningState, err := s3a.getVersioningState(bucket) if err != nil { if err == filer_pb.ErrNotFound { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -44,14 +44,19 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque return } + versioningEnabled := (versioningState == s3_constants.VersioningEnabled) + versioningSuspended := (versioningState == s3_constants.VersioningSuspended) + versioningConfigured := (versioningState != "") + var auditLog *s3err.AccessLog if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } if versioningConfigured { - // Handle versioned delete + // Handle versioned delete based on specific versioning state if versionId != "" { + // Delete specific version (same for both enabled and suspended) // Check object lock permissions before deleting specific version governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil { @@ -71,19 +76,44 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque // Set version ID in response header w.Header().Set("x-amz-version-id", versionId) } else { - // Create delete marker (logical delete) - // AWS S3 behavior: Delete marker creation is NOT blocked by object retention - // because it's a logical delete that doesn't actually remove the retained version - deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object) - if err != nil { - glog.Errorf("Failed to create delete marker: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + // Delete without version ID - behavior depends on versioning state + if versioningEnabled { + // Enabled versioning: Create delete marker (logical delete) + // AWS S3 behavior: Delete marker creation is NOT blocked by object retention + // because it's a logical delete that doesn't actually remove the retained version + deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object) + if err != nil { + glog.Errorf("Failed to create delete marker: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } - // Set delete marker version ID in response header - w.Header().Set("x-amz-version-id", deleteMarkerVersionId) - w.Header().Set("x-amz-delete-marker", "true") + // Set delete marker version ID in response header + w.Header().Set("x-amz-version-id", deleteMarkerVersionId) + w.Header().Set("x-amz-delete-marker", "true") + } else if versioningSuspended { + // Suspended versioning: Actually delete the "null" version object + glog.V(2).Infof("DeleteObjectHandler: deleting null version for suspended versioning %s/%s", bucket, object) + + // Check object lock permissions before deleting "null" version + governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) + if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil { + glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return + } + + // Delete the "null" version (the regular file) + err := s3a.deleteSpecificObjectVersion(bucket, object, "null") + if err != nil { + glog.Errorf("Failed to delete null version: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + // Note: According to AWS S3 spec, suspended versioning should NOT return version ID headers + // The object is deleted but no version information is returned + } } } else { // Handle regular delete (non-versioned) @@ -203,8 +233,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - // Check if versioning is configured for the bucket (needed for object lock checks) - versioningConfigured, err := s3a.isVersioningConfigured(bucket) + // Get detailed versioning state for proper handling of suspended vs enabled versioning + versioningState, err := s3a.getVersioningState(bucket) if err != nil { if err == filer_pb.ErrNotFound { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -215,6 +245,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h return } + versioningEnabled := (versioningState == s3_constants.VersioningEnabled) + versioningSuspended := (versioningState == s3_constants.VersioningSuspended) + versioningConfigured := (versioningState != "") + s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete file entries @@ -243,9 +277,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h var isDeleteMarker bool if versioningConfigured { - // Handle versioned delete + // Handle versioned delete based on specific versioning state if object.VersionId != "" { - // Delete specific version + // Delete specific version (same for both enabled and suspended) err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId) if err != nil { deleteErrors = append(deleteErrors, DeleteError{ @@ -258,19 +292,39 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } deleteVersionId = object.VersionId } else { - // Create delete marker (logical delete) - deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key) - if err != nil { - deleteErrors = append(deleteErrors, DeleteError{ - Code: "", - Message: err.Error(), - Key: object.Key, - VersionId: object.VersionId, - }) - continue + // Delete without version ID - behavior depends on versioning state + if versioningEnabled { + // Enabled versioning: Create delete marker (logical delete) + deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key) + if err != nil { + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err.Error(), + Key: object.Key, + VersionId: object.VersionId, + }) + continue + } + deleteVersionId = deleteMarkerVersionId + isDeleteMarker = true + } else if versioningSuspended { + // Suspended versioning: Actually delete the "null" version object + glog.V(2).Infof("DeleteMultipleObjectsHandler: deleting null version for suspended versioning %s/%s", bucket, object.Key) + + err := s3a.deleteSpecificObjectVersion(bucket, object.Key, "null") + if err != nil { + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err.Error(), + Key: object.Key, + VersionId: "null", + }) + continue + } + deleteVersionId = "null" + // Note: For suspended versioning, we don't set isDeleteMarker=true + // because we actually deleted the object, not created a delete marker } - deleteVersionId = deleteMarkerVersionId - isDeleteMarker = true } // Add to successful deletions with version info diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index bbb67d391..8a55db854 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -4,16 +4,17 @@ import ( "context" "encoding/xml" "fmt" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "io" "net/http" "net/url" "strconv" "strings" + + "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" ) type OptionalString struct { @@ -356,6 +357,9 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d return } + // Track .versions directories found in this directory for later processing + var versionsDirs []string + for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -386,6 +390,14 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d if entry.Name == s3_constants.MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys continue } + + // Skip .versions directories in regular list operations but track them for logical object creation + if strings.HasSuffix(entry.Name, ".versions") { + glog.V(4).Infof("Found .versions directory: %s", entry.Name) + versionsDirs = append(versionsDirs, entry.Name) + continue + } + if delimiter != "/" || cursor.prefixEndsOnDelimiter { if cursor.prefixEndsOnDelimiter { cursor.prefixEndsOnDelimiter = false @@ -425,6 +437,48 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d cursor.prefixEndsOnDelimiter = false } } + + // After processing all regular entries, handle versioned objects + // Create logical entries for objects that have .versions directories + for _, versionsDir := range versionsDirs { + if cursor.maxKeys <= 0 { + cursor.isTruncated = true + break + } + + // Extract object name from .versions directory name (remove .versions suffix) + baseObjectName := strings.TrimSuffix(versionsDir, ".versions") + + // Construct full object path relative to bucket + // dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config" + // we need to get the path relative to bucket: "Veeam/Backup/vbr/Config/Owner" + bucketPath := strings.TrimPrefix(dir, s3a.option.BucketsPath+"/") + bucketName := strings.Split(bucketPath, "/")[0] + + // Remove bucket name from path to get directory within bucket + bucketRelativePath := strings.Join(strings.Split(bucketPath, "/")[1:], "/") + + var fullObjectPath string + if bucketRelativePath == "" { + // Object is at bucket root + fullObjectPath = baseObjectName + } else { + // Object is in subdirectory + fullObjectPath = bucketRelativePath + "/" + baseObjectName + } + + glog.V(4).Infof("Processing versioned object: baseObjectName=%s, bucketRelativePath=%s, fullObjectPath=%s", + baseObjectName, bucketRelativePath, fullObjectPath) + + // Get the latest version information for this object + if latestVersionEntry, latestVersionErr := s3a.getLatestVersionEntryForListOperation(bucketName, fullObjectPath); latestVersionErr == nil { + glog.V(4).Infof("Creating logical entry for versioned object: %s", fullObjectPath) + eachEntryFn(dir, latestVersionEntry) + } else { + glog.V(4).Infof("Failed to get latest version for %s: %v", fullObjectPath, latestVersionErr) + } + } + return } @@ -513,3 +567,32 @@ func (s3a *S3ApiServer) ensureDirectoryAllEmpty(filerClient filer_pb.SeaweedFile return true, nil } + +// getLatestVersionEntryForListOperation gets the latest version of an object and creates a logical entry for list operations +// This is used to show versioned objects as logical object names in regular list operations +func (s3a *S3ApiServer) getLatestVersionEntryForListOperation(bucket, object string) (*filer_pb.Entry, error) { + // Get the latest version entry + latestVersionEntry, err := s3a.getLatestObjectVersion(bucket, object) + if err != nil { + return nil, fmt.Errorf("failed to get latest version: %w", err) + } + + // Check if this is a delete marker (should not be shown in regular list) + if latestVersionEntry.Extended != nil { + if deleteMarker, exists := latestVersionEntry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" { + return nil, fmt.Errorf("latest version is a delete marker") + } + } + + // Create a logical entry that appears to be stored at the object path (not the versioned path) + // This allows the list operation to show the logical object name while preserving all metadata + logicalEntry := &filer_pb.Entry{ + Name: strings.TrimPrefix(object, "/"), + IsDirectory: false, + Attributes: latestVersionEntry.Attributes, + Extended: latestVersionEntry.Extended, + Chunks: latestVersionEntry.Chunks, + } + + return logicalEntry, nil +} diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 23069ef32..3d8a62b09 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -90,6 +90,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) entry.Content, _ = io.ReadAll(r.Body) } entry.Attributes.Mime = objectContentType + + // Set object owner for directory objects (same as regular objects) + s3a.setObjectOwnerFromRequest(r, entry) }); err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index ff82969b5..e6b6f975b 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -19,18 +19,31 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" ) -// ObjectVersion represents a version of an S3 object -type ObjectVersion struct { - VersionId string - IsLatest bool - IsDeleteMarker bool - LastModified time.Time - ETag string - Size int64 - Entry *filer_pb.Entry +// S3ListObjectVersionsResult - Custom struct for S3 list-object-versions response +// This avoids conflicts with the XSD generated ListVersionsResult struct +// and ensures proper separation of versions and delete markers into arrays +type S3ListObjectVersionsResult struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"` + + Name string `xml:"Name"` + Prefix string `xml:"Prefix,omitempty"` + KeyMarker string `xml:"KeyMarker,omitempty"` + VersionIdMarker string `xml:"VersionIdMarker,omitempty"` + NextKeyMarker string `xml:"NextKeyMarker,omitempty"` + NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"` + MaxKeys int `xml:"MaxKeys"` + Delimiter string `xml:"Delimiter,omitempty"` + IsTruncated bool `xml:"IsTruncated"` + + // These are the critical fields - arrays instead of single elements + Versions []VersionEntry `xml:"Version,omitempty"` // Array for versions + DeleteMarkers []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"` // Array for delete markers + + CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` + EncodingType string `xml:"EncodingType,omitempty"` } -// ListObjectVersionsResult represents the response for ListObjectVersions +// Original struct - keeping for compatibility but will use S3ListObjectVersionsResult for XML response type ListObjectVersionsResult struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"` Name string `xml:"Name"` @@ -47,6 +60,17 @@ type ListObjectVersionsResult struct { CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` } +// ObjectVersion represents a version of an S3 object +type ObjectVersion struct { + VersionId string + IsLatest bool + IsDeleteMarker bool + LastModified time.Time + ETag string + Size int64 + Entry *filer_pb.Entry +} + // generateVersionId creates a unique version ID that preserves chronological order func generateVersionId() string { // Use nanosecond timestamp to ensure chronological ordering @@ -124,7 +148,7 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error } // listObjectVersions lists all versions of an object -func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) { +func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) { var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry // Track objects that have been processed to avoid duplicates @@ -184,8 +208,8 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM return versionIdI > versionIdJ }) - // Build result - result := &ListObjectVersionsResult{ + // Build result using S3ListObjectVersionsResult to avoid conflicts with XSD structs + result := &S3ListObjectVersionsResult{ Name: bucket, Prefix: prefix, KeyMarker: keyMarker, @@ -296,7 +320,35 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string } } } else { - // Recursively search subdirectories + // This is a regular directory - check if it's an explicit S3 directory object + // Only include directories that were explicitly created via S3 API (have FolderMimeType) + // This excludes implicit directories created when uploading files like "test1/a" + if entry.Attributes.Mime == s3_constants.FolderMimeType { + directoryKey := entryPath + if !strings.HasSuffix(directoryKey, "/") { + directoryKey += "/" + } + + // Add directory as a version entry with VersionId "null" (following S3/Minio behavior) + glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey) + + // Calculate ETag for empty directory + directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\"" + + versionEntry := &VersionEntry{ + Key: directoryKey, + VersionId: "null", + IsLatest: true, + LastModified: time.Unix(entry.Attributes.Mtime, 0), + ETag: directoryETag, + Size: 0, // Directories have size 0 + Owner: s3a.getObjectOwnerFromEntry(entry), + StorageClass: "STANDARD", + } + *allVersions = append(*allVersions, versionEntry) + } + + // Recursively search subdirectories (regardless of whether they're explicit or implicit) fullPath := path.Join(currentPath, entry.Name) err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix) if err != nil { @@ -529,13 +581,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st versionsDir := s3a.getVersionedObjectDir(bucket, object) versionFile := s3a.getVersionFileName(versionId) - // Delete the specific version from .versions directory - _, err := s3a.getEntry(versionsDir, versionFile) - if err != nil { - return fmt.Errorf("version %s not found: %v", versionId, err) - } - - // Check if this is the latest version before deleting + // Check if this is the latest version before attempting deletion (for potential metadata update) versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions") isLatestVersion := false if dirErr == nil && versionsEntry.Extended != nil { @@ -544,15 +590,19 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st } } - // Delete the version file + // Attempt to delete the version file + // Note: We don't check if the file exists first to avoid race conditions + // The deletion operation should be idempotent deleteErr := s3a.rm(versionsDir, versionFile, true, false) if deleteErr != nil { - // Check if file was already deleted by another process + // Check if file was already deleted by another process (race condition handling) if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil { - // File doesn't exist anymore, deletion was successful - } else { - return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr) + // File doesn't exist anymore, deletion was successful (another thread deleted it) + glog.V(2).Infof("deleteSpecificObjectVersion: version %s for %s%s already deleted by another process", versionId, bucket, object) + return nil } + // File still exists but deletion failed for another reason + return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr) } // If we deleted the latest version, update the .versions directory metadata to point to the new latest