add mime, use simple insert and update filer store API

1. add mime type to file in filer
2. purge old chunks if overwrite during insert
This commit is contained in:
Chris Lu
2018-05-30 20:24:57 -07:00
parent 26e7cd8c75
commit 0301504184
20 changed files with 149 additions and 132 deletions

View File

@@ -77,26 +77,21 @@ func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entr
return entry, nil
}
func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) (*filer2.Entry, error) {
entry, err := store.FindEntry(fullpath)
if err != nil {
return nil, nil
}
func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) (error) {
dir, name := fullpath.DirAndName()
res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir)
if err != nil {
return nil, fmt.Errorf("delete %s: %s", fullpath, err)
return fmt.Errorf("delete %s: %s", fullpath, err)
}
_, err = res.RowsAffected()
if err != nil {
return nil, fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
}
return entry, nil
return nil
}
func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {

View File

@@ -88,22 +88,17 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.
return entry, nil
}
func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
entry, err = store.FindEntry(fullpath)
if err != nil {
return nil, nil
}
func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName()
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=? AND name=?",
dir, name).Exec(); err != nil {
return entry, fmt.Errorf("delete %s : %v", entry.FullPath, err)
return fmt.Errorf("delete %s : %v", fullpath, err)
}
return entry, nil
return nil
}
func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,

View File

@@ -13,6 +13,7 @@ type Attr struct {
Mode os.FileMode // file mode
Uid uint32 // owner uid
Gid uint32 // group gid
Mime string
}
func (attr Attr) IsDirectory() bool {

View File

@@ -17,6 +17,7 @@ func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
FileMode: uint32(entry.Attr.Mode),
Uid: entry.Uid,
Gid: entry.Gid,
Mime: entry.Mime,
},
Chunks: entry.Chunks,
}
@@ -36,6 +37,7 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
entry.Attr.Mode = os.FileMode(message.Attributes.FileMode)
entry.Attr.Uid = message.Attributes.Uid
entry.Attr.Gid = message.Attributes.Gid
entry.Attr.Mime = message.Attributes.Mime
entry.Chunks = message.Chunks

View File

@@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/operation"
)
type Filer struct {
@@ -101,6 +102,10 @@ func (f *Filer) CreateEntry(entry *Entry) error {
}
*/
if oldEntry, err := f.FindEntry(entry.FullPath); err == nil {
f.deleteChunks(oldEntry)
}
if err := f.store.InsertEntry(entry); err != nil {
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
@@ -116,26 +121,30 @@ func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
return f.store.FindEntry(p)
}
func (f *Filer) DeleteEntry(p FullPath) (fileEntry *Entry, err error) {
func (f *Filer) DeleteEntryMetaAndData(p FullPath) (err error) {
entry, err := f.FindEntry(p)
if err != nil {
return nil, err
return err
}
if entry.IsDirectory() {
entries, err := f.ListDirectoryEntries(p, "", false, 1)
if err != nil {
return nil, fmt.Errorf("list folder %s: %v", p, err)
return fmt.Errorf("list folder %s: %v", p, err)
}
if len(entries) > 0 {
return nil, fmt.Errorf("folder %s is not empty", p)
return fmt.Errorf("folder %s is not empty", p)
}
}
f.deleteChunks(entry)
return f.store.DeleteEntry(p)
}
func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
p = p[0: len(p)-1]
}
return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit)
}
@@ -164,3 +173,17 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
}
func (f *Filer) deleteChunks(entry *Entry) {
if f.master == "" {
return
}
if entry == nil {
return
}
for _, chunk := range entry.Chunks {
if err := operation.DeleteFile(f.master, chunk.FileId, ""); err != nil {
glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
}
}
}

View File

@@ -11,7 +11,7 @@ type FilerStore interface {
InsertEntry(*Entry) error
UpdateEntry(*Entry) (err error)
FindEntry(FullPath) (entry *Entry, err error)
DeleteEntry(FullPath) (fileEntry *Entry, err error)
DeleteEntry(FullPath) (err error)
ListDirectoryEntries(dirPath FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error)
}

View File

@@ -93,20 +93,15 @@ func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.En
return entry, nil
}
func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
entry, err = store.FindEntry(fullpath)
if err != nil {
return nil, nil
}
err = store.db.Delete(key, nil)
if err != nil {
return entry, fmt.Errorf("delete %s : %v", entry.FullPath, err)
return fmt.Errorf("delete %s : %v", fullpath, err)
}
return entry, nil
return nil
}
func (store *LevelDBStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,

View File

@@ -56,13 +56,9 @@ func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr
return entry, nil
}
func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
item := store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
if item == nil {
return nil, nil
}
entry = item.(entryItem).Entry
return entry, nil
func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
return nil
}
func (store *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {

View File

@@ -134,7 +134,7 @@ func TestCreateFileAndList(t *testing.T) {
}
// delete file and count
filer.DeleteEntry(file3Path)
filer.DeleteEntryMetaAndData(file3Path)
entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))

View File

@@ -94,28 +94,23 @@ func (store *RedisStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr
return entry, nil
}
func (store *RedisStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
entry, err = store.FindEntry(fullpath)
if err != nil {
return nil, nil
}
func (store *RedisStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
if err != nil {
return entry, fmt.Errorf("delete %s : %v", entry.FullPath, err)
return fmt.Errorf("delete %s : %v", fullpath, err)
}
dir, name := fullpath.DirAndName()
if name != "" {
_, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
if err != nil {
return nil, fmt.Errorf("delete %s in parent dir: %v", entry.FullPath, err)
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
}
return entry, nil
return nil
}
func (store *RedisStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,