stream reading a whole chunk

This commit is contained in:
chrislu
2022-03-02 13:50:46 -08:00
parent 784583afc6
commit 6fbbc78574

View File

@@ -3,12 +3,12 @@ package filer
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/chrislusf/seaweedfs/weed/wdclient"
"io" "io"
"math" "math"
"net/url" "net/url"
"strings" "strings"
"sync"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@@ -19,9 +19,15 @@ import (
) )
const ( const (
ManifestBatch = 10000 ManifestBatch = 3
) )
var bytesBufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
for _, chunk := range chunks { for _, chunk := range chunks {
if chunk.IsChunkManifest { if chunk.IsChunkManifest {
@@ -78,14 +84,14 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
} }
// IsChunkManifest // IsChunkManifest
data := mem.Allocate(int(chunk.Size)) bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
defer mem.Free(data) defer bytesBufferPool.Put(bytesBuffer)
_, err := fetchChunk(data, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil { if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
} }
m := &filer_pb.FileChunkManifest{} m := &filer_pb.FileChunkManifest{}
if err := proto.Unmarshal(data, m); err != nil { if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
} }
@@ -95,13 +101,17 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
} }
// TODO fetch from cache for weed mount? // TODO fetch from cache for weed mount?
func fetchChunk(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) (int, error) { func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
urlStrings, err := lookupFileIdFn(fileId) urlStrings, err := lookupFileIdFn(fileId)
if err != nil { if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err return err
} }
return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, true, 0) err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
if err != nil {
return err
}
return nil
} }
func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {