adding volume type

Chris Lu
2020-12-13 00:58:58 -08:00
parent 16cd6fb278
commit e9cd798bd3
35 changed files with 830 additions and 742 deletions

View File

@@ -35,6 +35,7 @@ type BenchmarkOptions struct {
sequentialRead *bool
collection *string
replication *string
+ volumeType *string
cpuprofile *string
maxCpu *int
grpcDialOption grpc.DialOption
@@ -62,6 +63,7 @@ func init() {
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
+ b.volumeType = cmdBenchmark.Flag.String("volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
@@ -234,6 +236,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Count: 1,
Collection: *b.collection,
Replication: *b.replication,
+ VolumeType: *b.volumeType,
}
if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection

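For orientation, here is a minimal sketch of what the benchmark change amounts to: the new -volumeType string travels with the other placement hints into the master's assign call. The request type name operation.VolumeAssignRequest is assumed from the surrounding code; this excerpt only shows the struct literal's fields.

package example

import (
	"github.com/chrislusf/seaweedfs/weed/operation"
	"google.golang.org/grpc"
)

// assignWithVolumeType asks the master for one file id, pinned to a volume type.
// Sketch only, not part of this commit.
func assignWithVolumeType(master string, grpcDialOption grpc.DialOption, volumeType string) (*operation.AssignResult, error) {
	ar := &operation.VolumeAssignRequest{
		Count:       1,
		Collection:  "benchmark",
		Replication: "000",
		VolumeType:  volumeType, // "" (no preference), "hdd", or "ssd"
	}
	return operation.Assign(master, grpcDialOption, ar)
}
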
View File

@@ -37,6 +37,7 @@ type CopyOptions struct {
replication *string
collection *string
ttl *string
+ volumeType *string
maxMB *int
masterClient *wdclient.MasterClient
concurrenctFiles *int
@@ -54,6 +55,7 @@ func init() {
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+ copy.volumeType = cmdCopy.Flag.String("volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
@@ -311,6 +313,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
+ VolumeType: *worker.options.volumeType,
Path: task.destinationUrlPath,
}
@@ -405,6 +408,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
+ VolumeType: *worker.options.volumeType,
Path: task.destinationUrlPath + fileName,
}

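The filer.copy change follows the same pattern on the filer side: both uploadFileAsOne and uploadFileInChunks forward the flag in their volume-assign request. A rough sketch follows, assuming (not shown in this excerpt) that the request type is filer_pb.AssignVolumeRequest and that its generated struct gains a VolumeType field among the commit's other changed files.

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// assignOnFiler sketches the assign step of the copy workers: the volume type
// travels with the rest of the placement hints. Count is assumed; only the
// fields listed in the hunks above are confirmed by this excerpt.
func assignOnFiler(ctx context.Context, client filer_pb.SeaweedFilerClient,
	replication, collection string, ttlSec int32, volumeType, path string) (*filer_pb.AssignVolumeResponse, error) {
	request := &filer_pb.AssignVolumeRequest{
		Count:       1,
		Replication: replication,
		Collection:  collection,
		TtlSec:      ttlSec,
		VolumeType:  volumeType, // "", "hdd", or "ssd"
		Path:        path,
	}
	return client.AssignVolume(ctx, request)
}
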
View File

@@ -31,6 +31,8 @@ type SyncOptions struct {
bCollection *string
aTtlSec *int
bTtlSec *int
+ aVolumeType *string
+ bVolumeType *string
aDebug *bool
bDebug *bool
}
@@ -54,6 +56,8 @@ func init() {
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
+ syncOptions.aVolumeType = cmdFilerSynchronize.Flag.String("a.volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive on filer A")
+ syncOptions.bVolumeType = cmdFilerSynchronize.Flag.String("b.volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive on filer B")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
@@ -87,7 +91,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB,
- *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug)
+ *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bVolumeType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@@ -99,7 +103,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA,
- *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug)
+ *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aVolumeType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@@ -114,7 +118,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string,
- replicationStr, collection string, ttlSec int, debug bool) error {
+ replicationStr, collection string, ttlSec int, volumeType string, debug bool) error {
// read source filer signature
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
@@ -140,7 +144,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSource := &source.FilerSource{}
filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)
filerSink := &filersink.FilerSink{}
- filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption)
+ filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, volumeType, grpcDialOption)
filerSink.SetSourceFiler(filerSource)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {

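Note the direction of the per-filer flags: syncing A to B passes the b.* settings (including -b.volumeType) because they describe the target filer, and syncing B to A passes the a.* settings. The sink setup then just forwards the string through DoInitialize. A small sketch of that wiring, restating only the calls visible in this hunk (import paths are assumed from the package identifiers used in this file):

package example

import (
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"google.golang.org/grpc"
)

// newFilerSink sets up one sync direction with the target filer's volume type.
func newFilerSink(sourceFiler, sourcePath, targetFiler, targetPath string,
	replication, collection string, ttlSec int, volumeType string,
	grpcDialOption grpc.DialOption) *filersink.FilerSink {

	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)

	filerSink := &filersink.FilerSink{}
	filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath,
		replication, collection, ttlSec, volumeType, grpcDialOption)
	filerSink.SetSourceFiler(filerSource)
	return filerSink
}
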
View File

@@ -12,6 +12,7 @@ type MountOptions struct {
dirAutoCreate *bool
collection *string
replication *string
+ volumeType *string
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
@@ -41,6 +42,7 @@ func init() {
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
+ mountOptions.volumeType = cmdMount.Flag.String("volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")

View File

@@ -5,6 +5,7 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"os"
"os/user"
"path"
@@ -167,6 +168,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
+ volumeType, err := storage.ToVolumeType(*option.volumeType)
+ if err != nil {
+ fmt.Printf("failed to parse volume type: %v\n", err)
+ return false
+ }
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
FilerGrpcAddress: filerGrpcAddress,
GrpcDialOption: grpcDialOption,
@@ -174,6 +181,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
+ VolumeType: volumeType,
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,

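RunMount now validates the flag before building the filesystem options. storage.ToVolumeType itself is added elsewhere in this commit and is not part of this excerpt; the following is a hypothetical sketch of its assumed behavior (empty string and "hdd" meaning hard drive, "ssd" meaning solid state, anything else rejected), not the real implementation.

package storage

import "fmt"

// VolumeType and ToVolumeType are sketched here for illustration only; the
// real definitions live in weed/storage among the commit's other files.
type VolumeType string

const (
	HardDriveType VolumeType = ""    // the default placement
	SsdType       VolumeType = "ssd"
)

func ToVolumeType(vt string) (VolumeType, error) {
	switch vt {
	case "", "hdd":
		return HardDriveType, nil
	case "ssd":
		return SsdType, nil
	default:
		return HardDriveType, fmt.Errorf("parse VolumeType %s: expecting hdd or ssd", vt)
	}
}
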
View File

@@ -23,6 +23,7 @@ type UploadOptions struct {
collection *string
dataCenter *string
ttl *string
+ volumeType *string
maxMB *int
usePublicUrl *bool
}
@@ -36,6 +37,7 @@ func init() {
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
+ upload.volumeType = cmdUpload.Flag.String("volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
@@ -81,7 +83,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
+ results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.volumeType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -98,7 +100,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
+ results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.volumeType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}