s3 API add ListObjectsV1

This commit is contained in:
Chris Lu
2018-07-22 01:14:36 -07:00
parent 922c614bde
commit 6319d84f42
11 changed files with 333 additions and 131 deletions

View File

@@ -8,19 +8,23 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/karlseguin/ccache"
"strings"
"sync"
)
type Option struct {
FilerGrpcAddress string
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
ChunkSizeLimit int64
DataCenter string
DirListingLimit int
}
type WFS struct {
filerGrpcAddress string
filerMountRootPath string
option *Option
listDirectoryEntriesCache *ccache.Cache
collection string
replication string
ttlSec int32
chunkSizeLimit int64
dataCenter string
// contains all open handles
handles []*FileHandle
@@ -28,32 +32,23 @@ type WFS struct {
pathToHandleLock sync.Mutex
}
func NewSeaweedFileSystem(filerGrpcAddress string, filerMountRootPath string, collection string, replication string, ttlSec int32, chunkSizeLimitMB int, dataCenter string) *WFS {
if filerMountRootPath != "/" && strings.HasSuffix(filerMountRootPath, "/") {
filerMountRootPath = filerMountRootPath[0 : len(filerMountRootPath)-1]
}
func NewSeaweedFileSystem(option *Option) *WFS {
return &WFS{
filerGrpcAddress: filerGrpcAddress,
filerMountRootPath: filerMountRootPath,
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
collection: collection,
replication: replication,
ttlSec: ttlSec,
chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
dataCenter: dataCenter,
pathToHandleIndex: make(map[string]int),
}
}
func (wfs *WFS) Root() (fs.Node, error) {
return &Dir{Path: wfs.filerMountRootPath, wfs: wfs}, nil
return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
}
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
grpcConnection, err := util.GrpcDial(wfs.filerGrpcAddress)
grpcConnection, err := util.GrpcDial(wfs.option.FilerGrpcAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
return fmt.Errorf("fail to dial %s: %v", wfs.option.FilerGrpcAddress, err)
}
defer grpcConnection.Close()
@@ -62,7 +57,7 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return fn(client)
}
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (handle *FileHandle) {
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
wfs.pathToHandleLock.Lock()
defer wfs.pathToHandleLock.Unlock()
@@ -70,39 +65,33 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (handle *FileHandle)
index, found := wfs.pathToHandleIndex[fullpath]
if found && wfs.handles[index] != nil {
glog.V(4).Infoln(fullpath, "found handle id", index)
glog.V(4).Infoln(fullpath, "found fileHandle id", index)
return wfs.handles[index]
}
// create a new handler
handle = &FileHandle{
f: file,
dirtyPages: newDirtyPages(file),
Uid: uid,
Gid: gid,
}
fileHandle = newFileHandle(file, uid, gid)
if found && wfs.handles[index] != nil {
glog.V(4).Infoln(fullpath, "reuse previous handle id", index)
wfs.handles[index] = handle
handle.handle = uint64(index)
glog.V(4).Infoln(fullpath, "reuse previous fileHandle id", index)
wfs.handles[index] = fileHandle
fileHandle.handle = uint64(index)
return
}
for i, h := range wfs.handles {
if h == nil {
wfs.handles[i] = handle
handle.handle = uint64(i)
wfs.handles[i] = fileHandle
fileHandle.handle = uint64(i)
wfs.pathToHandleIndex[fullpath] = i
glog.V(4).Infoln(fullpath, "reuse handle id", handle.handle)
glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
return
}
}
wfs.handles = append(wfs.handles, handle)
handle.handle = uint64(len(wfs.handles) - 1)
glog.V(4).Infoln(fullpath, "new handle id", handle.handle)
wfs.pathToHandleIndex[fullpath] = int(handle.handle)
wfs.handles = append(wfs.handles, fileHandle)
fileHandle.handle = uint64(len(wfs.handles) - 1)
glog.V(4).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
return
}