mirror of https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-22 06:07:24 +08:00

Commit: go code can read and write chunk manifest

weed/filer2/filechunk_manifest.go (new file, 136 lines)
@@ -0,0 +1,136 @@
package filer2

import (
    "bytes"
    "fmt"
    "io"
    "math"

    "github.com/golang/protobuf/proto"

    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    "github.com/chrislusf/seaweedfs/weed/util"
)

func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
    for _, chunk := range chunks {
        if chunk.IsChunkManifest {
            return true
        }
    }
    return false
}

func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
    // TODO maybe parallel this
    for _, chunk := range chunks {
        if !chunk.IsChunkManifest {
            dataChunks = append(dataChunks, chunk)
            continue
        }

        // IsChunkManifest
        data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
        if err != nil {
            return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
        }
        m := &filer_pb.FileChunkManifest{}
        if err := proto.Unmarshal(data, m); err != nil {
            return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
        }
        manifestChunks = append(manifestChunks, chunk)
        // recursive
        dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
        if subErr != nil {
            return chunks, nil, subErr
        }
        dataChunks = append(dataChunks, dchunks...)
        manifestChunks = append(manifestChunks, mchunks...)
    }
    return
}

func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
    urlString, err := lookupFileIdFn(fileId)
    if err != nil {
        glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
        return nil, err
    }
    var buffer bytes.Buffer
    err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
        buffer.Write(data)
    })
    if err != nil {
        glog.V(0).Infof("read %s failed, err: %v", fileId, err)
        return nil, err
    }

    return buffer.Bytes(), nil
}

func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
    return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest)
}

func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {

    var dataChunks []*filer_pb.FileChunk
    for _, chunk := range inputChunks {
        if !chunk.IsChunkManifest {
            dataChunks = append(dataChunks, chunk)
        } else {
            chunks = append(chunks, chunk)
        }
    }

    manifestBatch := mergeFactor
    remaining := len(dataChunks)
    for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch {
        chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch])
        if err != nil {
            return dataChunks, err
        }
        chunks = append(chunks, chunk)
        remaining -= manifestBatch
    }
    // remaining
    for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
        chunks = append(chunks, dataChunks[i])
    }
    return
}

func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {

    // create and serialize the manifest
    data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
        Chunks: dataChunks,
    })
    if serErr != nil {
        return nil, fmt.Errorf("serializing manifest: %v", serErr)
    }

    minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
    for k := 0; k < len(dataChunks); k++ {
        chunk := dataChunks[k]
        if minOffset > int64(chunk.Offset) {
            minOffset = chunk.Offset
        }
        if maxOffset < int64(chunk.Size)+chunk.Offset {
            maxOffset = int64(chunk.Size) + chunk.Offset
        }
    }

    manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
    if err != nil {
        return nil, err
    }
    manifestChunk.IsChunkManifest = true
    manifestChunk.Offset = minOffset
    manifestChunk.Size = uint64(maxOffset - minOffset)

    return
}

type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)
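In short: a manifest chunk is an ordinary FileChunk whose payload is a serialized filer_pb.FileChunkManifest and whose IsChunkManifest flag is set. MaybeManifestize folds every 10,000 data chunks into one manifest chunk on the write path, and ResolveChunkManifest expands manifest chunks (recursively) back into data chunks on the read path. A rough usage sketch, not part of the commit; saveFunc and lookupFileIdFn are placeholders for an already-wired SaveDataAsChunkFunctionType and LookupFileIdFunctionType:

    // Write path: possibly replace long runs of data chunks with manifest chunks.
    chunks, err := filer2.MaybeManifestize(saveFunc, dataChunks)
    if err != nil {
        return err
    }

    // Read path: expand manifest chunks back into the underlying data chunks.
    resolvedData, manifestChunks, err := filer2.ResolveChunkManifest(lookupFileIdFn, chunks)
    if err != nil {
        return err
    }
    _ = resolvedData   // plain data chunks, ready for ViewFromChunks / streaming
    _ = manifestChunks // the manifest chunks themselves, kept so they can be deleted as garbage later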
weed/filer2/filechunk_manifest_test.go (new file, 113 lines)
@@ -0,0 +1,113 @@
package filer2

import (
    "bytes"
    "math"
    "testing"

    "github.com/stretchr/testify/assert"

    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func TestDoMaybeManifestize(t *testing.T) {
    var manifestTests = []struct {
        inputs   []*filer_pb.FileChunk
        expected []*filer_pb.FileChunk
    }{
        {
            inputs: []*filer_pb.FileChunk{
                {FileId: "1", IsChunkManifest: false},
                {FileId: "2", IsChunkManifest: false},
                {FileId: "3", IsChunkManifest: false},
                {FileId: "4", IsChunkManifest: false},
            },
            expected: []*filer_pb.FileChunk{
                {FileId: "12", IsChunkManifest: true},
                {FileId: "34", IsChunkManifest: true},
            },
        },
        {
            inputs: []*filer_pb.FileChunk{
                {FileId: "1", IsChunkManifest: true},
                {FileId: "2", IsChunkManifest: false},
                {FileId: "3", IsChunkManifest: false},
                {FileId: "4", IsChunkManifest: false},
            },
            expected: []*filer_pb.FileChunk{
                {FileId: "1", IsChunkManifest: true},
                {FileId: "23", IsChunkManifest: true},
                {FileId: "4", IsChunkManifest: false},
            },
        },
        {
            inputs: []*filer_pb.FileChunk{
                {FileId: "1", IsChunkManifest: false},
                {FileId: "2", IsChunkManifest: true},
                {FileId: "3", IsChunkManifest: false},
                {FileId: "4", IsChunkManifest: false},
            },
            expected: []*filer_pb.FileChunk{
                {FileId: "2", IsChunkManifest: true},
                {FileId: "13", IsChunkManifest: true},
                {FileId: "4", IsChunkManifest: false},
            },
        },
        {
            inputs: []*filer_pb.FileChunk{
                {FileId: "1", IsChunkManifest: true},
                {FileId: "2", IsChunkManifest: true},
                {FileId: "3", IsChunkManifest: false},
                {FileId: "4", IsChunkManifest: false},
            },
            expected: []*filer_pb.FileChunk{
                {FileId: "1", IsChunkManifest: true},
                {FileId: "2", IsChunkManifest: true},
                {FileId: "34", IsChunkManifest: true},
            },
        },
    }

    for i, mtest := range manifestTests {
        println("test", i)
        actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge)
        assertEqualChunks(t, mtest.expected, actual)
    }

}

func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) {
    assert.Equal(t, len(expected), len(actual))
    for i := 0; i < len(actual); i++ {
        assertEqualChunk(t, actual[i], expected[i])
    }
}
func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) {
    assert.Equal(t, expected.FileId, actual.FileId)
    assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest)
}

func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {

    var buf bytes.Buffer
    minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
    for k := 0; k < len(dataChunks); k++ {
        chunk := dataChunks[k]
        buf.WriteString(chunk.FileId)
        if minOffset > int64(chunk.Offset) {
            minOffset = chunk.Offset
        }
        if maxOffset < int64(chunk.Size)+chunk.Offset {
            maxOffset = int64(chunk.Size) + chunk.Offset
        }
    }

    manifestChunk = &filer_pb.FileChunk{
        FileId: buf.String(),
    }
    manifestChunk.IsChunkManifest = true
    manifestChunk.Offset = minOffset
    manifestChunk.Size = uint64(maxOffset - minOffset)

    return
}
@@ -46,9 +46,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
 	return fmt.Sprintf("%x", h.Sum32())
 }
 
-func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
 
-	visibles := NonOverlappingVisibleIntervals(chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
 	fileIds := make(map[string]bool)
 	for _, interval := range visibles {
@@ -65,7 +65,23 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
 	return
 }
 
-func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
+func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+
+	aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+	if aErr != nil {
+		return nil, aErr
+	}
+	bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+	if bErr != nil {
+		return nil, bErr
+	}
+
+	delta = append(delta, DoMinusChunks(aData, bData)...)
+	delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
+	return
+}
+
+func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
 
 	fileIds := make(map[string]bool)
 	for _, interval := range bs {
@@ -94,9 +110,9 @@ func (cv *ChunkView) IsFullChunk() bool {
 	return cv.Size == cv.ChunkSize
 }
 
-func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
 
-	visibles := NonOverlappingVisibleIntervals(chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
 	return ViewFromVisibleIntervals(visibles, offset, size)
 
@@ -190,7 +206,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
 	return newVisibles
 }
 
-func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+// NonOverlappingVisibleIntervals translates the file chunks into VisibleInterval in memory.
+// If a file chunk's content is a chunk manifest, it is first resolved into the underlying data chunks.
+func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+
+	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
+
 	sort.Slice(chunks, func(i, j int) bool {
 		return chunks[i].Mtime < chunks[j].Mtime
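Every caller of these helpers now supplies a LookupFileIdFunctionType so manifest chunks can be fetched and expanded before visibility intervals are computed (the blank identifier in CompactFileChunks and ViewFromChunks above deliberately discards the resolution error). A minimal sketch of the new calling convention, imports omitted; filerClient and entry are placeholders, while LookupFn, TotalSize and ViewFromChunks are existing filer2 helpers used elsewhere in this commit:

    func printChunkViews(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) error {
        lookupFileIdFn := filer2.LookupFn(filerClient)
        totalSize := int64(filer2.TotalSize(entry.Chunks))
        chunkViews := filer2.ViewFromChunks(lookupFileIdFn, entry.Chunks, 0, totalSize)
        for _, view := range chunkViews {
            // resolve the volume server URL for each data chunk
            urlString, err := lookupFileIdFn(view.FileId)
            if err != nil {
                return err
            }
            fmt.Println(view.Offset, view.Size, urlString)
        }
        return nil
    }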
@@ -16,7 +16,7 @@ func TestCompactFileChunks(t *testing.T) {
 		{Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
 	}
 
-	compacted, garbage := CompactFileChunks(chunks)
+	compacted, garbage := CompactFileChunks(nil, chunks)
 
 	if len(compacted) != 3 {
 		t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -49,7 +49,7 @@ func TestCompactFileChunks2(t *testing.T) {
 		})
 	}
 
-	compacted, garbage := CompactFileChunks(chunks)
+	compacted, garbage := CompactFileChunks(nil, chunks)
 
 	if len(compacted) != 4 {
 		t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -186,7 +186,7 @@ func TestIntervalMerging(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
-		intervals := NonOverlappingVisibleIntervals(testcase.Chunks)
+		intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
 		for x, interval := range intervals {
 			log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
 				i, x, interval.start, interval.stop, interval.fileId)
@@ -371,7 +371,7 @@ func TestChunksReading(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
-		chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
+		chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
 		for x, chunk := range chunks {
 			log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
 				i, x, chunk.Offset, chunk.Size, chunk.FileId)
@@ -415,6 +415,6 @@ func BenchmarkCompactFileChunks(b *testing.B) {
 	}
 
 	for n := 0; n < b.N; n++ {
-		CompactFileChunks(chunks)
+		CompactFileChunks(nil, chunks)
 	}
 }
@@ -1,7 +1,6 @@
 package filer2
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -9,7 +8,6 @@ import (
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
 )
@@ -144,19 +142,6 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
 
 func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
 
-	urlString, err := c.lookupFileId(fileId)
-	if err != nil {
-		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
-		return nil, err
-	}
-	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
-		buffer.Write(data)
-	})
-	if err != nil {
-		glog.V(0).Infof("read %s failed, err: %v", fileId, err)
-		return nil, err
-	}
-
-	return buffer.Bytes(), nil
+	return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
 
 }
@@ -2,6 +2,7 @@ package filer2
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"math"
 	"strings"
@@ -14,7 +15,8 @@ import (
 
 func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
 
-	chunkViews := ViewFromChunks(chunks, offset, size)
+	fmt.Printf("start to stream content for chunks: %+v\n", chunks)
+	chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
 
 	fileId2Url := make(map[string]string)
 
@@ -50,14 +52,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
 
 	buffer := bytes.Buffer{}
 
-	chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
-
-	lookupFileId := func(fileId string) (targetUrl string, err error) {
+	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
 		return masterClient.LookupFileId(fileId)
 	}
 
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
 	for _, chunkView := range chunkViews {
-		urlString, err := lookupFileId(chunkView.FileId)
+		urlString, err := lookupFileIdFn(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return nil, err
@@ -88,23 +90,27 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
 
 func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-	chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+		return masterClient.LookupFileId(fileId)
+	}
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
 
 	return &ChunkStreamReader{
 		chunkViews: chunkViews,
-		lookupFileId: func(fileId string) (targetUrl string, err error) {
-			return masterClient.LookupFileId(fileId)
-		},
+		lookupFileId: lookupFileIdFn,
 	}
 }
 
 func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-	chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+	lookupFileIdFn := LookupFn(filerClient)
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
 
 	return &ChunkStreamReader{
 		chunkViews: chunkViews,
-		lookupFileId: LookupFn(filerClient),
+		lookupFileId: lookupFileIdFn,
 	}
 }
 
@@ -2,16 +2,12 @@ package filesys
 
 import (
 	"bytes"
-	"context"
-	"fmt"
 	"io"
 	"sync"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/security"
 )
 
 type ContinuousDirtyPages struct {
@@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
 
 func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
 
-	var fileId, host string
-	var auth security.EncodedJwt
-
 	dir, _ := pages.f.fullpath().DirAndName()
 
-	if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
-		request := &filer_pb.AssignVolumeRequest{
-			Count:       1,
-			Replication: pages.f.wfs.option.Replication,
-			Collection:  pages.f.wfs.option.Collection,
-			TtlSec:      pages.f.wfs.option.TtlSec,
-			DataCenter:  pages.f.wfs.option.DataCenter,
-			ParentPath:  dir,
-		}
-
-		resp, err := client.AssignVolume(context.Background(), request)
-		if err != nil {
-			glog.V(0).Infof("assign volume failure %v: %v", request, err)
-			return err
-		}
-		if resp.Error != "" {
-			return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
-		}
-
-		fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
-		host = pages.f.wfs.AdjustedUrl(host)
-		pages.collection, pages.replication = resp.Collection, resp.Replication
-
-		return nil
-	}); err != nil {
-		return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
-	}
-
-	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
-	uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
+	chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
 	if err != nil {
-		glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
-		return nil, fmt.Errorf("upload data: %v", err)
+		return nil, err
 	}
-	if uploadResult.Error != "" {
-		glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
-		return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
-	}
-	pages.f.wfs.chunkCache.SetChunk(fileId, data)
 
-	return uploadResult.ToPbFileChunk(fileId, offset), nil
+	pages.collection, pages.replication = collection, replication
 
+	return chunk, nil
 }
 
@@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
 
 func (file *File) setEntry(entry *filer_pb.Entry) {
 	file.entry = entry
-	file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+	file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks)
 	file.reader = nil
 }
 
@@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
 		return 0, nil
 	}
 
+	var chunkResolveErr error
 	if fh.f.entryViewCache == nil {
-		fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+		fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+		if chunkResolveErr != nil {
+			return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+		}
 		fh.f.reader = nil
 	}
 
@@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 		glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
 	}
 
-	chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
+	chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+	chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+	if manifestErr != nil {
+		// not good, but should be ok
+		glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+	}
 	fh.f.entry.Chunks = chunks
 	// fh.f.entryViewCache = nil
 
weed/filesys/wfs_write.go (new file, 66 lines)
@@ -0,0 +1,66 @@
package filesys

import (
    "context"
    "fmt"
    "io"

    "github.com/chrislusf/seaweedfs/weed/filer2"
    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/operation"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    "github.com/chrislusf/seaweedfs/weed/security"
)

func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType {

    return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
        var fileId, host string
        var auth security.EncodedJwt

        if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

            request := &filer_pb.AssignVolumeRequest{
                Count:       1,
                Replication: wfs.option.Replication,
                Collection:  wfs.option.Collection,
                TtlSec:      wfs.option.TtlSec,
                DataCenter:  wfs.option.DataCenter,
                ParentPath:  dir,
            }

            resp, err := client.AssignVolume(context.Background(), request)
            if err != nil {
                glog.V(0).Infof("assign volume failure %v: %v", request, err)
                return err
            }
            if resp.Error != "" {
                return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
            }

            fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
            host = wfs.AdjustedUrl(host)
            collection, replication = resp.Collection, resp.Replication

            return nil
        }); err != nil {
            return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
        }

        fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
        uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
        if err != nil {
            glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
            return nil, "", "", fmt.Errorf("upload data: %v", err)
        }
        if uploadResult.Error != "" {
            glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
            return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
        }

        wfs.chunkCache.SetChunk(fileId, data)

        chunk = uploadResult.ToPbFileChunk(fileId, offset)
        return chunk, "", "", nil
    }
}
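Taken together with the dirty_page change above, the assign-volume-and-upload logic that used to be inlined in ContinuousDirtyPages.saveToStorage is now exposed as a reusable SaveDataAsChunkFunctionType, so the FUSE dirty-page writer and filer2.MaybeManifestize (see the Flush change in the file handle diff) save data through the same code path.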
@@ -96,7 +96,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	// Create a URL that references a to-be-created blob in your
 	// Azure Storage account's container.
@@ -85,7 +85,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	bucket, err := g.client.Bucket(context.Background(), g.bucket)
 	if err != nil {
@@ -167,12 +167,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 		glog.V(0).Infof("already replicated %s", key)
 	} else {
 		// find out what changed
-		deletedChunks, newChunks := compareChunks(oldEntry, newEntry)
+		deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry)
+		if err != nil {
+			return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
+		}
 
 		// delete the chunks that are deleted from the source
 		if deleteIncludeChunks {
 			// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
-			existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
+			existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks)
 		}
 
 		// replicate the chunks that are new in the source
@@ -200,8 +203,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 		})
 
 	}
 }
-func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
-	deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
-	newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
+func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+	aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
+	if aErr != nil {
+		return nil, nil, aErr
+	}
+	bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks)
+	if bErr != nil {
+		return nil, nil, bErr
+	}
+
+	deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...)
+	deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...)
+
+	newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...)
+	newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...)
+
 	return
 }
@@ -90,7 +90,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
 
@@ -108,7 +108,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+	chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	parts := make([]*s3.CompletedPart, len(chunkViews))
 
@@ -14,6 +14,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
@@ -137,13 +138,28 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
 	return resp, nil
 }
 
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) {
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		return "", err
+	}
+	locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
+	if !found || len(locations) == 0 {
+		return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+	}
+	return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil
+}
+
 func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
 
 	glog.V(4).Infof("CreateEntry %v", req)
 
 	resp = &filer_pb.CreateEntryResponse{}
 
-	chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+	chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry)
+	if err2 != nil {
+		return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
+	}
 
 	if req.Entry.Attributes == nil {
 		glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
@@ -158,7 +174,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
 	}, req.OExcl, req.IsFromOtherCluster)
 
 	if createErr == nil {
-		fs.filer.DeleteChunks(garbages)
+		fs.filer.DeleteChunks(garbage)
 	} else {
 		glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
 		resp.Error = createErr.Error()
@@ -177,10 +193,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
 	}
 
-	// remove old chunks if not included in the new ones
-	unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
-
-	chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+	chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry)
+	if err2 != nil {
+		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
+	}
 
 	newEntry := &filer2.Entry{
 		FullPath: util.JoinPath(req.Directory, req.Entry.Name),
@@ -214,8 +230,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 	}
 
 	if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
-		fs.filer.DeleteChunks(unusedChunks)
-		fs.filer.DeleteChunks(garbages)
+		fs.filer.DeleteChunks(garbage)
 	} else {
 		glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
 	}
@@ -225,6 +240,37 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 	return &filer_pb.UpdateEntryResponse{}, err
 }
 
+func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+	chunks = newEntry.Chunks
+
+	// remove old chunks if not included in the new ones
+	if existingEntry != nil {
+		garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+		if err != nil {
+			return chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+		}
+	}
+
+	// files with manifest chunks are usually large and append only, skip calculating covered chunks
+	var coveredChunks []*filer_pb.FileChunk
+	if !filer2.HasChunkManifest(newEntry.Chunks) {
+		chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks)
+		garbage = append(garbage, coveredChunks...)
+	}
+
+	chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+		newEntry.Attributes.Replication,
+		newEntry.Attributes.Collection,
+		"",
+		needle.SecondsToTTL(newEntry.Attributes.TtlSec),
+		false), chunks)
+	if err != nil {
+		// not good, but should be ok
+		glog.V(0).Infof("MaybeManifestize: %v", err)
+	}
+	return
+}
+
 func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
 
 	glog.V(4).Infof("AppendToEntry %v", req)
@@ -254,6 +300,17 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 
 	entry.Chunks = append(entry.Chunks, req.Chunks...)
 
+	entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+		entry.Replication,
+		entry.Collection,
+		"",
+		needle.SecondsToTTL(entry.TtlSec),
+		false), entry.Chunks)
+	if err != nil {
+		// not good, but should be ok
+		glog.V(0).Infof("MaybeManifestize: %v", err)
+	}
+
 	err = fs.filer.CreateEntry(context.Background(), entry, false, false)
 
 	return &filer_pb.AppendToEntryResponse{}, err
@@ -40,7 +40,7 @@ type FilerPostResult struct {
 	Url string `json:"url,omitempty"`
 }
 
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
 
 	stats.FilerRequestCounter.WithLabelValues("assign").Inc()
 	start := time.Now()
@@ -67,7 +67,6 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
 	assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
 	if ae != nil {
 		glog.Errorf("failing to assign a file id: %v", ae)
-		writeJsonError(w, r, http.StatusInternalServerError, ae)
 		err = ae
 		return
 	}
@@ -114,7 +113,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+	fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
 
 	if err != nil || fileId == "" || urlLocation == "" {
 		glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
@@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		limitedReader := io.LimitReader(partReader, int64(chunkSize))
 
 		// assign one file id for one chunk
-		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
 		if assignErr != nil {
 			return nil, assignErr
 		}
@@ -132,6 +132,12 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		}
 	}
 
+	fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+	if replyerr != nil {
+		glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+		return
+	}
+
 	path := r.URL.Path
 	if strings.HasSuffix(path, "/") {
 		if fileName != "" {
@@ -184,3 +190,23 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
 	uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
 	return uploadResult, err
 }
+
+func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType {
+
+	return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
+		// assign one file id for one chunk
+		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
+		if assignErr != nil {
+			return nil, "", "", assignErr
+		}
+
+		// upload the chunk to the volume server
+		uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth)
+		if uploadErr != nil {
+			return nil, "", "", uploadErr
+		}
+
+		return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+	}
+}
@@ -19,7 +19,7 @@ import (
 func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
 	replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
 
-	fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+	fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
 
 	if err != nil || fileId == "" || urlLocation == "" {
 		return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
@@ -474,7 +474,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
 		return 0, io.EOF
 	}
 	if f.entryViewCache == nil {
-		f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+		f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks)
 		f.reader = nil
 	}
 	if f.reader == nil {
@@ -1,6 +1,7 @@
 package needle
 
 import (
+	"fmt"
 	"strconv"
 )
 
@@ -139,3 +140,10 @@ func (t TTL) Minutes() uint32 {
 	}
 	return 0
 }
+
+func SecondsToTTL(seconds int32) string {
+	if seconds == 0 {
+		return ""
+	}
+	return fmt.Sprintf("%dm", seconds/60)
+}
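A quick sanity check of the new helper, illustration only (the outputs follow directly from the function body above):

    fmt.Println(needle.SecondsToTTL(0))   // ""
    fmt.Println(needle.SecondsToTTL(180)) // "3m"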