mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-02-09 09:17:28 +08:00
Feat: merge small chunk (#6049)
* fix: mount deadlock * feat: merge small chunk * adjust MergeChunkMinCount * fix --------- Co-authored-by: zemul <zhouzemiao@ihuman.com>
This commit is contained in:
@@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
|
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@@ -130,7 +130,8 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
|
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,65 @@
|
|||||||
package weed_server
|
package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const MergeChunkMinCount int = 1000
|
||||||
|
|
||||||
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
|
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
|
||||||
//TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
|
// Only merge small chunks more than half of the file
|
||||||
return inputChunks, nil
|
var chunkSize = fs.option.MaxMB * 1024 * 1024
|
||||||
|
var smallChunk, sumChunk int
|
||||||
|
var minOffset int64 = math.MaxInt64
|
||||||
|
for _, chunk := range inputChunks {
|
||||||
|
if chunk.IsChunkManifest {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if chunk.Size < uint64(chunkSize/2) {
|
||||||
|
smallChunk++
|
||||||
|
if chunk.Offset < minOffset {
|
||||||
|
minOffset = chunk.Offset
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sumChunk++
|
||||||
|
}
|
||||||
|
if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
|
||||||
|
return inputChunks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fs.mergeChunks(so, inputChunks, minOffset)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
|
||||||
|
chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
|
||||||
|
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
|
||||||
|
if mergeErr != nil {
|
||||||
|
return nil, mergeErr
|
||||||
|
}
|
||||||
|
mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
|
||||||
|
if mergeErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
|
||||||
|
for _, chunk := range inputChunks {
|
||||||
|
if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
|
||||||
|
mergedChunks = append(mergedChunks, chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
|
||||||
|
mergedChunks, inputChunks)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fs.filer.DeleteChunksNotRecursive(garbage)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ var bufPool = sync.Pool{
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
|
func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
|
||||||
query := r.URL.Query()
|
query := r.URL.Query()
|
||||||
|
|
||||||
isAppend := isAppend(r)
|
isAppend := isAppend(r)
|
||||||
@@ -45,7 +45,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
|
|||||||
chunkOffset = offsetInt
|
chunkOffset = offsetInt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
|
||||||
|
|
||||||
md5Hash = md5.New()
|
md5Hash = md5.New()
|
||||||
|
chunkOffset = startOffset
|
||||||
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
|
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|||||||
@@ -27,12 +27,14 @@ const (
|
|||||||
Failed = "failed"
|
Failed = "failed"
|
||||||
|
|
||||||
// filer handler
|
// filer handler
|
||||||
DirList = "dirList"
|
DirList = "dirList"
|
||||||
ContentSaveToFiler = "contentSaveToFiler"
|
ContentSaveToFiler = "contentSaveToFiler"
|
||||||
AutoChunk = "autoChunk"
|
AutoChunk = "autoChunk"
|
||||||
ChunkProxy = "chunkProxy"
|
ChunkProxy = "chunkProxy"
|
||||||
ChunkAssign = "chunkAssign"
|
ChunkAssign = "chunkAssign"
|
||||||
ChunkUpload = "chunkUpload"
|
ChunkUpload = "chunkUpload"
|
||||||
|
ChunkMerge = "chunkMerge"
|
||||||
|
|
||||||
ChunkDoUploadRetry = "chunkDoUploadRetry"
|
ChunkDoUploadRetry = "chunkDoUploadRetry"
|
||||||
ChunkUploadRetry = "chunkUploadRetry"
|
ChunkUploadRetry = "chunkUploadRetry"
|
||||||
ChunkAssignRetry = "chunkAssignRetry"
|
ChunkAssignRetry = "chunkAssignRetry"
|
||||||
|
|||||||
Reference in New Issue
Block a user