mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-24 04:14:21 +08:00
move file reader, entryViewCache to file handle
reduce file object size
This commit is contained in:
parent
5985a7d38d
commit
ca0f07a188
@ -109,7 +109,6 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
|
|||||||
dir: dir,
|
dir: dir,
|
||||||
wfs: dir.wfs,
|
wfs: dir.wfs,
|
||||||
entry: entry,
|
entry: entry,
|
||||||
entryViewCache: nil,
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
f.(*File).dir = dir // in case dir node was created later
|
f.(*File).dir = dir // in case dir node was created later
|
||||||
@ -408,7 +407,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
|||||||
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
|
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
|
||||||
if fsNode != nil {
|
if fsNode != nil {
|
||||||
if file, ok := fsNode.(*File); ok {
|
if file, ok := fsNode.(*File); ok {
|
||||||
file.clearEntry()
|
file.entry = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,10 +2,8 @@ package filesys
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/seaweedfs/fuse"
|
"github.com/seaweedfs/fuse"
|
||||||
@ -34,10 +32,7 @@ type File struct {
|
|||||||
dir *Dir
|
dir *Dir
|
||||||
wfs *WFS
|
wfs *WFS
|
||||||
entry *filer_pb.Entry
|
entry *filer_pb.Entry
|
||||||
entryLock sync.RWMutex
|
|
||||||
entryViewCache []filer.VisibleInterval
|
|
||||||
isOpen int
|
isOpen int
|
||||||
reader io.ReaderAt
|
|
||||||
dirtyMetadata bool
|
dirtyMetadata bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,8 +149,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
entry.Chunks = chunks
|
entry.Chunks = chunks
|
||||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
|
|
||||||
file.setReader(nil)
|
|
||||||
}
|
}
|
||||||
entry.Attributes.FileSize = req.Size
|
entry.Attributes.FileSize = req.Size
|
||||||
file.dirtyMetadata = true
|
file.dirtyMetadata = true
|
||||||
@ -274,7 +267,6 @@ func (file *File) Forget() {
|
|||||||
glog.V(4).Infof("Forget file %s", t)
|
glog.V(4).Infof("Forget file %s", t)
|
||||||
file.wfs.fsNodeCache.DeleteFsNode(t)
|
file.wfs.fsNodeCache.DeleteFsNode(t)
|
||||||
file.wfs.ReleaseHandle(t, 0)
|
file.wfs.ReleaseHandle(t, 0)
|
||||||
file.setReader(nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
|
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
|
||||||
@ -294,7 +286,7 @@ func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, er
|
|||||||
return entry, err
|
return entry, err
|
||||||
}
|
}
|
||||||
if entry != nil {
|
if entry != nil {
|
||||||
file.setEntry(entry)
|
file.entry = entry
|
||||||
} else {
|
} else {
|
||||||
glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
|
glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
|
||||||
}
|
}
|
||||||
@ -336,44 +328,11 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
|
|||||||
return lessThan(chunks[i], chunks[j])
|
return lessThan(chunks[i], chunks[j])
|
||||||
})
|
})
|
||||||
|
|
||||||
// add to entry view cache
|
|
||||||
for _, chunk := range chunks {
|
|
||||||
file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk)
|
|
||||||
}
|
|
||||||
|
|
||||||
file.setReader(nil)
|
|
||||||
|
|
||||||
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
|
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
|
||||||
|
|
||||||
entry.Chunks = append(entry.Chunks, newChunks...)
|
entry.Chunks = append(entry.Chunks, newChunks...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) setReader(reader io.ReaderAt) {
|
|
||||||
r := file.reader
|
|
||||||
if r != nil {
|
|
||||||
if closer, ok := r.(io.Closer); ok {
|
|
||||||
closer.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
file.reader = reader
|
|
||||||
}
|
|
||||||
|
|
||||||
func (file *File) setEntry(entry *filer_pb.Entry) {
|
|
||||||
file.entryLock.Lock()
|
|
||||||
defer file.entryLock.Unlock()
|
|
||||||
file.entry = entry
|
|
||||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
|
|
||||||
file.setReader(nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (file *File) clearEntry() {
|
|
||||||
file.entryLock.Lock()
|
|
||||||
defer file.entryLock.Unlock()
|
|
||||||
file.entry = nil
|
|
||||||
file.entryViewCache = nil
|
|
||||||
file.setReader(nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (file *File) saveEntry(entry *filer_pb.Entry) error {
|
func (file *File) saveEntry(entry *filer_pb.Entry) error {
|
||||||
return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
@ -400,7 +359,5 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) getEntry() *filer_pb.Entry {
|
func (file *File) getEntry() *filer_pb.Entry {
|
||||||
file.entryLock.RLock()
|
|
||||||
defer file.entryLock.RUnlock()
|
|
||||||
return file.entry
|
return file.entry
|
||||||
}
|
}
|
||||||
|
@ -20,9 +20,11 @@ import (
|
|||||||
|
|
||||||
type FileHandle struct {
|
type FileHandle struct {
|
||||||
// cache file has been written to
|
// cache file has been written to
|
||||||
dirtyPages *ContinuousDirtyPages
|
dirtyPages *ContinuousDirtyPages
|
||||||
contentType string
|
entryViewCache []filer.VisibleInterval
|
||||||
handle uint64
|
reader io.ReaderAt
|
||||||
|
contentType string
|
||||||
|
handle uint64
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
f *File
|
f *File
|
||||||
@ -125,20 +127,20 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var chunkResolveErr error
|
var chunkResolveErr error
|
||||||
if fh.f.entryViewCache == nil {
|
if fh.entryViewCache == nil {
|
||||||
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
|
fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
|
||||||
if chunkResolveErr != nil {
|
if chunkResolveErr != nil {
|
||||||
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
|
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
|
||||||
}
|
}
|
||||||
fh.f.setReader(nil)
|
fh.reader = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
reader := fh.f.reader
|
reader := fh.reader
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
|
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
|
||||||
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
|
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
|
||||||
}
|
}
|
||||||
fh.f.setReader(reader)
|
fh.reader = reader
|
||||||
|
|
||||||
totalRead, err := reader.ReadAt(buff, offset)
|
totalRead, err := reader.ReadAt(buff, offset)
|
||||||
|
|
||||||
@ -200,8 +202,6 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
|
|||||||
fh.Lock()
|
fh.Lock()
|
||||||
defer fh.Unlock()
|
defer fh.Unlock()
|
||||||
|
|
||||||
fh.f.entryViewCache = nil
|
|
||||||
|
|
||||||
if fh.f.isOpen <= 0 {
|
if fh.f.isOpen <= 0 {
|
||||||
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
|
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
|
||||||
fh.f.isOpen = 0
|
fh.f.isOpen = 0
|
||||||
@ -211,9 +211,10 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
|
|||||||
if fh.f.isOpen == 1 {
|
if fh.f.isOpen == 1 {
|
||||||
|
|
||||||
fh.f.isOpen--
|
fh.f.isOpen--
|
||||||
|
fh.entryViewCache = nil
|
||||||
|
fh.reader = nil
|
||||||
|
|
||||||
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
|
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
|
||||||
fh.f.setReader(nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -111,7 +111,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
|||||||
if err := wfs.Server.InvalidateNodeData(file); err != nil {
|
if err := wfs.Server.InvalidateNodeData(file); err != nil {
|
||||||
glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
|
glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
|
||||||
}
|
}
|
||||||
file.clearEntry()
|
file.entry = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dir, name := filePath.DirAndName()
|
dir, name := filePath.DirAndName()
|
||||||
|
Loading…
Reference in New Issue
Block a user