mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-02-09 09:17:28 +08:00
refactoring
This commit is contained in:
@@ -99,7 +99,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
|
||||
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) {
|
||||
|
||||
dir, name := fullpath.DirAndName()
|
||||
row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
|
||||
@@ -118,7 +118,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.Fu
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
|
||||
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
|
||||
|
||||
dir, name := fullpath.DirAndName()
|
||||
|
||||
@@ -135,7 +135,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
|
||||
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
|
||||
|
||||
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
|
||||
if err != nil {
|
||||
@@ -150,7 +150,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
|
||||
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
sqlText := store.SqlListExclusive
|
||||
if inclusive {
|
||||
@@ -172,7 +172,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
|
||||
}
|
||||
|
||||
entry := &filer2.Entry{
|
||||
FullPath: filer2.NewFullPath(string(fullpath), name),
|
||||
FullPath: util.NewFullPath(string(fullpath), name),
|
||||
}
|
||||
if err = entry.DecodeAttributesAndChunks(data); err != nil {
|
||||
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
|
||||
|
||||
@@ -75,7 +75,7 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entr
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
|
||||
|
||||
dir, name := fullpath.DirAndName()
|
||||
var data []byte
|
||||
@@ -102,7 +102,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
|
||||
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
|
||||
|
||||
dir, name := fullpath.DirAndName()
|
||||
|
||||
@@ -115,7 +115,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
|
||||
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
|
||||
|
||||
if err := store.session.Query(
|
||||
"DELETE FROM filemeta WHERE directory=?",
|
||||
@@ -126,7 +126,7 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
|
||||
@@ -139,7 +139,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
|
||||
iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
|
||||
for iter.Scan(&name, &data) {
|
||||
entry := &filer2.Entry{
|
||||
FullPath: filer2.NewFullPath(string(fullpath), name),
|
||||
FullPath: util.NewFullPath(string(fullpath), name),
|
||||
}
|
||||
if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
|
||||
err = decodeErr
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
type Attr struct {
|
||||
@@ -27,7 +28,7 @@ func (attr Attr) IsDirectory() bool {
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
FullPath
|
||||
util.FullPath
|
||||
|
||||
Attr
|
||||
Extended map[string][]byte
|
||||
|
||||
@@ -92,7 +92,7 @@ func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (e
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
|
||||
key := genKey(fullpath.DirAndName())
|
||||
|
||||
resp, err := store.client.Get(ctx, string(key))
|
||||
@@ -115,7 +115,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath)
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
key := genKey(fullpath.DirAndName())
|
||||
|
||||
if _, err := store.client.Delete(ctx, string(key)); err != nil {
|
||||
@@ -125,7 +125,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
|
||||
|
||||
if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
|
||||
@@ -136,7 +136,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer
|
||||
}
|
||||
|
||||
func (store *EtcdStore) ListDirectoryEntries(
|
||||
ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
|
||||
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
|
||||
) (entries []*filer2.Entry, err error) {
|
||||
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
|
||||
|
||||
@@ -159,7 +159,7 @@ func (store *EtcdStore) ListDirectoryEntries(
|
||||
break
|
||||
}
|
||||
entry := &filer2.Entry{
|
||||
FullPath: filer2.NewFullPath(string(fullpath), fileName),
|
||||
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
|
||||
}
|
||||
if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
|
||||
err = decodeErr
|
||||
@@ -179,7 +179,7 @@ func genKey(dirPath, fileName string) (key []byte) {
|
||||
return key
|
||||
}
|
||||
|
||||
func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
|
||||
func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
|
||||
keyPrefix = []byte(string(fullpath))
|
||||
keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
|
||||
if len(startFileName) > 0 {
|
||||
|
||||
@@ -100,7 +100,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
|
||||
// not found, check the store directly
|
||||
if dirEntry == nil {
|
||||
glog.V(4).Infof("find uncached directory: %s", dirPath)
|
||||
dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
|
||||
dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
|
||||
} else {
|
||||
// glog.V(4).Infof("found cached directory: %s", dirPath)
|
||||
}
|
||||
@@ -112,7 +112,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
|
||||
now := time.Now()
|
||||
|
||||
dirEntry = &Entry{
|
||||
FullPath: FullPath(dirPath),
|
||||
FullPath: util.FullPath(dirPath),
|
||||
Attr: Attr{
|
||||
Mtime: now,
|
||||
Crtime: now,
|
||||
@@ -127,7 +127,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
|
||||
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
|
||||
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
|
||||
if mkdirErr != nil {
|
||||
if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == filer_pb.ErrNotFound {
|
||||
if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
|
||||
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
|
||||
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
|
||||
}
|
||||
@@ -207,7 +207,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
|
||||
return f.store.UpdateEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
|
||||
func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
|
||||
|
||||
now := time.Now()
|
||||
|
||||
@@ -234,7 +234,7 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
|
||||
|
||||
}
|
||||
|
||||
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
|
||||
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
|
||||
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
|
||||
p = p[0 : len(p)-1]
|
||||
}
|
||||
@@ -251,7 +251,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileN
|
||||
return entries, err
|
||||
}
|
||||
|
||||
func (f *Filer) doListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
|
||||
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
|
||||
listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
|
||||
if listErr != nil {
|
||||
return listedEntries, expiredCount, "", listErr
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
type BucketName string
|
||||
@@ -28,7 +29,7 @@ func (f *Filer) LoadBuckets(dirBucketsPath string) {
|
||||
|
||||
limit := math.MaxInt32
|
||||
|
||||
entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
|
||||
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit)
|
||||
|
||||
if err != nil {
|
||||
glog.V(1).Infof("no buckets found: %v", err)
|
||||
|
||||
@@ -1,103 +0,0 @@
|
||||
package filer2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
)
|
||||
|
||||
func VolumeId(fileId string) string {
|
||||
lastCommaIndex := strings.LastIndex(fileId, ",")
|
||||
if lastCommaIndex > 0 {
|
||||
return fileId[:lastCommaIndex]
|
||||
}
|
||||
return fileId
|
||||
}
|
||||
|
||||
type FilerClient interface {
|
||||
WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
|
||||
AdjustedUrl(hostAndPort string) string
|
||||
}
|
||||
|
||||
func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
|
||||
|
||||
dir, name := fullFilePath.DirAndName()
|
||||
|
||||
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: dir,
|
||||
Name: name,
|
||||
}
|
||||
|
||||
// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
|
||||
resp, err := filer_pb.LookupEntry(client, request)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return nil
|
||||
}
|
||||
glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.Entry == nil {
|
||||
// glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
|
||||
return nil
|
||||
}
|
||||
|
||||
entry = resp.Entry
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
|
||||
|
||||
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
lastEntryName := ""
|
||||
|
||||
request := &filer_pb.ListEntriesRequest{
|
||||
Directory: string(fullDirPath),
|
||||
Prefix: prefix,
|
||||
StartFromFileName: lastEntryName,
|
||||
Limit: math.MaxUint32,
|
||||
}
|
||||
|
||||
glog.V(3).Infof("read directory: %v", request)
|
||||
stream, err := client.ListEntries(context.Background(), request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list %s: %v", fullDirPath, err)
|
||||
}
|
||||
|
||||
var prevEntry *filer_pb.Entry
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr != nil {
|
||||
if recvErr == io.EOF {
|
||||
if prevEntry != nil {
|
||||
fn(prevEntry, true)
|
||||
}
|
||||
break
|
||||
} else {
|
||||
return recvErr
|
||||
}
|
||||
}
|
||||
if prevEntry != nil {
|
||||
fn(prevEntry, false)
|
||||
}
|
||||
prevEntry = resp.Entry
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
@@ -7,9 +7,10 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
|
||||
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
|
||||
if p == "/" {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -5,13 +5,15 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestProtoMarshalText(t *testing.T) {
|
||||
|
||||
oldEntry := &Entry{
|
||||
FullPath: FullPath("/this/path/to"),
|
||||
FullPath: util.FullPath("/this/path/to"),
|
||||
Attr: Attr{
|
||||
Mtime: time.Now(),
|
||||
Mode: 0644,
|
||||
|
||||
@@ -17,10 +17,10 @@ type FilerStore interface {
|
||||
InsertEntry(context.Context, *Entry) error
|
||||
UpdateEntry(context.Context, *Entry) (err error)
|
||||
// err == filer2.ErrNotFound if not found
|
||||
FindEntry(context.Context, FullPath) (entry *Entry, err error)
|
||||
DeleteEntry(context.Context, FullPath) (err error)
|
||||
DeleteFolderChildren(context.Context, FullPath) (err error)
|
||||
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
|
||||
FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
|
||||
DeleteEntry(context.Context, util.FullPath) (err error)
|
||||
DeleteFolderChildren(context.Context, util.FullPath) (err error)
|
||||
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
|
||||
|
||||
BeginTransaction(ctx context.Context) (context.Context, error)
|
||||
CommitTransaction(ctx context.Context) error
|
||||
@@ -72,7 +72,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
|
||||
return fsw.actualStore.UpdateEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
|
||||
func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
|
||||
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@@ -87,7 +87,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry
|
||||
return
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
|
||||
func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
|
||||
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@@ -97,7 +97,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
|
||||
return fsw.actualStore.DeleteEntry(ctx, fp)
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
|
||||
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
|
||||
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@@ -107,7 +107,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullP
|
||||
return fsw.actualStore.DeleteFolderChildren(ctx, fp)
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
|
||||
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
|
||||
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package filer2
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
type FullPath string
|
||||
|
||||
func NewFullPath(dir, name string) FullPath {
|
||||
return FullPath(dir).Child(name)
|
||||
}
|
||||
|
||||
func (fp FullPath) DirAndName() (string, string) {
|
||||
dir, name := filepath.Split(string(fp))
|
||||
if dir == "/" {
|
||||
return dir, name
|
||||
}
|
||||
if len(dir) < 1 {
|
||||
return "/", ""
|
||||
}
|
||||
return dir[:len(dir)-1], name
|
||||
}
|
||||
|
||||
func (fp FullPath) Name() string {
|
||||
_, name := filepath.Split(string(fp))
|
||||
return name
|
||||
}
|
||||
|
||||
func (fp FullPath) Child(name string) FullPath {
|
||||
dir := string(fp)
|
||||
if strings.HasSuffix(dir, "/") {
|
||||
return FullPath(dir + name)
|
||||
}
|
||||
return FullPath(dir + "/" + name)
|
||||
}
|
||||
|
||||
func (fp FullPath) AsInode() uint64 {
|
||||
return uint64(util.HashStringToLong(string(fp)))
|
||||
}
|
||||
@@ -89,7 +89,7 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry)
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
|
||||
key := genKey(fullpath.DirAndName())
|
||||
|
||||
data, err := store.db.Get(key, nil)
|
||||
@@ -114,7 +114,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
key := genKey(fullpath.DirAndName())
|
||||
|
||||
err = store.db.Delete(key, nil)
|
||||
@@ -125,7 +125,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
|
||||
@@ -153,7 +153,7 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath fi
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
|
||||
@@ -176,7 +176,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath fi
|
||||
break
|
||||
}
|
||||
entry := &filer2.Entry{
|
||||
FullPath: filer2.NewFullPath(string(fullpath), fileName),
|
||||
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
|
||||
}
|
||||
if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
|
||||
err = decodeErr
|
||||
@@ -197,7 +197,7 @@ func genKey(dirPath, fileName string) (key []byte) {
|
||||
return key
|
||||
}
|
||||
|
||||
func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
|
||||
func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
|
||||
keyPrefix = []byte(string(fullpath))
|
||||
keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
|
||||
if len(startFileName) > 0 {
|
||||
|
||||
@@ -2,10 +2,12 @@ package leveldb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func TestCreateAndFind(t *testing.T) {
|
||||
@@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
filer.SetStore(store)
|
||||
filer.DisableDirectoryCache()
|
||||
|
||||
fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
|
||||
fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) {
|
||||
}
|
||||
|
||||
// checking one upper directory
|
||||
entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
|
||||
entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
// checking one upper directory
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// checking one upper directory
|
||||
entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||
entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
|
||||
if err != nil {
|
||||
t.Errorf("list entries: %v", err)
|
||||
return
|
||||
|
||||
@@ -98,7 +98,7 @@ func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
|
||||
dir, name := fullpath.DirAndName()
|
||||
key, partitionId := genKey(dir, name, store.dbCount)
|
||||
|
||||
@@ -124,7 +124,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
dir, name := fullpath.DirAndName()
|
||||
key, partitionId := genKey(dir, name, store.dbCount)
|
||||
|
||||
@@ -136,7 +136,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
|
||||
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
@@ -164,7 +164,7 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath f
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
|
||||
@@ -188,7 +188,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath f
|
||||
break
|
||||
}
|
||||
entry := &filer2.Entry{
|
||||
FullPath: filer2.NewFullPath(string(fullpath), fileName),
|
||||
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
|
||||
}
|
||||
|
||||
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
|
||||
@@ -211,7 +211,7 @@ func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int)
|
||||
return key, partitionId
|
||||
}
|
||||
|
||||
func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
|
||||
func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
|
||||
keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
|
||||
if len(startFileName) > 0 {
|
||||
keyPrefix = append(keyPrefix, []byte(startFileName)...)
|
||||
|
||||
@@ -2,10 +2,12 @@ package leveldb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func TestCreateAndFind(t *testing.T) {
|
||||
@@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
filer.SetStore(store)
|
||||
filer.DisableDirectoryCache()
|
||||
|
||||
fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
|
||||
fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) {
|
||||
}
|
||||
|
||||
// checking one upper directory
|
||||
entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
|
||||
entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
// checking one upper directory
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// checking one upper directory
|
||||
entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||
entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
|
||||
if err != nil {
|
||||
t.Errorf("list entries: %v", err)
|
||||
return
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -61,7 +62,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
|
||||
|
||||
data, err := store.Client.Get(string(fullpath)).Result()
|
||||
if err == redis.Nil {
|
||||
@@ -83,7 +84,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
|
||||
|
||||
_, err = store.Client.Del(string(fullpath)).Result()
|
||||
|
||||
@@ -102,7 +103,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
|
||||
|
||||
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
|
||||
if err != nil {
|
||||
@@ -110,7 +111,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
|
||||
}
|
||||
|
||||
for _, fileName := range members {
|
||||
path := filer2.NewFullPath(string(fullpath), fileName)
|
||||
path := util.NewFullPath(string(fullpath), fileName)
|
||||
_, err = store.Client.Del(string(path)).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
|
||||
@@ -120,7 +121,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
dirListKey := genDirectoryListKey(string(fullpath))
|
||||
@@ -158,7 +159,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
|
||||
|
||||
// fetch entry meta
|
||||
for _, fileName := range members {
|
||||
path := filer2.NewFullPath(string(fullpath), fileName)
|
||||
path := util.NewFullPath(string(fullpath), fileName)
|
||||
entry, err := store.FindEntry(ctx, path)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("list %s : %v", path, err)
|
||||
|
||||
@@ -71,13 +71,13 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [
|
||||
}
|
||||
}
|
||||
|
||||
func NewChunkStreamReaderFromClient(filerClient FilerClient, chunkViews []*ChunkView) *ChunkStreamReader {
|
||||
func NewChunkStreamReaderFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView) *ChunkStreamReader {
|
||||
|
||||
return &ChunkStreamReader{
|
||||
chunkViews: chunkViews,
|
||||
lookupFileId: func(fileId string) (targetUrl string, err error) {
|
||||
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
vid := fileIdToVolumeId(fileId)
|
||||
vid := VolumeId(fileId)
|
||||
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
||||
VolumeIds: []string{vid},
|
||||
})
|
||||
@@ -178,10 +178,11 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func fileIdToVolumeId(fileId string) (volumeId string) {
|
||||
parts := strings.Split(fileId, ",")
|
||||
if len(parts) != 2 {
|
||||
return fileId
|
||||
func VolumeId(fileId string) string {
|
||||
lastCommaIndex := strings.LastIndex(fileId, ",")
|
||||
if lastCommaIndex > 0 {
|
||||
return fileId[:lastCommaIndex]
|
||||
}
|
||||
return parts[0]
|
||||
return fileId
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user