fix: ydb filer bugs (#6778)
* fix: ydb filer bugs
* fix(ydb): correct DeleteEntry log argument types
* fix(ydb): bucket creation & deletion logic
This commit is contained in:
parent 9c1048bacb
commit 45964c2f86
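
A note on the "DeleteEntry log argument types" bullet before the diff: tablePathPrefix and shortDir are *string values, and glog uses fmt-style verbs, so logging the pointers instead of the dereferenced strings prints an address rather than the path. A minimal standalone illustration (not part of the commit):

package main

import "fmt"

func main() {
	dir := "/buckets/b1"
	p := &dir
	fmt.Printf("%s\n", p)  // %!s(*string=0x...): the pointer, not the path
	fmt.Printf("%s\n", *p) // /buckets/b1: the intended output
}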
@@ -6,6 +6,12 @@ package ydb
 import (
 	"context"
 	"fmt"
+	"os"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -13,16 +19,10 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
 	"github.com/ydb-platform/ydb-go-sdk/v3"
 	"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
 	"github.com/ydb-platform/ydb-go-sdk/v3/table"
 	"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
 	"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
 	"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
-	"os"
-	"path"
-	"strings"
-	"sync"
-	"time"
 )

 const (
@@ -97,12 +97,9 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
 	}

 	store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
 	if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
 		return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err)
 	}

-	if err = store.createTable(ctx, store.tablePathPrefix); err != nil {
-		glog.Errorf("createTable %s: %v", store.tablePathPrefix, err)
+	if err := store.ensureTables(ctx); err != nil {
+		return err
 	}
 	return err
 }
@@ -121,7 +118,9 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl
 				return fmt.Errorf("execute statement: %v", err)
 			}
 			return nil
-		})
+		},
+			table.WithIdempotent(),
+		)
 	}
 	if err != nil {
 		return err
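
The table.WithIdempotent() option added above tells the ydb-go-sdk retryer that the whole operation may be re-run even when an attempt failed in an unknown state; without it, the SDK retries only errors it knows left no side effects. A minimal sketch of the same pattern, assuming an established driver db and a prepared query and params:

err := db.Table().Do(ctx,
	func(ctx context.Context, s table.Session) error {
		// idempotent statement: safe to re-execute on transient failures
		_, res, err := s.Execute(ctx, table.DefaultTxControl(), query, params)
		if err != nil {
			return err
		}
		return res.Close()
	},
	table.WithIdempotent(),
)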
@@ -204,6 +203,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath)
 	dir, name := fullpath.DirAndName()
 	tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
 	query := withPragma(tablePathPrefix, deleteQuery)
+	glog.V(4).Infof("DeleteEntry %s, tablePathPrefix %s, shortDir %s", fullpath, *tablePathPrefix, *shortDir)
 	queryParams := table.NewQueryParameters(
 		table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
 		table.ValueParam("$name", types.UTF8Value(name)))
@@ -212,7 +212,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath)
 }

 func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
-	dir, _ := fullpath.DirAndName()
+	dir := string(fullpath)
 	tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
 	query := withPragma(tablePathPrefix, deleteFolderChildrenQuery)
 	queryParams := table.NewQueryParameters(
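
This one-line fix matters because DirAndName() splits off the last path component: it returns the parent of fullpath, so the old code keyed the delete on the wrong directory. With hypothetical values:

fullpath := util.FullPath("/buckets/b1/photos")
dir, name := fullpath.DirAndName() // dir = "/buckets/b1", name = "photos"
// The children to purge are keyed by directory "/buckets/b1/photos",
// i.e. string(fullpath), not by the parent dir returned above.
_, _ = dir, name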
@@ -246,12 +246,19 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
 		}
 	}
 	restLimit := limit - entryCount
+	const maxChunk = int64(1000)
+	chunkLimit := restLimit
+	if chunkLimit > maxChunk {
+		chunkLimit = maxChunk
+	}
+	glog.V(4).Infof("startFileName %s, restLimit %d, chunkLimit %d", startFileName, restLimit, chunkLimit)

 	queryParams := table.NewQueryParameters(
 		table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
 		table.ValueParam("$directory", types.UTF8Value(*shortDir)),
 		table.ValueParam("$start_name", types.UTF8Value(startFileName)),
 		table.ValueParam("$prefix", types.UTF8Value(prefix+"%")),
-		table.ValueParam("$limit", types.Uint64Value(uint64(restLimit))),
+		table.ValueParam("$limit", types.Uint64Value(uint64(chunkLimit))),
 	)
 	err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error {
 		var name string
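
The cap added above works around YDB's default row limit: a data query returns at most 1000 rows per result set and only flags the result as truncated. Asking for the full restLimit in one query can therefore drop entries silently; capping each query at 1000 and resuming from the last returned name (using the truncated flag handled in the next hunk) pages through the rest. A rough sketch of the resulting loop, with queryChunk as a hypothetical stand-in for the single-query path above:

for restLimit > 0 {
	chunkLimit := restLimit
	if chunkLimit > maxChunk {
		chunkLimit = maxChunk
	}
	names, truncated, err := queryChunk(ctx, startFileName, chunkLimit) // hypothetical helper
	if err != nil {
		return lastFileName, err
	}
	for _, name := range names {
		lastFileName = name
		restLimit--
	}
	if !truncated || len(names) == 0 {
		break // no more rows in this directory
	}
	startFileName = lastFileName
}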
@@ -261,12 +268,14 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
 			return nil
 		}
 		truncated = res.CurrentResultSet().Truncated()
+		glog.V(4).Infof("truncated %v, entryCount %d", truncated, entryCount)
 		for res.NextRow() {
 			if err := res.ScanNamed(
 				named.OptionalWithDefault("name", &name),
 				named.OptionalWithDefault("meta", &data)); err != nil {
 				return fmt.Errorf("list scanNamed %s : %v", dir, err)
 			}
+			glog.V(8).Infof("name %s, fullpath %s", name, util.NewFullPath(dir, name))
 			lastFileName = name
 			entry := &filer.Entry{
 				FullPath: util.NewFullPath(dir, name),
@@ -327,12 +336,16 @@ func (store *YdbStore) CanDropWholeBucket() bool {
 }

 func (store *YdbStore) OnBucketCreation(bucket string) {
+	if !store.SupportBucketTable {
+		return
+	}
+	prefix := path.Join(store.tablePathPrefix, bucket)
+
 	store.dbsLock.Lock()
 	defer store.dbsLock.Unlock()

-	if err := store.createTable(context.Background(),
-		path.Join(store.tablePathPrefix, bucket)); err != nil {
-		glog.Errorf("createTable %s: %v", bucket, err)
+	if err := store.createTable(context.Background(), prefix); err != nil {
+		glog.Errorf("createTable %s: %v", prefix, err)
 	}

 	if store.dbs == nil {
@@ -342,12 +355,21 @@ func (store *YdbStore) OnBucketCreation(bucket string) {
 }

 func (store *YdbStore) OnBucketDeletion(bucket string) {
+	if !store.SupportBucketTable {
+		return
+	}
 	store.dbsLock.Lock()
 	defer store.dbsLock.Unlock()

-	if err := store.deleteTable(context.Background(),
-		path.Join(store.tablePathPrefix, bucket)); err != nil {
-		glog.Errorf("deleteTable %s: %v", bucket, err)
+	prefix := path.Join(store.tablePathPrefix, bucket)
+	glog.V(4).Infof("deleting table %s", prefix)
+
+	if err := store.deleteTable(context.Background(), prefix); err != nil {
+		glog.Errorf("deleteTable %s: %v", prefix, err)
 	}
+
+	if err := store.DB.Scheme().RemoveDirectory(context.Background(), prefix); err != nil {
+		glog.Errorf("remove directory %s: %v", prefix, err)
+	}

 	if store.dbs == nil {
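
Each bucket gets its own scheme directory under tablePathPrefix containing its metadata table, so dropping the table alone leaves an orphaned directory node behind in the database scheme; hence the added Scheme().RemoveDirectory call. A sketch of the cleanup order, assuming a driver db and the bucket's scheme path prefix:

if err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
	return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE))
}); err != nil {
	glog.Errorf("drop table under %s: %v", prefix, err)
}
// RemoveDirectory only succeeds once the directory is empty,
// so it must come after the table drop.
if err := db.Scheme().RemoveDirectory(ctx, prefix); err != nil {
	glog.Errorf("remove directory %s: %v", prefix, err)
}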
@@ -384,9 +406,11 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
 	}

 	prefixBuckets := store.dirBuckets + "/"
+	glog.V(4).Infof("dir: %s, prefixBuckets: %s", *dir, prefixBuckets)
 	if strings.HasPrefix(*dir, prefixBuckets) {
 		// detect bucket
 		bucketAndDir := (*dir)[len(prefixBuckets):]
+		glog.V(4).Infof("bucketAndDir: %s", bucketAndDir)
 		var bucket string
 		if t := strings.Index(bucketAndDir, "/"); t > 0 {
 			bucket = bucketAndDir[:t]
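
A worked example of the bucket detection above, with hypothetical values: if dirBuckets is "/buckets", then for dir = "/buckets/b1/photos/2024" stripping the "/buckets/" prefix leaves bucketAndDir = "b1/photos/2024", and the first "/" splits out bucket = "b1", with the remainder likely serving as the short directory keyed inside that bucket's table. As a standalone snippet:

prefixBuckets := "/buckets" + "/"
dir := "/buckets/b1/photos/2024"
if strings.HasPrefix(dir, prefixBuckets) {
	bucketAndDir := dir[len(prefixBuckets):] // "b1/photos/2024"
	if t := strings.Index(bucketAndDir, "/"); t > 0 {
		bucket, shortDir := bucketAndDir[:t], bucketAndDir[t:] // "b1", "/photos/2024"
		_, _ = bucket, shortDir
	}
}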
@@ -413,3 +437,31 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
 	}
 	return
 }
+
+func (store *YdbStore) ensureTables(ctx context.Context) error {
+	prefixFull := store.tablePathPrefix
+
+	glog.V(4).Infof("creating base table %s", prefixFull)
+	baseTable := path.Join(prefixFull, abstract_sql.DEFAULT_TABLE)
+	if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+		return s.CreateTable(ctx, baseTable, createTableOptions()...)
+	}); err != nil {
+		return fmt.Errorf("failed to create base table %s: %v", baseTable, err)
+	}
+
+	glog.V(4).Infof("creating bucket tables")
+	if store.SupportBucketTable {
+		store.dbsLock.Lock()
+		defer store.dbsLock.Unlock()
+		for bucket := range store.dbs {
+			glog.V(4).Infof("creating bucket table %s", bucket)
+			bucketTable := path.Join(prefixFull, bucket, abstract_sql.DEFAULT_TABLE)
+			if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+				return s.CreateTable(ctx, bucketTable, createTableOptions()...)
+			}); err != nil {
+				glog.Errorf("failed to create bucket table %s: %v", bucketTable, err)
+			}
+		}
+	}
+	return nil
+}