mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2025-10-21 23:17:23 +08:00
mount: optional limit for the number of concurrent writers
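This commit removes dirty_page.go's package-level writer pool (a util.LimitedConcurrentExecutor hard-coded to 4 × runtime.NumCPU()) and makes write concurrency a per-mount choice: Option gains a ConcurrentWriters field. When the option is positive, NewSeaweedFileSystem builds a shared util.LimitedConcurrentExecutor and saveToStorage routes each chunk writer through it; when it is zero, every writer keeps running on its own goroutine and the chunk-save error channel falls back to a runtime.NumCPU() capacity.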
@@ -9,12 +9,6 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
-var (
-	concurrentWriterLimit = runtime.NumCPU()
-	concurrentWriters     = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
-)
-
 type ContinuousDirtyPages struct {
@@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
 	dirtyPages := &ContinuousDirtyPages{
 		intervals:        &ContinuousIntervals{},
 		f:                file,
-		chunkSaveErrChan: make(chan error, concurrentWriterLimit),
+		chunkSaveErrChan: make(chan error, runtime.NumCPU()),
 	}
 	go func() {
 		for t := range dirtyPages.chunkSaveErrChan {
@@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
 
 func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
 
+	errChanSize := pages.f.wfs.option.ConcurrentWriters
+	if errChanSize == 0 {
+		errChanSize = runtime.NumCPU()
+	}
 	if pages.chunkSaveErrChanClosed {
-		pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit)
+		pages.chunkSaveErrChan = make(chan error, errChanSize)
 		pages.chunkSaveErrChanClosed = false
 	}
 
 	mtime := time.Now().UnixNano()
 	pages.writeWaitGroup.Add(1)
-	go func() {
+	writer := func() {
 		defer pages.writeWaitGroup.Done()
 
 		reader = io.LimitReader(reader, size)
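The resizing above keeps chunkSaveErrChan's capacity in step with the effective writer limit, so every in-flight writer can report a result without blocking on send, even while the draining goroutine started in newDirtyPages lags behind. A minimal, self-contained sketch of that pattern (the names here are illustrative, not SeaweedFS APIs):

    package main

    import (
    	"fmt"
    	"runtime"
    )

    func main() {
    	// Capacity matches the maximum number of concurrent writers,
    	// so a send never blocks while the drainer is behind.
    	limit := runtime.NumCPU()
    	errChan := make(chan error, limit)

    	done := make(chan struct{})
    	go func() { // drainer, analogous to the goroutine in newDirtyPages
    		defer close(done)
    		for err := range errChan {
    			if err != nil {
    				fmt.Println("chunk save failed:", err)
    			}
    		}
    	}()

    	for i := 0; i < limit; i++ {
    		errChan <- nil // each writer reports once; nil means success
    	}
    	close(errChan)
    	<-done
    }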
@@ -121,7 +119,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 		pages.collection, pages.replication = collection, replication
 		pages.f.addChunks([]*filer_pb.FileChunk{chunk})
 		glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
-	}()
+	}
+
+	if pages.f.wfs.concurrentWriters != nil {
+		pages.f.wfs.concurrentWriters.Execute(writer)
+	} else {
+		go writer()
+	}
 }
 
 func max(x, y int64) int64 {
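util.LimitedConcurrentExecutor's implementation is not part of this diff; all saveToStorage relies on is an Execute method that runs the supplied closure while bounding how many run at once. A semaphore-based sketch of an executor with that contract (an assumption about the interface, not the repo's actual code):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // LimitedConcurrentExecutor caps how many submitted jobs run at once.
    type LimitedConcurrentExecutor struct {
    	sem chan struct{} // counting semaphore; capacity is the limit
    }

    func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
    	return &LimitedConcurrentExecutor{sem: make(chan struct{}, limit)}
    }

    // Execute waits for a free slot, then runs job on its own goroutine.
    func (e *LimitedConcurrentExecutor) Execute(job func()) {
    	e.sem <- struct{}{} // acquire a slot
    	go func() {
    		defer func() { <-e.sem }() // release the slot
    		job()
    	}()
    }

    func main() {
    	e := NewLimitedConcurrentExecutor(2)
    	var wg sync.WaitGroup
    	for i := 0; i < 5; i++ {
    		wg.Add(1)
    		i := i
    		e.Execute(func() {
    			defer wg.Done()
    			fmt.Println("writer", i, "done")
    		})
    	}
    	wg.Wait()
    }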
@@ -31,6 +31,7 @@ type Option struct {
 	Replication        string
 	TtlSec             int32
 	ChunkSizeLimit     int64
+	ConcurrentWriters  int
 	CacheDir           string
 	CacheSizeMB        int64
 	DataCenter         string
@@ -68,6 +69,9 @@ type WFS struct {
 	chunkCache *chunk_cache.TieredChunkCache
 	metaCache  *meta_cache.MetaCache
 	signature  int32
+
+	// throttle writers
+	concurrentWriters *util.LimitedConcurrentExecutor
 }
 
 type statsCache struct {
 	filer_pb.StatisticsResponse
@@ -110,6 +114,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 	wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
 	wfs.fsNodeCache = newFsCache(wfs.root)
 
+	if wfs.option.ConcurrentWriters > 0 {
+		wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+	}
+
 	return wfs
 }
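End to end, the semantics are: ConcurrentWriters == 0 leaves wfs.concurrentWriters nil, and saveToStorage falls back to one unthrottled goroutine per chunk; any positive value funnels all writers through one shared executor. A compressed sketch of that wiring (types reduced to the fields the decision touches; the executor is the same toy semaphore as in the sketch above):

    package main

    import "fmt"

    type executor struct{ sem chan struct{} }

    func newExecutor(limit int) *executor { return &executor{sem: make(chan struct{}, limit)} }

    func (e *executor) Execute(job func()) {
    	e.sem <- struct{}{}
    	go func() { defer func() { <-e.sem }(); job() }()
    }

    type option struct{ ConcurrentWriters int }

    type wfs struct {
    	option            option
    	concurrentWriters *executor // nil means "no limit requested"
    }

    func newWFS(opt option) *wfs {
    	w := &wfs{option: opt}
    	if opt.ConcurrentWriters > 0 { // mirrors NewSeaweedFileSystem
    		w.concurrentWriters = newExecutor(opt.ConcurrentWriters)
    	}
    	return w
    }

    // dispatch mirrors the branch added at the end of saveToStorage.
    func (w *wfs) dispatch(writer func()) {
    	if w.concurrentWriters != nil {
    		w.concurrentWriters.Execute(writer) // throttled
    	} else {
    		go writer() // unbounded, the previous behavior
    	}
    }

    func main() {
    	fmt.Println(newWFS(option{ConcurrentWriters: 8}).concurrentWriters != nil) // true
    	fmt.Println(newWFS(option{}).concurrentWriters != nil)                     // false
    }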