mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-14 08:56:19 +08:00
a correct implementation of filer
This commit is contained in:
26
go/filer/design.txt
Normal file
26
go/filer/design.txt
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
Design Assumptions:
|
||||||
|
1. the number of directories are magnitudely smaller than the number of files
|
||||||
|
2. unlimited number of files under any directories
|
||||||
|
Phylosophy:
|
||||||
|
metadata for directories and files should be separated
|
||||||
|
Design:
|
||||||
|
Store directories in normal map
|
||||||
|
all of directories hopefully all be in memory
|
||||||
|
efficient to move/rename/list_directories
|
||||||
|
Log directory changes to append only log file
|
||||||
|
Store files in sorted string table in <dir_id/filename> format
|
||||||
|
efficient to list_files, just simple iterator
|
||||||
|
efficient to locate files, binary search
|
||||||
|
|
||||||
|
Testing:
|
||||||
|
1. starting server, "weed server -filer=true"
|
||||||
|
2. posting files to different folders
|
||||||
|
curl -F "filename=@design.txt" "http://localhost:8888/sources/"
|
||||||
|
curl -F "filename=@design.txt" "http://localhost:8888/design/"
|
||||||
|
curl -F "filename=@directory.go" "http://localhost:8888/sources/weed/go/"
|
||||||
|
curl -F "filename=@directory.go" "http://localhost:8888/sources/testing/go/"
|
||||||
|
curl -F "filename=@filer.go" "http://localhost:8888/sources/weed/go/"
|
||||||
|
curl -F "filename=@filer_in_leveldb.go" "http://localhost:8888/sources/weed/go/"
|
||||||
|
curl "http://localhost:8888/?pretty=y"
|
||||||
|
curl "http://localhost:8888/sources/weed/go/?pretty=y"
|
||||||
|
curl "http://localhost:8888/sources/weed/go/?pretty=y"
|
18
go/filer/directory.go
Normal file
18
go/filer/directory.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package filer
|
||||||
|
|
||||||
|
import ()
|
||||||
|
|
||||||
|
type DirectoryId int32
|
||||||
|
|
||||||
|
type DirectoryEntry struct {
|
||||||
|
Name string //dir name without path
|
||||||
|
Id DirectoryId
|
||||||
|
}
|
||||||
|
|
||||||
|
type DirectoryManager interface {
|
||||||
|
FindDirectory(dirPath string) (DirectoryId, error)
|
||||||
|
ListDirectories(dirPath string) (dirNames []DirectoryEntry, err error)
|
||||||
|
MakeDirectory(currentDirPath string, dirName string) (DirectoryId, error)
|
||||||
|
MoveUnderDirectory(oldDirPath string, newParentDirPath string) error
|
||||||
|
DeleteDirectory(dirPath string) error
|
||||||
|
}
|
232
go/filer/directory_in_map.go
Normal file
232
go/filer/directory_in_map.go
Normal file
@@ -0,0 +1,232 @@
|
|||||||
|
package filer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"code.google.com/p/weed-fs/go/util"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DirectoryEntryInMap struct {
|
||||||
|
Name string
|
||||||
|
Parent *DirectoryEntryInMap
|
||||||
|
SubDirectories map[string]*DirectoryEntryInMap
|
||||||
|
Id DirectoryId
|
||||||
|
}
|
||||||
|
|
||||||
|
type DirectoryManagerInMap struct {
|
||||||
|
Root *DirectoryEntryInMap
|
||||||
|
max DirectoryId
|
||||||
|
logFile *os.File
|
||||||
|
isLoading bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) NewDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap) {
|
||||||
|
d = &DirectoryEntryInMap{Name: name, Parent: parent}
|
||||||
|
d.SubDirectories = make(map[string]*DirectoryEntryInMap)
|
||||||
|
d.Id = dm.max
|
||||||
|
dm.max++
|
||||||
|
parts := make([]string, 0)
|
||||||
|
for p := d; p != nil && p.Name != ""; p = p.Parent {
|
||||||
|
parts = append(parts, p.Name)
|
||||||
|
}
|
||||||
|
n := len(parts)
|
||||||
|
if n <= 0 {
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
for i := 0; i < n/2; i++ {
|
||||||
|
parts[i], parts[n-1-i] = parts[n-1-i], parts[i]
|
||||||
|
}
|
||||||
|
dm.log("add", "/"+strings.Join(parts, "/"), strconv.Itoa(int(d.Id)))
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) log(words ...string) {
|
||||||
|
if !dm.isLoading {
|
||||||
|
dm.logFile.WriteString(strings.Join(words, "\t") + "\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err error) {
|
||||||
|
dm = &DirectoryManagerInMap{}
|
||||||
|
dm.Root = dm.NewDirectoryEntryInMap(nil, "")
|
||||||
|
if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot write directory log file %s.idx: %s", dirLogFile, err.Error())
|
||||||
|
}
|
||||||
|
return dm, dm.load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) processEachLine(line string) error {
|
||||||
|
if strings.HasPrefix(line, "#") {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if line == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
parts := strings.Split(line, "\t")
|
||||||
|
if len(parts) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
switch parts[0] {
|
||||||
|
case "add":
|
||||||
|
v, pe := strconv.Atoi(parts[2])
|
||||||
|
if pe != nil {
|
||||||
|
return pe
|
||||||
|
}
|
||||||
|
if e := dm.loadDirectory(parts[1], DirectoryId(v)); e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
case "mov":
|
||||||
|
if e := dm.MoveUnderDirectory(parts[1], parts[2]); e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
case "del":
|
||||||
|
if e := dm.DeleteDirectory(parts[1]); e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
fmt.Printf("line %s has %s!\n", line, parts[0])
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (dm *DirectoryManagerInMap) load() error {
|
||||||
|
dm.max = 0
|
||||||
|
lines := bufio.NewReader(dm.logFile)
|
||||||
|
dm.isLoading = true
|
||||||
|
defer func() { dm.isLoading = false }()
|
||||||
|
for {
|
||||||
|
line, err := util.Readln(lines)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if pe := dm.processEachLine(string(line)); pe != nil {
|
||||||
|
return pe
|
||||||
|
}
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryInMap, error) {
|
||||||
|
if dirPath == "" {
|
||||||
|
return dm.Root, nil
|
||||||
|
}
|
||||||
|
dirPath = filepath.Clean(dirPath)
|
||||||
|
if dirPath == "/" {
|
||||||
|
return dm.Root, nil
|
||||||
|
}
|
||||||
|
parts := strings.Split(dirPath, "/")
|
||||||
|
dir := dm.Root
|
||||||
|
for i := 1; i < len(parts); i++ {
|
||||||
|
if sub, ok := dir.SubDirectories[parts[i]]; ok {
|
||||||
|
dir = sub
|
||||||
|
} else {
|
||||||
|
return dm.Root, fmt.Errorf("Directory %s Not Found", dirPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dir, nil
|
||||||
|
}
|
||||||
|
func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (DirectoryId, error) {
|
||||||
|
d, e := dm.findDirectory(dirPath)
|
||||||
|
if e == nil {
|
||||||
|
return d.Id, nil
|
||||||
|
}
|
||||||
|
return dm.Root.Id, e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId) error {
|
||||||
|
dirPath = filepath.Clean(dirPath)
|
||||||
|
if dirPath == "/" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
parts := strings.Split(dirPath, "/")
|
||||||
|
dir := dm.Root
|
||||||
|
for i := 1; i < len(parts); i++ {
|
||||||
|
sub, ok := dir.SubDirectories[parts[i]]
|
||||||
|
if !ok {
|
||||||
|
if i != len(parts)-1 {
|
||||||
|
return fmt.Errorf("%s should be created after parent %s!", dirPath, parts[i])
|
||||||
|
}
|
||||||
|
sub = dm.NewDirectoryEntryInMap(dir, parts[i])
|
||||||
|
if sub.Id != dirId {
|
||||||
|
return fmt.Errorf("%s should be have id %v instead of %v!", dirPath, sub.Id, dirId)
|
||||||
|
}
|
||||||
|
dir.SubDirectories[parts[i]] = sub
|
||||||
|
}
|
||||||
|
dir = sub
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEntryInMap, created bool) {
|
||||||
|
dirPath = filepath.Clean(dirPath)
|
||||||
|
if dirPath == "/" {
|
||||||
|
return dm.Root, false
|
||||||
|
}
|
||||||
|
parts := strings.Split(dirPath, "/")
|
||||||
|
dir = dm.Root
|
||||||
|
for i := 1; i < len(parts); i++ {
|
||||||
|
sub, ok := dir.SubDirectories[parts[i]]
|
||||||
|
if !ok {
|
||||||
|
sub = dm.NewDirectoryEntryInMap(dir, parts[i])
|
||||||
|
dir.SubDirectories[parts[i]] = sub
|
||||||
|
created = true
|
||||||
|
}
|
||||||
|
dir = sub
|
||||||
|
}
|
||||||
|
return dir, created
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, error) {
|
||||||
|
dir, _ := dm.makeDirectory(dirPath)
|
||||||
|
return dir.Id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParentDirPath string) error {
|
||||||
|
oldDir, oe := dm.findDirectory(oldDirPath)
|
||||||
|
if oe != nil {
|
||||||
|
return oe
|
||||||
|
}
|
||||||
|
parentDir, pe := dm.findDirectory(newParentDirPath)
|
||||||
|
if pe != nil {
|
||||||
|
return pe
|
||||||
|
}
|
||||||
|
delete(oldDir.Parent.SubDirectories, oldDir.Name)
|
||||||
|
parentDir.SubDirectories[oldDir.Name] = oldDir
|
||||||
|
oldDir.Parent = parentDir
|
||||||
|
dm.log("mov", oldDirPath, newParentDirPath)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []DirectoryEntry, err error) {
|
||||||
|
d, e := dm.findDirectory(dirPath)
|
||||||
|
if e != nil {
|
||||||
|
return dirNames, e
|
||||||
|
}
|
||||||
|
for k, v := range d.SubDirectories {
|
||||||
|
dirNames = append(dirNames, DirectoryEntry{Name: k, Id: v.Id})
|
||||||
|
}
|
||||||
|
return dirNames, nil
|
||||||
|
}
|
||||||
|
func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error {
|
||||||
|
if dirPath == "/" {
|
||||||
|
return fmt.Errorf("Can not delete %s", dirPath)
|
||||||
|
}
|
||||||
|
d, e := dm.findDirectory(dirPath)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
if len(d.SubDirectories) != 0 {
|
||||||
|
return fmt.Errorf("dir %s still has sub directories", dirPath)
|
||||||
|
}
|
||||||
|
delete(d.Parent.SubDirectories, d.Name)
|
||||||
|
d.Parent = nil
|
||||||
|
dm.log("del", dirPath)
|
||||||
|
return nil
|
||||||
|
}
|
73
go/filer/directory_test.go
Normal file
73
go/filer/directory_test.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package filer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDirectory(t *testing.T) {
|
||||||
|
{
|
||||||
|
dm, _ := NewDirectoryManagerInMap("/tmp/dir.log")
|
||||||
|
dm.MakeDirectory("/a/b/c")
|
||||||
|
dm.MakeDirectory("/a/b/d")
|
||||||
|
dm.MakeDirectory("/a/b/e")
|
||||||
|
dm.MakeDirectory("/a/b/e/f")
|
||||||
|
dm.MakeDirectory("/a/b/e/f/g")
|
||||||
|
dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b")
|
||||||
|
dm.MakeDirectory("/a/b/g/h/i")
|
||||||
|
dm.DeleteDirectory("/a/b/e/f")
|
||||||
|
dm.DeleteDirectory("/a/b/e")
|
||||||
|
dirNames, _ := dm.ListDirectories("/a/b/e")
|
||||||
|
for _, v := range dirNames {
|
||||||
|
println("sub1 dir:", v.Name, "id", v.Id)
|
||||||
|
}
|
||||||
|
dm.logFile.Close()
|
||||||
|
|
||||||
|
var path []string
|
||||||
|
printTree(dm.Root, path)
|
||||||
|
|
||||||
|
dm2, e := NewDirectoryManagerInMap("/tmp/dir.log")
|
||||||
|
if e != nil {
|
||||||
|
println("load error", e.Error())
|
||||||
|
}
|
||||||
|
if !compare(dm.Root, dm2.Root) {
|
||||||
|
t.Fatal("restored dir not the same!")
|
||||||
|
}
|
||||||
|
printTree(dm2.Root, path)
|
||||||
|
}
|
||||||
|
if true {
|
||||||
|
os.Remove("/tmp/dir.log")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func printTree(node *DirectoryEntryInMap, path []string) {
|
||||||
|
println(strings.Join(path, "/") + "/" + node.Name)
|
||||||
|
path = append(path, node.Name)
|
||||||
|
for _, v := range node.SubDirectories {
|
||||||
|
printTree(v, path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func compare(root1 *DirectoryEntryInMap, root2 *DirectoryEntryInMap) bool {
|
||||||
|
if len(root1.SubDirectories) != len(root2.SubDirectories) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if root1.Name != root2.Name {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if root1.Id != root2.Id {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !(root1.Parent == nil && root2.Parent == nil) {
|
||||||
|
if root1.Parent.Id != root2.Parent.Id {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range root1.SubDirectories {
|
||||||
|
if !compare(v, root2.SubDirectories[k]) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
19
go/filer/filer.go
Normal file
19
go/filer/filer.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package filer
|
||||||
|
|
||||||
|
import ()
|
||||||
|
|
||||||
|
type FileId string //file id on weedfs
|
||||||
|
|
||||||
|
type FileEntry struct {
|
||||||
|
Name string //file name without path
|
||||||
|
Id FileId
|
||||||
|
}
|
||||||
|
|
||||||
|
type Filer interface {
|
||||||
|
CreateFile(filePath string, fid string) (err error)
|
||||||
|
FindFile(filePath string) (fid string, err error)
|
||||||
|
ListDirectories(dirPath string) (dirs []DirectoryEntry, err error)
|
||||||
|
ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error)
|
||||||
|
DeleteDirectory(dirPath string) (err error)
|
||||||
|
DeleteFile(filePath string) (fid string, err error)
|
||||||
|
}
|
64
go/filer/filer_embedded.go
Normal file
64
go/filer/filer_embedded.go
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
package filer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FilerEmbedded struct {
|
||||||
|
directories *DirectoryManagerInMap
|
||||||
|
files *FileListInLevelDb
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFilerEmbedded(dir string) (filer *FilerEmbedded, err error) {
|
||||||
|
dm, de := NewDirectoryManagerInMap(filepath.Join(dir, "dir.log"))
|
||||||
|
if de != nil {
|
||||||
|
return nil, de
|
||||||
|
}
|
||||||
|
fl, fe := NewFileListInLevelDb(dir)
|
||||||
|
if fe != nil {
|
||||||
|
return nil, fe
|
||||||
|
}
|
||||||
|
filer = &FilerEmbedded{
|
||||||
|
directories: dm,
|
||||||
|
files: fl,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (filer *FilerEmbedded) CreateFile(filePath string, fid string) (err error) {
|
||||||
|
dir, file := filepath.Split(filePath)
|
||||||
|
dirId, e := filer.directories.MakeDirectory(dir)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
return filer.files.CreateFile(dirId, file, fid)
|
||||||
|
}
|
||||||
|
func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) {
|
||||||
|
dir, file := filepath.Split(filePath)
|
||||||
|
dirId, e := filer.directories.FindDirectory(dir)
|
||||||
|
if e != nil {
|
||||||
|
return "", e
|
||||||
|
}
|
||||||
|
return filer.files.FindFile(dirId, file)
|
||||||
|
}
|
||||||
|
func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) {
|
||||||
|
return filer.directories.ListDirectories(dirPath)
|
||||||
|
}
|
||||||
|
func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) {
|
||||||
|
dirId, e := filer.directories.FindDirectory(dirPath)
|
||||||
|
if e != nil {
|
||||||
|
return nil, e
|
||||||
|
}
|
||||||
|
return filer.files.ListFiles(dirId, lastFileName, limit), nil
|
||||||
|
}
|
||||||
|
func (filer *FilerEmbedded) DeleteDirectory(dirPath string) (err error) {
|
||||||
|
return filer.directories.DeleteDirectory(dirPath)
|
||||||
|
}
|
||||||
|
func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) {
|
||||||
|
dir, file := filepath.Split(filePath)
|
||||||
|
dirId, e := filer.directories.FindDirectory(dir)
|
||||||
|
if e != nil {
|
||||||
|
return "", e
|
||||||
|
}
|
||||||
|
return filer.files.DeleteFile(dirId, file)
|
||||||
|
}
|
70
go/filer/filer_in_leveldb.go
Normal file
70
go/filer/filer_in_leveldb.go
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
package filer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"code.google.com/p/weed-fs/go/glog"
|
||||||
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
|
"github.com/syndtr/goleveldb/leveldb/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FileListInLevelDb struct {
|
||||||
|
db *leveldb.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) {
|
||||||
|
fl = &FileListInLevelDb{}
|
||||||
|
if fl.db, err = leveldb.OpenFile(dir, nil); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func genKey(dirId DirectoryId, fileName string) []byte {
|
||||||
|
ret := make([]byte, 0, 4+len(fileName))
|
||||||
|
for i := 3; i >= 0; i-- {
|
||||||
|
ret = append(ret, byte(dirId>>(uint(i)*8)))
|
||||||
|
}
|
||||||
|
ret = append(ret, []byte(fileName)...)
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fl *FileListInLevelDb) CreateFile(dirId DirectoryId, fileName string, fid string) (err error) {
|
||||||
|
glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid)
|
||||||
|
return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil)
|
||||||
|
}
|
||||||
|
func (fl *FileListInLevelDb) DeleteFile(dirId DirectoryId, fileName string) (fid string, err error) {
|
||||||
|
if fid, err = fl.FindFile(dirId, fileName); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = fl.db.Delete(genKey(dirId, fileName), nil)
|
||||||
|
return fid, err
|
||||||
|
}
|
||||||
|
func (fl *FileListInLevelDb) FindFile(dirId DirectoryId, fileName string) (fid string, err error) {
|
||||||
|
data, e := fl.db.Get(genKey(dirId, fileName), nil)
|
||||||
|
if e != nil {
|
||||||
|
return "", e
|
||||||
|
}
|
||||||
|
return string(data), nil
|
||||||
|
}
|
||||||
|
func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, limit int) (files []FileEntry) {
|
||||||
|
glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit)
|
||||||
|
dirKey := genKey(dirId, "")
|
||||||
|
iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil)
|
||||||
|
limitCounter := -1
|
||||||
|
for iter.Next() {
|
||||||
|
limitCounter++
|
||||||
|
if limit > 0 {
|
||||||
|
if limitCounter > limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
key := iter.Key()
|
||||||
|
if !bytes.HasPrefix(key, dirKey) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fileName := key[len(dirKey):]
|
||||||
|
files = append(files, FileEntry{Name: string(fileName), Id: FileId(string(iter.Value()))})
|
||||||
|
}
|
||||||
|
iter.Release()
|
||||||
|
return
|
||||||
|
}
|
@@ -1,6 +1,7 @@
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"code.google.com/p/weed-fs/go/glog"
|
"code.google.com/p/weed-fs/go/glog"
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
@@ -21,3 +22,16 @@ func TestFolderWritable(folder string) (err error) {
|
|||||||
}
|
}
|
||||||
return errors.New("Not writable!")
|
return errors.New("Not writable!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Readln(r *bufio.Reader) ([]byte, error) {
|
||||||
|
var (
|
||||||
|
isPrefix bool = true
|
||||||
|
err error = nil
|
||||||
|
line, ln []byte
|
||||||
|
)
|
||||||
|
for isPrefix && err == nil {
|
||||||
|
line, isPrefix, err = r.ReadLine()
|
||||||
|
ln = append(ln, line...)
|
||||||
|
}
|
||||||
|
return ln, err
|
||||||
|
}
|
||||||
|
@@ -1,28 +1,17 @@
|
|||||||
package weed_server
|
package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"code.google.com/p/weed-fs/go/filer"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"code.google.com/p/weed-fs/go/glog"
|
||||||
"github.com/syndtr/goleveldb/leveldb/util"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
|
||||||
1. level db is only for local instance
|
|
||||||
2. db stores two types of pairs
|
|
||||||
<path/to/dir, sub folder names>
|
|
||||||
<path/to/file, file id>
|
|
||||||
So, to list a directory, just get the directory entry, and iterate the current directory files
|
|
||||||
Care must be taken to maintain the <dir, sub dirs> and <file, fileid> pairs.
|
|
||||||
3.
|
|
||||||
*/
|
|
||||||
type FilerServer struct {
|
type FilerServer struct {
|
||||||
port string
|
port string
|
||||||
master string
|
master string
|
||||||
collection string
|
collection string
|
||||||
db *leveldb.DB
|
filer filer.Filer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string) (fs *FilerServer, err error) {
|
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string) (fs *FilerServer, err error) {
|
||||||
@@ -32,7 +21,8 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
|
|||||||
port: ":" + strconv.Itoa(port),
|
port: ":" + strconv.Itoa(port),
|
||||||
}
|
}
|
||||||
|
|
||||||
if fs.db, err = leveldb.OpenFile(dir, nil); err != nil {
|
if fs.filer, err = filer.NewFilerEmbedded(dir); err != nil {
|
||||||
|
glog.Fatal("Can not start filer in dir:", dir)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,127 +30,3 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
|
|||||||
|
|
||||||
return fs, nil
|
return fs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) CreateFile(fullFileName string, fid string) (err error) {
|
|
||||||
fs.ensureFileFolder(fullFileName)
|
|
||||||
return fs.db.Put([]byte(fullFileName), []byte(fid), nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) FindFile(fullFileName string) (fid string, err error) {
|
|
||||||
return fs.findEntry(fullFileName)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) ListDirectories(fullpath string) (dirs []string, err error) {
|
|
||||||
data, e := fs.db.Get([]byte(fullpath), nil)
|
|
||||||
if e != nil {
|
|
||||||
return nil, e
|
|
||||||
}
|
|
||||||
val := string(data)
|
|
||||||
if val == "" {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return strings.Split(val, ":"), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) ListFiles(fullpath string, start, limit int) (files []string) {
|
|
||||||
if !strings.HasSuffix(fullpath, "/") {
|
|
||||||
fullpath += "/"
|
|
||||||
}
|
|
||||||
iter := fs.db.NewIterator(&util.Range{Start: []byte(fullpath)}, nil)
|
|
||||||
startCounter, limitCounter := -1, 0
|
|
||||||
for iter.Next() {
|
|
||||||
startCounter++
|
|
||||||
if startCounter < start {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
limitCounter++
|
|
||||||
if limit > 0 {
|
|
||||||
if limitCounter > limit {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
key := string(iter.Key())
|
|
||||||
if !strings.HasPrefix(key, fullpath) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
fileName := key[len(fullpath):]
|
|
||||||
if fileName == "" {
|
|
||||||
continue //skip the directory entry
|
|
||||||
}
|
|
||||||
if strings.Contains(fileName, "/") {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
files = append(files, fileName)
|
|
||||||
}
|
|
||||||
iter.Release()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) Delete(fullpath string, isForceDirectoryRemoval bool) (fid string, isFile bool, err error) {
|
|
||||||
val, e := fs.findEntry(fullpath)
|
|
||||||
if e != nil {
|
|
||||||
return "", false, e
|
|
||||||
}
|
|
||||||
if strings.Contains(val, ",") {
|
|
||||||
return val, true, fs.db.Delete([]byte(fullpath), nil)
|
|
||||||
}
|
|
||||||
// deal with directory
|
|
||||||
if !strings.HasSuffix(fullpath, "/") {
|
|
||||||
fullpath += "/"
|
|
||||||
}
|
|
||||||
iter := fs.db.NewIterator(&util.Range{Start: []byte(fullpath)}, nil)
|
|
||||||
counter := 0
|
|
||||||
for iter.Next() {
|
|
||||||
counter++
|
|
||||||
if counter > 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
iter.Release()
|
|
||||||
if counter > 0 {
|
|
||||||
return "", false, errors.New("Force Deletion Not Supported Yet")
|
|
||||||
}
|
|
||||||
return "", false, fs.db.Delete([]byte(fullpath), nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) findEntry(fullpath string) (value string, err error) {
|
|
||||||
data, e := fs.db.Get([]byte(fullpath), nil)
|
|
||||||
if e != nil {
|
|
||||||
return "", e
|
|
||||||
}
|
|
||||||
return string(data), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FilerServer) ensureFileFolder(fullFileName string) (err error) {
|
|
||||||
parts := strings.Split(fullFileName, "/")
|
|
||||||
path := "/"
|
|
||||||
for i := 1; i < len(parts); i++ {
|
|
||||||
sub := parts[i]
|
|
||||||
if i == len(parts)-1 {
|
|
||||||
sub = ""
|
|
||||||
}
|
|
||||||
if err = fs.ensureFolderHasEntry(path, sub); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if sub != "" {
|
|
||||||
path = path + sub + "/"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (fs *FilerServer) ensureFolderHasEntry(path string, sub string) (err error) {
|
|
||||||
val, e := fs.findEntry(path)
|
|
||||||
if e == leveldb.ErrNotFound {
|
|
||||||
return fs.db.Put([]byte(path), []byte(sub), nil)
|
|
||||||
} else if e != nil {
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
list := strings.Split(val, ":")
|
|
||||||
for _, v := range list {
|
|
||||||
if v == sub {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
list = append(list, sub)
|
|
||||||
return fs.db.Put([]byte(path), []byte(strings.Join(list, ":")), nil)
|
|
||||||
}
|
|
||||||
|
@@ -34,7 +34,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
if !strings.HasSuffix(r.URL.Path, "/") {
|
if !strings.HasSuffix(r.URL.Path, "/") {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dirlist, err := fs.ListDirectories(r.URL.Path)
|
dirlist, err := fs.filer.ListDirectories(r.URL.Path)
|
||||||
if err == leveldb.ErrNotFound {
|
if err == leveldb.ErrNotFound {
|
||||||
glog.V(3).Infoln("Directory Not Found in db", r.URL.Path)
|
glog.V(3).Infoln("Directory Not Found in db", r.URL.Path)
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
@@ -43,12 +43,12 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["Directory"] = r.URL.Path
|
m["Directory"] = r.URL.Path
|
||||||
m["Subdirectories"] = dirlist
|
m["Subdirectories"] = dirlist
|
||||||
start, _ := strconv.Atoi(r.FormValue("start"))
|
lastFile := r.FormValue("lastFile")
|
||||||
limit, limit_err := strconv.Atoi(r.FormValue("limit"))
|
limit, limit_err := strconv.Atoi(r.FormValue("limit"))
|
||||||
if limit_err != nil {
|
if limit_err != nil {
|
||||||
limit = 100
|
limit = 100
|
||||||
}
|
}
|
||||||
m["Files"] = fs.ListFiles(r.URL.Path, start, limit)
|
m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFile, limit)
|
||||||
writeJsonQuiet(w, r, m)
|
writeJsonQuiet(w, r, m)
|
||||||
}
|
}
|
||||||
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
|
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
|
||||||
@@ -56,7 +56,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
|
|||||||
fs.listDirectoryHandler(w, r)
|
fs.listDirectoryHandler(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fileId, err := fs.FindFile(r.URL.Path)
|
fileId, err := fs.filer.FindFile(r.URL.Path)
|
||||||
if err == leveldb.ErrNotFound {
|
if err == leveldb.ErrNotFound {
|
||||||
glog.V(3).Infoln("Not found in db", r.URL.Path)
|
glog.V(3).Infoln("Not found in db", r.URL.Path)
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
@@ -115,6 +115,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
u, _ := url.Parse("http://" + assignResult.PublicUrl + "/" + assignResult.Fid)
|
u, _ := url.Parse("http://" + assignResult.PublicUrl + "/" + assignResult.Fid)
|
||||||
|
glog.V(4).Infoln("post to", u)
|
||||||
request := &http.Request{
|
request := &http.Request{
|
||||||
Method: r.Method,
|
Method: r.Method,
|
||||||
URL: u,
|
URL: u,
|
||||||
@@ -141,6 +142,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJsonError(w, r, ra_err)
|
writeJsonError(w, r, ra_err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
glog.V(4).Infoln("post result", string(resp_body))
|
||||||
var ret operation.UploadResult
|
var ret operation.UploadResult
|
||||||
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
||||||
if unmarshal_err != nil {
|
if unmarshal_err != nil {
|
||||||
@@ -167,7 +169,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if db_err := fs.CreateFile(path, assignResult.Fid); db_err != nil {
|
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
|
||||||
|
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
|
||||||
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
||||||
glog.V(0).Infoln("failing to write to filer server", db_err.Error())
|
glog.V(0).Infoln("failing to write to filer server", db_err.Error())
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
@@ -177,10 +180,13 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
isForceDirectoryRemoval := r.FormValue("force") == "true" // force remove for directories
|
var err error
|
||||||
fid, isFile, err := fs.Delete(r.URL.Path, isForceDirectoryRemoval)
|
var fid string
|
||||||
if err == nil {
|
if strings.HasSuffix(r.URL.Path, "/") {
|
||||||
if isFile {
|
err = fs.filer.DeleteDirectory(r.URL.Path)
|
||||||
|
} else {
|
||||||
|
fid, err = fs.filer.DeleteFile(r.URL.Path)
|
||||||
|
if err == nil {
|
||||||
err = operation.DeleteFile(fs.master, fid)
|
err = operation.DeleteFile(fs.master, fid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user