From 084b377f8786e3a4d98e0763c3e83be104a9b65e Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 6 Nov 2025 11:05:54 +0500 Subject: [PATCH] do delete expired entries on s3 list request (#7426) * do delete expired entries on s3 list request https://github.com/seaweedfs/seaweedfs/issues/6837 * disable deletion of expired s3 entries in filer * pass opt allowDeleteObjectsByTTL to all servers * delete on get and head * add lifecycle expiration s3 tests * fix opt allowDeleteObjectsByTTL for server * fix test lifecycle expiration * fix IsExpired * fix locationPrefix for updateEntriesTTL * fix s3tests * resolve coderabbitai * GetS3ExpireTime on filer * go mod * clear TtlSeconds for volume * move s3 delete expired entry to filer * filer delete meta and data * delete unused func removeExpiredObject * test s3 put * test s3 put multipart * allowDeleteObjectsByTTL by default * fix pipeline tests * rm duplicate SeaweedFSExpiresS3 * revert expiration tests * fix updateTTL * rm log * resolve comment * fix delete version object * fix S3Versioning * fix delete on FindEntry * fix delete chunks * fix sqlite not support concurrent writes/reads * move deletion out of listing transaction; delete entries and empty folders * Revert "fix sqlite not support concurrent writes/reads" This reverts commit 5d5da14e0ed91c613fe5c0ed058f58bb04fba6f0. * clearer handling on recursive empty directory deletion * handle listing errors * struct copying * reuse code to delete empty folders * use iterative approach with a queue to avoid recursive WithFilerClient calls * stop a gRPC stream from the client-side callback by returning a specific error, e.g., io.EOF * still issue UpdateEntry when the flag must be added * errors join * join path * cleaner * add context, sort directories by depth (deepest first) to avoid redundant checks * batched operation, refactoring * prevent deleting bucket * constant * reuse code * more logging * refactoring * s3 TTL time * Safety check --------- Co-authored-by: chrislu --- .github/workflows/s3tests.yml | 14 +- test/kafka/go.mod | 27 ++-- test/kafka/go.sum | 52 +++---- weed/filer/entry.go | 24 ++++ weed/filer/filer.go | 133 +++++++++++++++++- weed/pb/filer_pb/filer_client.go | 56 ++++++++ weed/pb/filer_pb/filer_pb_helper.go | 26 ++++ weed/s3api/filer_multipart.go | 19 ++- weed/s3api/filer_util.go | 107 ++++++++++++++ weed/s3api/s3_constants/extend_key.go | 1 + weed/s3api/s3_constants/header.go | 1 + weed/s3api/s3api_bucket_handlers.go | 12 +- weed/s3api/s3api_object_handlers.go | 2 - weed/s3api/s3api_object_handlers_delete.go | 89 ++++++------ weed/s3api/s3api_object_handlers_put.go | 3 +- .../filer_server_handlers_write_autochunk.go | 15 +- .../constants_lifecycle_interval_10sec.go | 8 ++ weed/util/constants_lifecycle_interval_day.go | 8 ++ 18 files changed, 489 insertions(+), 108 deletions(-) create mode 100644 weed/util/constants_lifecycle_interval_10sec.go create mode 100644 weed/util/constants_lifecycle_interval_day.go diff --git a/.github/workflows/s3tests.yml b/.github/workflows/s3tests.yml index 540247a34..77b70426f 100644 --- a/.github/workflows/s3tests.yml +++ b/.github/workflows/s3tests.yml @@ -54,7 +54,7 @@ jobs: shell: bash run: | cd weed - go install -buildvcs=false + go install -tags s3tests -buildvcs=false set -x # Create clean data directory for this test run export WEED_DATA_DIR="/tmp/seaweedfs-s3tests-$(date +%s)" @@ -308,7 +308,10 @@ jobs: s3tests/functional/test_s3.py::test_copy_object_ifnonematch_good \ 
s3tests/functional/test_s3.py::test_lifecycle_set \ s3tests/functional/test_s3.py::test_lifecycle_get \ - s3tests/functional/test_s3.py::test_lifecycle_set_filter + s3tests/functional/test_s3.py::test_lifecycle_set_filter \ + s3tests/functional/test_s3.py::test_lifecycle_expiration \ + s3tests/functional/test_s3.py::test_lifecyclev2_expiration \ + s3tests/functional/test_s3.py::test_lifecycle_expiration_versioning_enabled kill -9 $pid || true # Clean up data directory rm -rf "$WEED_DATA_DIR" || true @@ -791,7 +794,7 @@ jobs: exit 1 fi - go install -tags "sqlite" -buildvcs=false + go install -tags "sqlite s3tests" -buildvcs=false # Create clean data directory for this test run with unique timestamp and process ID export WEED_DATA_DIR="/tmp/seaweedfs-sql-test-$(date +%s)-$$" mkdir -p "$WEED_DATA_DIR" @@ -1123,7 +1126,10 @@ jobs: s3tests/functional/test_s3.py::test_copy_object_ifnonematch_good \ s3tests/functional/test_s3.py::test_lifecycle_set \ s3tests/functional/test_s3.py::test_lifecycle_get \ - s3tests/functional/test_s3.py::test_lifecycle_set_filter + s3tests/functional/test_s3.py::test_lifecycle_set_filter \ + s3tests/functional/test_s3.py::test_lifecycle_expiration \ + s3tests/functional/test_s3.py::test_lifecyclev2_expiration \ + s3tests/functional/test_s3.py::test_lifecycle_expiration_versioning_enabled kill -9 $pid || true # Clean up data directory rm -rf "$WEED_DATA_DIR" || true diff --git a/test/kafka/go.mod b/test/kafka/go.mod index 02f6d6999..593b5f3f5 100644 --- a/test/kafka/go.mod +++ b/test/kafka/go.mod @@ -43,25 +43,25 @@ require ( github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go v1.55.8 // indirect - github.com/aws/aws-sdk-go-v2 v1.39.2 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.4 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 // indirect github.com/aws/aws-sdk-go-v2/config v1.31.3 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.18.10 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.6 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.19 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.4 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.9 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.9 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.9 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.9 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.9 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.9 // indirect github.com/aws/aws-sdk-go-v2/service/s3 v1.88.3 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.29.1 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.2 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.38.2 // indirect - github.com/aws/smithy-go v1.23.0 // indirect + 
github.com/aws/aws-sdk-go-v2/service/sso v1.29.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.38.9 // indirect + github.com/aws/smithy-go v1.23.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bradenaw/juniper v0.15.3 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect @@ -117,6 +117,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect + github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/schema v1.4.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -177,7 +178,7 @@ require ( github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.19.1 // indirect github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8 // indirect - github.com/rclone/rclone v1.71.1 // indirect + github.com/rclone/rclone v1.71.2 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/rdleal/intervalst v1.5.0 // indirect github.com/relvacode/iso8601 v1.6.0 // indirect @@ -202,7 +203,7 @@ require ( github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 // indirect - github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 // indirect + github.com/t3rm1n4l/go-mega v0.0.0-20250926104142-ccb8d3498e6c // indirect github.com/tklauser/go-sysconf v0.3.15 // indirect github.com/tklauser/numcpus v0.10.0 // indirect github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43 // indirect diff --git a/test/kafka/go.sum b/test/kafka/go.sum index 12ba88daa..85f45b85a 100644 --- a/test/kafka/go.sum +++ b/test/kafka/go.sum @@ -102,44 +102,44 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= -github.com/aws/aws-sdk-go-v2 v1.39.2 h1:EJLg8IdbzgeD7xgvZ+I8M1e0fL0ptn/M47lianzth0I= -github.com/aws/aws-sdk-go-v2 v1.39.2/go.mod h1:sDioUELIUO9Znk23YVmIk86/9DOpkbyyVb1i/gUNFXY= +github.com/aws/aws-sdk-go-v2 v1.39.4 h1:qTsQKcdQPHnfGYBBs+Btl8QwxJeoWcOcPcixK90mRhg= +github.com/aws/aws-sdk-go-v2 v1.39.4/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 h1:i8p8P4diljCr60PpJp6qZXNlgX4m2yQFpYk+9ZT+J4E= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1/go.mod h1:ddqbooRZYNoJ2dsTwOty16rM+/Aqmk/GOXrK8cg7V00= github.com/aws/aws-sdk-go-v2/config v1.31.3 h1:RIb3yr/+PZ18YYNe6MDiG/3jVoJrPmdoCARwNkMGvco= github.com/aws/aws-sdk-go-v2/config v1.31.3/go.mod h1:jjgx1n7x0FAKl6TnakqrpkHWWKcX3xfWtdnIJs5K9CE= -github.com/aws/aws-sdk-go-v2/credentials v1.18.10 h1:xdJnXCouCx8Y0NncgoptztUocIYLKeQxrCgN6x9sdhg= -github.com/aws/aws-sdk-go-v2/credentials v1.18.10/go.mod h1:7tQk08ntj914F/5i9jC4+2HQTAuJirq7m1vZVIhEkWs= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.6 h1:wbjnrrMnKew78/juW7I2BtKQwa1qlf6EjQgS69uYY14= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.6/go.mod h1:AtiqqNrDioJXuUgz3+3T0mBWN7Hro2n9wll2zRUc0ww= 
+github.com/aws/aws-sdk-go-v2/credentials v1.18.19 h1:Jc1zzwkSY1QbkEcLujwqRTXOdvW8ppND3jRBb/VhBQc= +github.com/aws/aws-sdk-go-v2/credentials v1.18.19/go.mod h1:DIfQ9fAk5H0pGtnqfqkbSIzky82qYnGvh06ASQXXg6A= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11 h1:X7X4YKb+c0rkI6d4uJ5tEMxXgCZ+jZ/D6mvkno8c8Uw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11/go.mod h1:EqM6vPZQsZHYvC4Cai35UDg/f5NCEU+vp0WfbVqVcZc= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.4 h1:0SzCLoPRSK3qSydsaFQWugP+lOBCTPwfcBOm6222+UA= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.4/go.mod h1:JAet9FsBHjfdI+TnMBX4ModNNaQHAd3dc/Bk+cNsxeM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.9 h1:se2vOWGD3dWQUtfn4wEjRQJb1HK1XsNIt825gskZ970= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.9/go.mod h1:hijCGH2VfbZQxqCDN7bwz/4dzxV+hkyhjawAtdPWKZA= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.9 h1:6RBnKZLkJM4hQ+kN6E7yWFveOTg8NLPHAkqrs4ZPlTU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.9/go.mod h1:V9rQKRmK7AWuEsOMnHzKj8WyrIir1yUJbZxDuZLFvXI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11 h1:7AANQZkF3ihM8fbdftpjhken0TP9sBzFbV/Ze/Y4HXA= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11/go.mod h1:NTF4QCGkm6fzVwncpkFQqoquQyOolcyXfbpC98urj+c= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11 h1:ShdtWUZT37LCAA4Mw2kJAJtzaszfSHFb5n25sdcv4YE= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11/go.mod h1:7bUb2sSr2MZ3M/N+VyETLTQtInemHXb/Fl3s8CLzm0Y= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.9 h1:w9LnHqTq8MEdlnyhV4Bwfizd65lfNCNgdlNC6mM5paE= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.9/go.mod h1:LGEP6EK4nj+bwWNdrvX/FnDTFowdBNwcSPuZu/ouFys= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 h1:oegbebPEMA/1Jny7kvwejowCaHz1FWZAQ94WXFNCyTM= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1/go.mod h1:kemo5Myr9ac0U9JfSjMo9yHLtw+pECEHsFtJ9tqCEI8= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.9 h1:by3nYZLR9l8bUH7kgaMU4dJgYFjyRdFEfORlDpPILB4= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.9/go.mod h1:IWjQYlqw4EX9jw2g3qnEPPWvCE6bS8fKzhMed1OK7c8= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.9 h1:5r34CgVOD4WZudeEKZ9/iKpiT6cM1JyEROpXjOcdWv8= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.9/go.mod h1:dB12CEbNWPbzO2uC6QSWHteqOg4JfBVJOojbAoAUb5I= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11 h1:GpMf3z2KJa4RnJ0ew3Hac+hRFYLZ9DDjfgXjuW+pB54= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11/go.mod h1:6MZP3ZI4QQsgUCFTwMZA2V0sEriNQ8k2hmoHF3qjimQ= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.9 h1:wuZ5uW2uhJR63zwNlqWH2W4aL4ZjeJP3o92/W+odDY4= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.9/go.mod h1:/G58M2fGszCrOzvJUkDdY8O9kycodunH4VdT5oBAqls= github.com/aws/aws-sdk-go-v2/service/s3 v1.88.3 h1:P18I4ipbk+b/3dZNq5YYh+Hq6XC0vp5RWkLp1tJldDA= github.com/aws/aws-sdk-go-v2/service/s3 v1.88.3/go.mod 
h1:Rm3gw2Jov6e6kDuamDvyIlZJDMYk97VeCZ82wz/mVZ0= -github.com/aws/aws-sdk-go-v2/service/sso v1.29.1 h1:8OLZnVJPvjnrxEwHFg9hVUof/P4sibH+Ea4KKuqAGSg= -github.com/aws/aws-sdk-go-v2/service/sso v1.29.1/go.mod h1:27M3BpVi0C02UiQh1w9nsBEit6pLhlaH3NHna6WUbDE= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.2 h1:gKWSTnqudpo8dAxqBqZnDoDWCiEh/40FziUjr/mo6uA= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.2/go.mod h1:x7+rkNmRoEN1U13A6JE2fXne9EWyJy54o3n6d4mGaXQ= -github.com/aws/aws-sdk-go-v2/service/sts v1.38.2 h1:YZPjhyaGzhDQEvsffDEcpycq49nl7fiGcfJTIo8BszI= -github.com/aws/aws-sdk-go-v2/service/sts v1.38.2/go.mod h1:2dIN8qhQfv37BdUYGgEC8Q3tteM3zFxTI1MLO2O3J3c= -github.com/aws/smithy-go v1.23.0 h1:8n6I3gXzWJB2DxBDnfxgBaSX6oe0d/t10qGz7OKqMCE= -github.com/aws/smithy-go v1.23.0/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.8 h1:M5nimZmugcZUO9wG7iVtROxPhiqyZX6ejS1lxlDPbTU= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.8/go.mod h1:mbef/pgKhtKRwrigPPs7SSSKZgytzP8PQ6P6JAAdqyM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3 h1:S5GuJZpYxE0lKeMHKn+BRTz6PTFpgThyJ+5mYfux7BM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3/go.mod h1:X4OF+BTd7HIb3L+tc4UlWHVrpgwZZIVENU15pRDVTI0= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.9 h1:Ekml5vGg6sHSZLZJQJagefnVe6PmqC2oiRkBq4F7fU0= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.9/go.mod h1:/e15V+o1zFHWdH3u7lpI3rVBcxszktIKuHKCY2/py+k= +github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= +github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradenaw/juniper v0.15.3 h1:RHIAMEDTpvmzV1wg1jMAHGOoI2oJUSPx3lxRldXnFGo= @@ -540,8 +540,8 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= github.com/quic-go/quic-go v0.54.1 h1:4ZAWm0AhCb6+hE+l5Q1NAL0iRn/ZrMwqHRGQiFwj2eg= github.com/quic-go/quic-go v0.54.1/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= -github.com/rclone/rclone v1.71.1 h1:cpODfWTRz5i/WAzXsyW85tzfIKNsd1aq8CE8lUB+0zg= -github.com/rclone/rclone v1.71.1/go.mod h1:NLyX57FrnZ9nVLTY5TRdMmGelrGKbIRYGcgRkNdqqlA= +github.com/rclone/rclone v1.71.2 h1:3Jk5xNPFrZhVABRuN/OPvApuZQddpE2tkhYMuEn1Ud4= +github.com/rclone/rclone v1.71.2/go.mod h1:dCK9FzPDlpkbQJ9M7MmWsmv3X5nibfWe+ogJXu6gSgM= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rdleal/intervalst v1.5.0 h1:SEB9bCFz5IqD1yhfH1Wv8IBnY/JQxDplwkxHjT6hamU= @@ -621,8 +621,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= -github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 h1:Sa+sR8aaAMFwxhXWENEnE6ZpqhZ9d7u1RT2722Rw6hc= -github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5/go.mod h1:UdZiFUFu6e2WjjtjxivwXWcwc1N/8zgbkBR9QNucUOY= 
+github.com/t3rm1n4l/go-mega v0.0.0-20250926104142-ccb8d3498e6c h1:BLopNCyqewbE8+BtlIp/Juzu8AJGxz0gHdGADnsblVc= +github.com/t3rm1n4l/go-mega v0.0.0-20250926104142-ccb8d3498e6c/go.mod h1:ykucQyiE9Q2qx1wLlEtZkkNn1IURib/2O+Mvd25i1Fo= github.com/tailscale/depaware v0.0.0-20210622194025-720c4b409502/go.mod h1:p9lPsd+cx33L3H9nNoecRRxPssFKUwwI50I3pZ0yT+8= github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= diff --git a/weed/filer/entry.go b/weed/filer/entry.go index 5bd1a3c56..4757d5c9e 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -1,6 +1,7 @@ package filer import ( + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "os" "time" @@ -143,3 +144,26 @@ func maxUint64(x, y uint64) uint64 { } return y } + +func (entry *Entry) IsExpireS3Enabled() (exist bool) { + if entry.Extended != nil { + _, exist = entry.Extended[s3_constants.SeaweedFSExpiresS3] + } + return exist +} + +func (entry *Entry) IsS3Versioning() (exist bool) { + if entry.Extended != nil { + _, exist = entry.Extended[s3_constants.ExtVersionIdKey] + } + return exist +} + +func (entry *Entry) GetS3ExpireTime() (expireTime time.Time) { + if entry.Mtime.IsZero() { + expireTime = entry.Crtime + } else { + expireTime = entry.Mtime + } + return expireTime.Add(time.Duration(entry.TtlSec) * time.Second) +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index b86ac3c5b..d3d2de948 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -351,37 +351,162 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e } entry, err = f.Store.FindEntry(ctx, p) if entry != nil && entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + if entry.IsExpireS3Enabled() { + if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() { + if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil { + glog.ErrorfCtx(ctx, "FindEntry doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr) + } + return nil, filer_pb.ErrNotFound + } + } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { f.Store.DeleteOneEntry(ctx, entry) return nil, filer_pb.ErrNotFound } } - return + return entry, err } func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { + // Collect expired entries during iteration to avoid deadlock with DB connection pool + var expiredEntries []*Entry + var s3ExpiredEntries []*Entry + var hasValidEntries bool + lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { select { case <-ctx.Done(): return false default: if entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - f.Store.DeleteOneEntry(ctx, entry) + if entry.IsExpireS3Enabled() { + if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() { + // Collect for deletion after iteration completes to avoid DB deadlock + s3ExpiredEntries = append(s3ExpiredEntries, entry) + expiredCount++ + return true + } + } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + // Collect for deletion after iteration completes to avoid DB deadlock + 
expiredEntries = append(expiredEntries, entry) expiredCount++ return true } } + // Track that we found at least one valid (non-expired) entry + hasValidEntries = true return eachEntryFunc(entry) } }) if err != nil { return expiredCount, lastFileName, err } + + // Delete expired entries after iteration completes to avoid DB connection deadlock + if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 { + for _, entry := range s3ExpiredEntries { + if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil { + glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr) + } + } + for _, entry := range expiredEntries { + if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil { + glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr) + } + } + + // After expiring entries, the directory might be empty. + // Attempt to clean it up and any empty parent directories. + if !hasValidEntries && p != "/" && startFileName == "" { + stopAtPath := util.FullPath(f.DirBucketsPath) + f.DeleteEmptyParentDirectories(ctx, p, stopAtPath) + } + } + return } +// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty. +// It stops at root "/" or at stopAtPath (if provided). +// This is useful for cleaning up directories after deleting files or expired entries. +// +// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided). +// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths). +// +// Example usage: +// +// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories +// // but stop at the bucket path +// parentPath := util.FullPath("/bucket/dir/subdir") +// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket")) +// +// Example with gRPC client: +// +// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { +// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error { +// // Process entries... 
+// }) +// }); err == nil { +// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath) +// } +func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) { + if dirPath == "/" || dirPath == stopAtPath { + return + } + + // Safety check: if stopAtPath is provided, dirPath must be under it (root "/" allows everything) + stopStr := string(stopAtPath) + if stopAtPath != "" && stopStr != "/" && !strings.HasPrefix(string(dirPath)+"/", stopStr+"/") { + glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) + return + } + + // Additional safety: prevent deletion of bucket-level directories + // This protects /buckets/mybucket from being deleted even if empty + baseDepth := strings.Count(f.DirBucketsPath, "/") + dirDepth := strings.Count(string(dirPath), "/") + if dirDepth <= baseDepth+1 { + glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: skipping deletion of bucket-level directory %s", dirPath) + return + } + + // Check if directory is empty + isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath) + if err != nil { + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err) + return + } + + if !isEmpty { + // Directory is not empty, stop checking upward + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) + return + } + + // Directory is empty, try to delete it + glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath) + parentDir, _ := dirPath.DirAndName() + if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil { + if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil { + // Successfully deleted, continue checking upwards + f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath) + } else { + // Failed to delete, stop cleanup + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr) + } + } +} + +// IsDirectoryEmpty checks if a directory contains any entries +func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) { + isEmpty := true + _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool { + isEmpty = false + return false // Stop after first entry + }) + return isEmpty, err +} + func (f *Filer) Shutdown() { close(f.deletionQuit) f.LocalMetaLogBuffer.ShutdownLogBuffer() diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 80adab292..17953c67d 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -308,3 +308,59 @@ func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPat return nil } + +// DoDeleteEmptyParentDirectories recursively deletes empty parent directories. +// It stops at root "/" or at stopAtPath. +// For safety, dirPath must be under stopAtPath (when stopAtPath is provided). +// The checked map tracks already-processed directories to avoid redundant work in batch operations. 
+func DoDeleteEmptyParentDirectories(ctx context.Context, client SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath, checked map[string]bool) { + if dirPath == "/" || dirPath == stopAtPath { + return + } + + // Skip if already checked (for batch delete optimization) + dirPathStr := string(dirPath) + if checked != nil { + if checked[dirPathStr] { + return + } + checked[dirPathStr] = true + } + + // Safety check: if stopAtPath is provided, dirPath must be under it (root "/" allows everything) + stopStr := string(stopAtPath) + if stopAtPath != "" && stopStr != "/" && !strings.HasPrefix(dirPathStr+"/", stopStr+"/") { + glog.V(1).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) + return + } + + // Check if directory is empty by listing with limit 1 + isEmpty := true + err := SeaweedList(ctx, client, dirPathStr, "", func(entry *Entry, isLast bool) error { + isEmpty = false + return io.EOF // Use sentinel error to explicitly stop iteration + }, "", false, 1) + + if err != nil && err != io.EOF { + glog.V(3).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: error checking %s: %v", dirPath, err) + return + } + + if !isEmpty { + // Directory is not empty, stop checking upward + glog.V(3).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) + return + } + + // Directory is empty, try to delete it + glog.V(2).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: deleting empty directory %s", dirPath) + parentDir, dirName := dirPath.DirAndName() + + if err := DoRemove(ctx, client, parentDir, dirName, false, false, false, false, nil); err == nil { + // Successfully deleted, continue checking upwards + DoDeleteEmptyParentDirectories(ctx, client, util.FullPath(parentDir), stopAtPath, checked) + } else { + // Failed to delete, stop cleanup + glog.V(3).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err) + } +} diff --git a/weed/pb/filer_pb/filer_pb_helper.go b/weed/pb/filer_pb/filer_pb_helper.go index b5fd4e1e0..c8dd19d59 100644 --- a/weed/pb/filer_pb/filer_pb_helper.go +++ b/weed/pb/filer_pb/filer_pb_helper.go @@ -9,6 +9,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/viant/ptrie" "google.golang.org/protobuf/proto" @@ -24,6 +25,31 @@ func (entry *Entry) IsDirectoryKeyObject() bool { return entry.IsDirectory && entry.Attributes != nil && entry.Attributes.Mime != "" } +func (entry *Entry) GetExpiryTime() (expiryTime int64) { + // For S3 objects with lifecycle expiration, use Mtime (modification time) + // For regular TTL entries, use Crtime (creation time) for backward compatibility + if entry.Extended != nil { + if _, hasS3Expiry := entry.Extended[s3_constants.SeaweedFSExpiresS3]; hasS3Expiry { + // S3 lifecycle expiration: base TTL on modification time + expiryTime = entry.Attributes.Mtime + if expiryTime == 0 { + expiryTime = entry.Attributes.Crtime + } + expiryTime += int64(entry.Attributes.TtlSec) + return expiryTime + } + } + + // Regular TTL expiration: base on creation time only + expiryTime = entry.Attributes.Crtime + int64(entry.Attributes.TtlSec) + return expiryTime +} + +func (entry *Entry) IsExpired() bool { + return entry != nil && entry.Attributes != nil && entry.Attributes.TtlSec > 0 && + time.Now().Unix() >= entry.GetExpiryTime() +} + func (entry *Entry) FileMode() (fileMode os.FileMode) { if entry != nil && 
entry.Attributes != nil { fileMode = os.FileMode(entry.Attributes.FileMode) diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index cb4c73692..c4c07f0c7 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -55,8 +55,7 @@ func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateM if entry.Extended == nil { entry.Extended = make(map[string][]byte) } - entry.Extended["key"] = []byte(*input.Key) - + entry.Extended[s3_constants.ExtMultipartObjectKey] = []byte(*input.Key) // Set object owner for multipart upload amzAccountId := r.Header.Get(s3_constants.AmzAccountId) if amzAccountId != "" { @@ -173,6 +172,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl deleteEntries := []*filer_pb.Entry{} partEntries := make(map[int][]*filer_pb.Entry, len(entries)) entityTooSmall := false + entityWithTtl := false for _, entry := range entries { foundEntry := false glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name) @@ -212,6 +212,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl foundEntry = true } if foundEntry { + if !entityWithTtl && entry.Attributes != nil && entry.Attributes.TtlSec > 0 { + entityWithTtl = true + } if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] && entry.Attributes.FileSize < multiPartMinSize { glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name) @@ -330,7 +333,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } for k, v := range pentry.Extended { - if k != "key" { + if k != s3_constants.ExtMultipartObjectKey { versionEntry.Extended[k] = v } } @@ -392,7 +395,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } for k, v := range pentry.Extended { - if k != "key" { + if k != s3_constants.ExtMultipartObjectKey { entry.Extended[k] = v } } @@ -445,7 +448,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } for k, v := range pentry.Extended { - if k != "key" { + if k != s3_constants.ExtMultipartObjectKey { entry.Extended[k] = v } } @@ -468,6 +471,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entry.Attributes.Mime = mime } entry.Attributes.FileSize = uint64(offset) + // Set TTL-based S3 expiry (modification time) + if entityWithTtl { + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + } }) if err != nil { @@ -587,7 +594,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput uploadsCount := int64(0) for _, entry := range entries { if entry.Extended != nil { - key := string(entry.Extended["key"]) + key := string(entry.Extended[s3_constants.ExtMultipartObjectKey]) if *input.KeyMarker != "" && *input.KeyMarker != key { continue } diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 9dd9a684e..ef7396996 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -2,11 +2,14 @@ package s3api import ( "context" + "errors" "fmt" "strings" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -108,6 +111,110 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_ return err } +func (s3a *S3ApiServer) 
updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error { + // Use iterative approach with a queue to avoid recursive WithFilerClient calls + // which would create a new connection for each subdirectory + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + ctx := context.Background() + var updateErrors []error + dirsToProcess := []string{parentDirectoryPath} + + for len(dirsToProcess) > 0 { + dir := dirsToProcess[0] + dirsToProcess = dirsToProcess[1:] + + // Process directory in paginated batches + if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil { + updateErrors = append(updateErrors, err) + } + } + + if len(updateErrors) > 0 { + return errors.Join(updateErrors...) + } + return nil + }) +} + +// processDirectoryTTL processes a single directory in paginated batches +func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, + dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error { + + const batchSize = filer.PaginationSize + startFrom := "" + + for { + lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors) + if err != nil { + return fmt.Errorf("list entries in %s: %w", dir, err) + } + + // If we got fewer entries than batch size, we've reached the end + if entryCount < batchSize { + break + } + startFrom = lastEntryName + } + return nil +} + +// processTTLBatch processes a single batch of entries +func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient, + dir string, ttlSec int32, startFrom string, batchSize uint32, + dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) { + + err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { + lastEntry = entry.Name + count++ + + if entry.IsDirectory { + *dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name))) + return nil + } + + // Update entry TTL and S3 expiry flag + if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil { + *updateErrors = append(*updateErrors, updateErr) + } + return nil + }, startFrom, false, batchSize) + + return lastEntry, count, err +} + +// updateEntryTTL updates a single entry's TTL and S3 expiry flag +func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, + dir string, entry *filer_pb.Entry, ttlSec int32) error { + + if entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} + } + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + + // Check if both TTL and S3 expiry flag are already set correctly + flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true" + if entry.Attributes.TtlSec == ttlSec && flagAlreadySet { + return nil // Already up to date + } + + // Set the S3 expiry flag + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + // Update TTL if needed + if entry.Attributes.TtlSec != ttlSec { + entry.Attributes.TtlSec = ttlSec + } + + if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }); err != nil { + return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err) + } + return nil +} + func (s3a *S3ApiServer) getCollectionName(bucket string) string { if s3a.option.FilerGroup != "" { return fmt.Sprintf("%s_%s", 
s3a.option.FilerGroup, bucket) diff --git a/weed/s3api/s3_constants/extend_key.go b/weed/s3api/s3_constants/extend_key.go index f0f223a45..d57798341 100644 --- a/weed/s3api/s3_constants/extend_key.go +++ b/weed/s3api/s3_constants/extend_key.go @@ -11,6 +11,7 @@ const ( ExtETagKey = "Seaweed-X-Amz-ETag" ExtLatestVersionIdKey = "Seaweed-X-Amz-Latest-Version-Id" ExtLatestVersionFileNameKey = "Seaweed-X-Amz-Latest-Version-File-Name" + ExtMultipartObjectKey = "key" // Bucket Policy ExtBucketPolicyKey = "Seaweed-X-Amz-Bucket-Policy" diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go index 82a270111..77ed310d9 100644 --- a/weed/s3api/s3_constants/header.go +++ b/weed/s3api/s3_constants/header.go @@ -42,6 +42,7 @@ const ( SeaweedFSIsDirectoryKey = "X-Seaweedfs-Is-Directory-Key" SeaweedFSPartNumber = "X-Seaweedfs-Part-Number" SeaweedFSUploadId = "X-Seaweedfs-Upload-Id" + SeaweedFSExpiresS3 = "X-Seaweedfs-Expires-S3" // S3 ACL headers AmzCannedAcl = "X-Amz-Acl" diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index ead77041e..9509219d9 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -7,6 +7,7 @@ import ( "encoding/xml" "errors" "fmt" + "github.com/seaweedfs/seaweedfs/weed/util" "math" "net/http" "path" @@ -792,9 +793,9 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr if rule.Expiration.Days == 0 { continue } - + locationPrefix := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix) locConf := &filer_pb.FilerConf_PathConf{ - LocationPrefix: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix), + LocationPrefix: locationPrefix, Collection: collectionName, Ttl: fmt.Sprintf("%dd", rule.Expiration.Days), } @@ -806,6 +807,13 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } + ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds()) + glog.V(2).Infof("Start updating TTL for %s", locationPrefix) + if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil { + glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr) + } else { + glog.V(2).Infof("Finished updating TTL for %s", locationPrefix) + } changed = true } diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 163633e22..8917393be 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -375,7 +375,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // Restore the original Range header for SSE processing if sseObject && originalRangeHeader != "" { r.Header.Set("Range", originalRangeHeader) - } // Add SSE metadata headers based on object metadata before SSE processing @@ -603,7 +602,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des resp.Body.Close() return } - setUserMetadataKeyToLowercase(resp) responseStatusCode, bytesTransferred := responseFn(resp, w) diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 3a2544710..f779a6edc 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -1,6 +1,7 @@ package s3api import ( + "context" "encoding/xml" "fmt" "io" @@ -8,14 +9,11 @@ import ( "slices" "strings" - 
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/filer" - - "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -129,22 +127,19 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque dir, name := target.DirAndName() err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Use operation context that won't be cancelled if request terminates + // This ensures deletion completes atomically to avoid inconsistent state + opCtx := context.WithoutCancel(r.Context()) if err := doDeleteEntry(client, dir, name, true, false); err != nil { return err } - if s3a.option.AllowEmptyFolder { - return nil - } - - directoriesWithDeletion := make(map[string]int) - if strings.LastIndex(object, "/") > 0 { - directoriesWithDeletion[dir]++ - // purge empty folders, only checking folders with deletions - for len(directoriesWithDeletion) > 0 { - directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) - } + // Cleanup empty directories + if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 { + bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) + // Recursively delete empty parent directories, stop at bucket path + filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil) } return nil @@ -227,7 +222,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h var deleteErrors []DeleteError var auditLog *s3err.AccessLog - directoriesWithDeletion := make(map[string]int) + directoriesWithDeletion := make(map[string]bool) if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) @@ -250,6 +245,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h versioningConfigured := (versioningState != "") s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Use operation context that won't be cancelled if request terminates + // This ensures batch deletion completes atomically to avoid inconsistent state + opCtx := context.WithoutCancel(r.Context()) // delete file entries for _, object := range deleteObjects.Objects { @@ -359,12 +357,14 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil { - directoriesWithDeletion[parentDirectoryPath]++ + // Track directory for empty directory cleanup + if !s3a.option.AllowEmptyFolder { + directoriesWithDeletion[parentDirectoryPath] = true + } deletedObjects = append(deletedObjects, object) } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { deletedObjects = append(deletedObjects, object) } else { - delete(directoriesWithDeletion, parentDirectoryPath) deleteErrors = append(deleteErrors, DeleteError{ Code: "", Message: err.Error(), @@ -380,13 +380,29 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } } - if s3a.option.AllowEmptyFolder { - return nil - } + // Cleanup empty directories - optimize by processing deepest first + if !s3a.option.AllowEmptyFolder && 
len(directoriesWithDeletion) > 0 { + bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) - // purge empty folders, only checking folders with deletions - for len(directoriesWithDeletion) > 0 { - directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) + // Collect and sort directories by depth (deepest first) to avoid redundant checks + var allDirs []string + for dirPath := range directoriesWithDeletion { + allDirs = append(allDirs, dirPath) + } + // Sort by depth (deeper directories first) + slices.SortFunc(allDirs, func(a, b string) int { + return strings.Count(b, "/") - strings.Count(a, "/") + }) + + // Track already-checked directories to avoid redundant work + checked := make(map[string]bool) + for _, dirPath := range allDirs { + if !checked[dirPath] { + // Recursively delete empty parent directories, stop at bucket path + // Mark this directory and all its parents as checked during recursion + filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked) + } + } } return nil @@ -403,26 +419,3 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h writeSuccessResponseXML(w, r, deleteResp) } - -func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) { - var allDirs []string - for dir := range directoriesWithDeletion { - allDirs = append(allDirs, dir) - } - slices.SortFunc(allDirs, func(a, b string) int { - return len(b) - len(a) - }) - newDirectoriesWithDeletion = make(map[string]int) - for _, dir := range allDirs { - parentDir, dirName := util.FullPath(dir).DirAndName() - if parentDir == s3a.option.BucketsPath { - continue - } - if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil { - glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err) - } else { - newDirectoriesWithDeletion[parentDir]++ - } - } - return -} diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 148df89f6..0d07c548e 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -333,7 +333,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata)) glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID) } - + // Set TTL-based S3 expiry (modification time) + proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true") // ensure that the Authorization header is overriding any previous // Authorization header which might be already present in proxyReq s3a.maybeAddFilerJwtAuthorization(proxyReq, true) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index d2b3d8b52..fba693f43 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -136,8 +136,17 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter if err := fs.checkPermissions(ctx, r, fileName); err != nil { return nil, nil, err } + // Disable TTL-based (creation time) deletion when S3 expiry (modification time) is enabled + soMaybeWithOutTTL := so + if so.TtlSeconds > 0 { + if s3ExpiresValue := 
r.Header.Get(s3_constants.SeaweedFSExpiresS3); s3ExpiresValue == "true" { + clone := *so + clone.TtlSeconds = 0 + soMaybeWithOutTTL = &clone + } + } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, soMaybeWithOutTTL) if err != nil { return nil, nil, err @@ -330,7 +339,9 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } entry.Extended = SaveAmzMetaData(r, entry.Extended, false) - + if entry.TtlSec > 0 && r.Header.Get(s3_constants.SeaweedFSExpiresS3) == "true" { + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + } for k, v := range r.Header { if len(v) > 0 && len(v[0]) > 0 { if strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" { diff --git a/weed/util/constants_lifecycle_interval_10sec.go b/weed/util/constants_lifecycle_interval_10sec.go new file mode 100644 index 000000000..60f19c316 --- /dev/null +++ b/weed/util/constants_lifecycle_interval_10sec.go @@ -0,0 +1,8 @@ +//go:build s3tests +// +build s3tests + +package util + +import "time" + +const LifeCycleInterval = 10 * time.Second diff --git a/weed/util/constants_lifecycle_interval_day.go b/weed/util/constants_lifecycle_interval_day.go new file mode 100644 index 000000000..e2465ad5f --- /dev/null +++ b/weed/util/constants_lifecycle_interval_day.go @@ -0,0 +1,8 @@ +//go:build !s3tests +// +build !s3tests + +package util + +import "time" + +const LifeCycleInterval = 24 * time.Hour
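
For context, a minimal Go sketch of the client-side flow this patch serves: install a prefix-scoped lifecycle rule, after which the filer stamps matching entries with the X-Seaweedfs-Expires-S3 flag and an mtime-based TTL, and expired objects are then deleted lazily on GET/HEAD/LIST. This is a sketch, not part of the patch: it assumes a SeaweedFS S3 gateway at 127.0.0.1:8333, the endpoint, credentials, bucket, and prefix are illustrative placeholders, and it uses the plain aws-sdk-go v1 client.

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Placeholder endpoint and credentials for a local SeaweedFS S3 gateway.
	sess := session.Must(session.NewSession(&aws.Config{
		Endpoint:         aws.String("http://127.0.0.1:8333"),
		Region:           aws.String("us-east-1"),
		S3ForcePathStyle: aws.Bool(true),
		Credentials:      credentials.NewStaticCredentials("accessKey", "secretKey", ""),
	}))
	svc := s3.New(sess)

	// One rule: expire objects under logs/ one day after last modification.
	// PutBucketLifecycleConfigurationHandler converts Expiration.Days into a
	// TTL and back-fills existing entries under the prefix (updateEntriesTTL).
	_, err := svc.PutBucketLifecycleConfiguration(&s3.PutBucketLifecycleConfigurationInput{
		Bucket: aws.String("mybucket"),
		LifecycleConfiguration: &s3.BucketLifecycleConfiguration{
			Rules: []*s3.LifecycleRule{{
				ID:         aws.String("expire-logs"),
				Status:     aws.String("Enabled"),
				Filter:     &s3.LifecycleRuleFilter{Prefix: aws.String("logs/")},
				Expiration: &s3.LifecycleExpiration{Days: aws.Int64(1)},
			}},
		},
	})
	if err != nil {
		log.Fatalf("put lifecycle: %v", err)
	}
	// Once an object's mtime+TTL has passed, the next GET or HEAD returns
	// NoSuchKey and a LIST omits it; the filer deletes the entry, its chunks,
	// and any newly empty parent folders inline (versioned objects excepted).
}

Note the build-tag pair at the end of the diff: under the s3tests tag, LifeCycleInterval shrinks a "day" to 10 seconds so the new lifecycle expiration tests can observe the lazy deletion without waiting 24 hours.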