mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-01 16:21:44 +08:00
mostly refactoring, add some error handling
This commit is contained in:
parent
07f712c83f
commit
3e669e6d7b
@ -167,8 +167,9 @@ func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
|
func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
|
||||||
|
dirFullPath := dir.FullPath()
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: dir.FullPath(),
|
Directory: dirFullPath,
|
||||||
Entry: &filer_pb.Entry{
|
Entry: &filer_pb.Entry{
|
||||||
Name: name,
|
Name: name,
|
||||||
IsDirectory: mode&os.ModeDir > 0,
|
IsDirectory: mode&os.ModeDir > 0,
|
||||||
@ -186,7 +187,7 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex
|
|||||||
OExcl: exclusive,
|
OExcl: exclusive,
|
||||||
Signatures: []int32{dir.wfs.signature},
|
Signatures: []int32{dir.wfs.signature},
|
||||||
}
|
}
|
||||||
glog.V(1).Infof("create %s/%s", dir.FullPath(), name)
|
glog.V(1).Infof("create %s/%s", dirFullPath, name)
|
||||||
|
|
||||||
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
@ -197,11 +198,14 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex
|
|||||||
if strings.Contains(err.Error(), "EEXIST") {
|
if strings.Contains(err.Error(), "EEXIST") {
|
||||||
return fuse.EEXIST
|
return fuse.EEXIST
|
||||||
}
|
}
|
||||||
glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), name, err)
|
glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err)
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
|
if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
||||||
|
glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err)
|
||||||
|
return fuse.EIO
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -228,35 +232,40 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dirFullPath := dir.FullPath()
|
||||||
|
|
||||||
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
dir.wfs.mapPbIdFromLocalToFiler(newEntry)
|
dir.wfs.mapPbIdFromLocalToFiler(newEntry)
|
||||||
defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
|
defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
|
||||||
|
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: dir.FullPath(),
|
Directory: dirFullPath,
|
||||||
Entry: newEntry,
|
Entry: newEntry,
|
||||||
Signatures: []int32{dir.wfs.signature},
|
Signatures: []int32{dir.wfs.signature},
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("mkdir: %v", request)
|
glog.V(1).Infof("mkdir: %v", request)
|
||||||
if err := filer_pb.CreateEntry(client, request); err != nil {
|
if err := filer_pb.CreateEntry(client, request); err != nil {
|
||||||
glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
|
glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
|
if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
||||||
|
glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err)
|
||||||
|
return fuse.EIO
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
|
node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name))
|
||||||
|
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
|
glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
|
||||||
|
|
||||||
return nil, fuse.EIO
|
return nil, fuse.EIO
|
||||||
}
|
}
|
||||||
@ -320,7 +329,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
|
|||||||
dirPath := util.FullPath(dir.FullPath())
|
dirPath := util.FullPath(dir.FullPath())
|
||||||
glog.V(4).Infof("dir ReadDirAll %s", dirPath)
|
glog.V(4).Infof("dir ReadDirAll %s", dirPath)
|
||||||
|
|
||||||
processEachEntryFn := func(entry *filer.Entry, isLast bool) error {
|
processEachEntryFn := func(entry *filer.Entry, isLast bool) {
|
||||||
if entry.IsDirectory() {
|
if entry.IsDirectory() {
|
||||||
dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir}
|
dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir}
|
||||||
ret = append(ret, dirent)
|
ret = append(ret, dirent)
|
||||||
@ -328,7 +337,6 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
|
|||||||
dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode))}
|
dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode))}
|
||||||
ret = append(ret, dirent)
|
ret = append(ret, dirent)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
|
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
|
||||||
@ -382,7 +390,8 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
|
|||||||
|
|
||||||
func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
||||||
|
|
||||||
filePath := util.NewFullPath(dir.FullPath(), req.Name)
|
dirFullPath := dir.FullPath()
|
||||||
|
filePath := util.NewFullPath(dirFullPath, req.Name)
|
||||||
entry, err := filer_pb.GetEntry(dir.wfs, filePath)
|
entry, err := filer_pb.GetEntry(dir.wfs, filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -394,14 +403,17 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
|||||||
// first, ensure the filer store can correctly delete
|
// first, ensure the filer store can correctly delete
|
||||||
glog.V(3).Infof("remove file: %v", req)
|
glog.V(3).Infof("remove file: %v", req)
|
||||||
isDeleteData := entry.HardLinkCounter <= 1
|
isDeleteData := entry.HardLinkCounter <= 1
|
||||||
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
|
err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err)
|
glog.V(3).Infof("not found remove file %s: %v", filePath, err)
|
||||||
return fuse.ENOENT
|
return fuse.ENOENT
|
||||||
}
|
}
|
||||||
|
|
||||||
// then, delete meta cache and fsNode cache
|
// then, delete meta cache and fsNode cache
|
||||||
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
|
if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil {
|
||||||
|
glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err)
|
||||||
|
return fuse.ESTALE
|
||||||
|
}
|
||||||
|
|
||||||
// clear entry inside the file
|
// clear entry inside the file
|
||||||
fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath)
|
fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath)
|
||||||
@ -415,7 +427,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
|
|||||||
// remove current file handle if any
|
// remove current file handle if any
|
||||||
dir.wfs.handlesLock.Lock()
|
dir.wfs.handlesLock.Lock()
|
||||||
defer dir.wfs.handlesLock.Unlock()
|
defer dir.wfs.handlesLock.Unlock()
|
||||||
inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
|
inodeId := filePath.AsInode()
|
||||||
delete(dir.wfs.handles, inodeId)
|
delete(dir.wfs.handles, inodeId)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -568,7 +580,10 @@ func (dir *Dir) saveEntry(entry *filer_pb.Entry) error {
|
|||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
|
||||||
dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
|
if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
||||||
|
glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
|
||||||
|
return fuse.ESTALE
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user