volume: add option to limit compaction speed
@@ -14,9 +14,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
 	}
 	return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
 }
-func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64) error {
+func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
 	if v := s.findVolume(vid); v != nil {
-		return v.Compact(preallocate)
+		return v.Compact(preallocate, compactionBytePerSecond)
 	}
 	return fmt.Errorf("volume id %d is not found during compact", vid)
 }
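For orientation, a hedged sketch of how a caller might use the widened signature; the helper name, the import alias, and the 1 MiB/s figure are illustrative only and not part of this commit.

// Illustrative fragment, assuming the weed/storage package is imported as
// "storage" and the needle package is in scope.
func compactSlowly(s *storage.Store, vid needle.VolumeId) error {
	// preallocate = 0; limit compaction writes to roughly 1 MiB per second.
	// Passing 0 as the last argument keeps the old, unthrottled behavior.
	return s.CompactVolume(vid, 0, 1024*1024)
}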
@@ -18,7 +18,7 @@ func (v *Volume) garbageLevel() float64 {
 	return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
 }
 
-func (v *Volume) Compact(preallocate int64) error {
+func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
 	glog.V(3).Infof("Compacting volume %d ...", v.Id)
 	//no need to lock for copy on write
 	//v.accessLock.Lock()
@@ -29,7 +29,7 @@ func (v *Volume) Compact(preallocate int64) error {
 	v.lastCompactIndexOffset = v.nm.IndexFileSize()
 	v.lastCompactRevision = v.SuperBlock.CompactionRevision
 	glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
-	return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate)
+	return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
 }
 
 func (v *Volume) Compact2() error {
@@ -236,12 +236,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 }
 
 type VolumeFileScanner4Vacuum struct {
-	version   needle.Version
-	v         *Volume
-	dst       *os.File
-	nm        *NeedleMap
-	newOffset int64
-	now       uint64
+	version                 needle.Version
+	v                       *Volume
+	dst                     *os.File
+	nm                      *NeedleMap
+	newOffset               int64
+	now                     uint64
+	compactionBytePerSecond int64
+	lastSizeCounter         int64
+	lastSizeCheckTime       time.Time
 }
 
 func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
@@ -269,13 +272,32 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
 		if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil {
 			return fmt.Errorf("cannot append needle: %s", err)
 		}
-		scanner.newOffset += n.DiskSize(scanner.version)
+		delta := n.DiskSize(scanner.version)
+		scanner.newOffset += delta
+		scanner.maybeSlowdown(delta)
 		glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size)
 	}
 	return nil
 }
+func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) {
+	if scanner.compactionBytePerSecond > 0 {
+		scanner.lastSizeCounter += delta
+		now := time.Now()
+		elapsedDuration := now.Sub(scanner.lastSizeCheckTime)
+		if elapsedDuration > 100*time.Millisecond {
+			overLimitBytes := scanner.lastSizeCounter - scanner.compactionBytePerSecond/10
+			if overLimitBytes > 0 {
+				overRatio := float64(overLimitBytes) / float64(scanner.compactionBytePerSecond)
+				sleepTime := time.Duration(overRatio*1000) * time.Millisecond
+				// glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", scanner.lastSizeCounter, scanner.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
+				time.Sleep(sleepTime)
+			}
+			scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now()
+		}
+	}
+}
 
-func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64) (err error) {
+func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
 	var (
 		dst, idx *os.File
 	)
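The throttle above is easier to see in isolation. Below is a minimal, self-contained sketch of the same idea: count copied bytes and, roughly every 100ms, sleep in proportion to how far the window overshot its per-window budget of compactionBytePerSecond/10. The throttler type and main function are illustrative only, not code from this commit.

// Sketch only: the windowed throttle from maybeSlowdown above, lifted into a
// standalone program.
package main

import (
	"fmt"
	"time"
)

type throttler struct {
	bytesPerSecond int64     // 0 disables throttling
	counter        int64     // bytes accounted since the last check
	lastCheck      time.Time // start of the current ~100ms window
}

// account records delta copied bytes; roughly every 100ms it compares the
// window's total against bytesPerSecond/10 and sleeps in proportion to the
// overshoot, keeping the long-run average near bytesPerSecond.
func (t *throttler) account(delta int64) {
	if t.bytesPerSecond <= 0 {
		return
	}
	t.counter += delta
	if time.Since(t.lastCheck) > 100*time.Millisecond {
		overLimitBytes := t.counter - t.bytesPerSecond/10
		if overLimitBytes > 0 {
			overRatio := float64(overLimitBytes) / float64(t.bytesPerSecond)
			time.Sleep(time.Duration(overRatio*1000) * time.Millisecond)
		}
		t.counter, t.lastCheck = 0, time.Now()
	}
}

func main() {
	t := &throttler{bytesPerSecond: 10 * 1024 * 1024, lastCheck: time.Now()}
	start := time.Now()
	for copied := int64(0); copied < 5*1024*1024; copied += 256 * 1024 {
		time.Sleep(10 * time.Millisecond) // stand-in for copying one needle
		t.account(256 * 1024)
	}
	fmt.Printf("accounted 5MiB in %v against a 10MiB/s limit\n", time.Since(start))
}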
@@ -290,10 +312,12 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
 	defer idx.Close()
 
 	scanner := &VolumeFileScanner4Vacuum{
-		v:   v,
-		now: uint64(time.Now().Unix()),
-		nm:  NewBtreeNeedleMap(idx),
-		dst: dst,
+		v:                       v,
+		now:                     uint64(time.Now().Unix()),
+		nm:                      NewBtreeNeedleMap(idx),
+		dst:                     dst,
+		compactionBytePerSecond: compactionBytePerSecond,
+		lastSizeCheckTime:       time.Now(),
 	}
 	err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
 	return
@@ -5,6 +5,7 @@ import (
 	"math/rand"
 	"os"
 	"testing"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -72,8 +73,8 @@ func TestCompaction(t *testing.T) {
 		t.Fatalf("volume creation: %v", err)
 	}
 
-	beforeCommitFileCount := 1000
-	afterCommitFileCount := 1000
+	beforeCommitFileCount := 10000
+	afterCommitFileCount := 10000
 
 	infos := make([]*needleInfo, beforeCommitFileCount+afterCommitFileCount)
 
@@ -81,7 +82,10 @@ func TestCompaction(t *testing.T) {
 		doSomeWritesDeletes(i, v, t, infos)
 	}
 
-	v.Compact(0)
+	startTime := time.Now()
+	v.Compact(0, 1024*1024)
+	speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
+	t.Logf("compaction speed: %.2f bytes/s", speed)
 
 	for i := 1; i <= afterCommitFileCount; i++ {
 		doSomeWritesDeletes(i+beforeCommitFileCount, v, t, infos)
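Note that the logged figure divides v.ContentSize() by wall-clock time rather than counting the bytes actually copied, while the throttle only accounts for live needles that are rewritten; with deleted entries skipped, the reported speed can therefore read somewhat above the configured 1024*1024 limit and is best treated as a rough comparison against it.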