mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 09:57:24 +08:00
file handler directly read from volume servers
this mostly works fine now! next: need to cache files to local disk
This commit is contained in:
@@ -11,6 +11,9 @@ import (
|
||||
"bytes"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"time"
|
||||
"strings"
|
||||
"sync"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
type FileHandle struct {
|
||||
@@ -33,46 +36,94 @@ type FileHandle struct {
|
||||
}
|
||||
|
||||
var _ = fs.Handle(&FileHandle{})
|
||||
var _ = fs.HandleReadAller(&FileHandle{})
|
||||
// var _ = fs.HandleReader(&FileHandle{})
|
||||
// var _ = fs.HandleReadAller(&FileHandle{})
|
||||
var _ = fs.HandleReader(&FileHandle{})
|
||||
var _ = fs.HandleFlusher(&FileHandle{})
|
||||
var _ = fs.HandleWriter(&FileHandle{})
|
||||
var _ = fs.HandleReleaser(&FileHandle{})
|
||||
|
||||
func (fh *FileHandle) ReadAll(ctx context.Context) (content []byte, err error) {
|
||||
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||
|
||||
glog.V(3).Infof("%v/%v read all fh ", fh.dirPath, fh.name)
|
||||
glog.V(3).Infof("%v/%v read fh: [%d,%d)", fh.dirPath, fh.name, req.Offset, req.Offset+int64(req.Size))
|
||||
|
||||
if len(fh.Chunks) == 0 {
|
||||
glog.V(0).Infof("empty fh %v/%v", fh.dirPath, fh.name)
|
||||
return
|
||||
return fmt.Errorf("empty file %v/%v", fh.dirPath, fh.name)
|
||||
}
|
||||
|
||||
err = fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
buff := make([]byte, req.Size)
|
||||
|
||||
// FIXME: need to either use Read() or implement differently
|
||||
chunks, _ := filer2.CompactFileChunks(fh.Chunks)
|
||||
glog.V(1).Infof("read fh %v/%v %d/%d chunks", fh.dirPath, fh.name, len(chunks), len(fh.Chunks))
|
||||
for i, chunk := range chunks {
|
||||
glog.V(1).Infof("read fh %v/%v %d/%d chunk %s [%d,%d)", fh.dirPath, fh.name, i, len(chunks), chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
||||
}
|
||||
request := &filer_pb.GetFileContentRequest{
|
||||
FileId: chunks[0].FileId,
|
||||
}
|
||||
chunkViews := filer2.ReadFromChunks(fh.Chunks, req.Offset, req.Size)
|
||||
|
||||
glog.V(1).Infof("read fh content %d chunk %s [%d,%d): %v", len(chunks),
|
||||
chunks[0].FileId, chunks[0].Offset, chunks[0].Offset+int64(chunks[0].Size), request)
|
||||
resp, err := client.GetFileContent(ctx, request)
|
||||
var vids []string
|
||||
for _, chunkView := range chunkViews {
|
||||
vids = append(vids, volumeId(chunkView.FileId))
|
||||
}
|
||||
|
||||
vid2Locations := make(map[string]*filer_pb.Locations)
|
||||
|
||||
err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
|
||||
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
|
||||
VolumeIds: vids,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
content = resp.Content
|
||||
vid2Locations = resp.LocationsMap
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return content, err
|
||||
if err != nil {
|
||||
glog.V(3).Infof("%v/%v read fh lookup volume ids: %v", fh.dirPath, fh.name, err)
|
||||
return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
|
||||
}
|
||||
|
||||
var totalRead int64
|
||||
var wg sync.WaitGroup
|
||||
for _, chunkView := range chunkViews {
|
||||
wg.Add(1)
|
||||
go func(chunkView *filer2.ChunkView) {
|
||||
defer wg.Done()
|
||||
|
||||
glog.V(3).Infof("read fh reading chunk: %+v", chunkView)
|
||||
|
||||
locations := vid2Locations[volumeId(chunkView.FileId)]
|
||||
if locations == nil || len(locations.Locations) == 0 {
|
||||
glog.V(0).Infof("failed to locate %s", chunkView.FileId)
|
||||
err = fmt.Errorf("failed to locate %s", chunkView.FileId)
|
||||
return
|
||||
}
|
||||
|
||||
var n int64
|
||||
n, err = util.ReadUrl(
|
||||
fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
|
||||
chunkView.Offset,
|
||||
int(chunkView.Size),
|
||||
buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)])
|
||||
|
||||
if err != nil {
|
||||
|
||||
glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.dirPath, fh.name, locations.Locations[0].Url, chunkView.FileId, n, err)
|
||||
|
||||
err = fmt.Errorf("failed to read http://%s/%s: %v",
|
||||
locations.Locations[0].Url, chunkView.FileId, err)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(3).Infof("read fh read %d bytes: %+v", n, chunkView)
|
||||
totalRead += n
|
||||
|
||||
}(chunkView)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
resp.Data = buff[:totalRead]
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Write to the file handle
|
||||
@@ -179,3 +230,11 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func volumeId(fileId string) string {
|
||||
lastCommaIndex := strings.LastIndex(fileId, ",")
|
||||
if lastCommaIndex > 0 {
|
||||
return fileId[:lastCommaIndex]
|
||||
}
|
||||
return fileId
|
||||
}
|
||||
|
Reference in New Issue
Block a user