add deletion during benchmarking

This commit is contained in:
Chris Lu
2014-03-19 04:44:59 -07:00
parent b2e8bfb54e
commit 252e553ef2

View File

@@ -19,17 +19,18 @@ import (
) )
type BenchmarkOptions struct { type BenchmarkOptions struct {
server *string server *string
concurrency *int concurrency *int
numberOfFiles *int numberOfFiles *int
fileSize *int fileSize *int
idListFile *string idListFile *string
write *bool write *bool
read *bool deletePercentage *int
sequentialRead *bool read *bool
collection *string sequentialRead *bool
cpuprofile *string collection *string
vid2server map[string]string //cache for vid locations cpuprofile *string
vid2server map[string]string //cache for vid locations
} }
var ( var (
@@ -41,10 +42,11 @@ func init() {
cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information") cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location") b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location")
b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes") b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes") b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread") b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids") b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
b.write = cmdBenchmark.Flag.Bool("write", true, "enable write") b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
@@ -85,11 +87,16 @@ var cmdBenchmark = &Command{
} }
var ( var (
wait sync.WaitGroup wait sync.WaitGroup
writeStats *stats writeStats *stats
readStats *stats readStats *stats
serverLimitChan map[string]chan bool
) )
func init() {
serverLimitChan = make(map[string]chan bool)
}
func runbenchmark(cmd *Command, args []string) bool { func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
if *b.cpuprofile != "" { if *b.cpuprofile != "" {
@@ -123,6 +130,7 @@ func bench_write() {
go writeFiles(idChan, fileIdLineChan, writeStats) go writeFiles(idChan, fileIdLineChan, writeStats)
} }
writeStats.start = time.Now() writeStats.start = time.Now()
writeStats.total = *b.numberOfFiles
go writeStats.checkProgress("Writing Benchmark", finishChan) go writeStats.checkProgress("Writing Benchmark", finishChan)
for i := 0; i < *b.numberOfFiles; i++ { for i := 0; i < *b.numberOfFiles; i++ {
idChan <- i idChan <- i
@@ -145,6 +153,7 @@ func bench_read() {
wait.Add(*b.concurrency) wait.Add(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan) go readFileIds(*b.idListFile, fileIdLineChan)
readStats.start = time.Now() readStats.start = time.Now()
readStats.total = *b.numberOfFiles
go readStats.checkProgress("Randomly Reading Benchmark", finishChan) go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
for i := 0; i < *b.concurrency; i++ { for i := 0; i < *b.concurrency; i++ {
go readFiles(fileIdLineChan, readStats) go readFiles(fileIdLineChan, readStats)
@@ -157,7 +166,6 @@ func bench_read() {
} }
func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
serverLimitChan := make(map[string]chan bool)
for { for {
if id, ok := <-idChan; ok { if id, ok := <-idChan; ok {
start := time.Now() start := time.Now()
@@ -170,7 +178,21 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
} }
serverLimitChan[fp.Server] <- true serverLimitChan[fp.Server] <- true
if _, err := fp.Upload(0, *b.server); err == nil { if _, err := fp.Upload(0, *b.server); err == nil {
fileIdLineChan <- fp.Fid if rand.Intn(100) < *b.deletePercentage {
s.total++
go func() {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
serverLimitChan[fp.Server] <- true
if e := operation.DeleteFile(*b.server, fp.Fid); e == nil {
s.completed++
} else {
s.failed++
}
<-serverLimitChan[fp.Server]
}()
} else {
fileIdLineChan <- fp.Fid
}
s.completed++ s.completed++
s.transferred += fileSize s.transferred += fileSize
} else { } else {
@@ -189,6 +211,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
break break
} }
} }
//wait for the deleting goroutines
time.Sleep(time.Duration(1500) * time.Millisecond)
wait.Done() wait.Done()
} }
@@ -291,8 +315,10 @@ func readFileIds(fileName string, fileIdLineChan chan string) {
break break
} }
} }
for i := 0; i < *b.numberOfFiles; i++ { if len(lines) > 0 {
fileIdLineChan <- lines[rand.Intn(len(lines))] for i := 0; i < *b.numberOfFiles; i++ {
fileIdLineChan <- lines[rand.Intn(len(lines))]
}
} }
} }
@@ -310,6 +336,7 @@ type stats struct {
overflow []int overflow []int
completed int completed int
failed int failed int
total int
transferred int64 transferred int64
start time.Time start time.Time
end time.Time end time.Time
@@ -343,7 +370,7 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
case t := <-ticker: case t := <-ticker:
completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime) completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n", fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
s.completed, *b.numberOfFiles, float64(s.completed)*100/float64(*b.numberOfFiles), s.completed, s.total, float64(s.completed)*100/float64(s.total),
float64(completed)*float64(int64(time.Second))/float64(int64(taken)), float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024), float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
) )