filer: dynamically create bucket under /buckets folder

This commit is contained in:
Chris Lu
2020-02-24 22:28:45 -08:00
parent 5bcb44eda9
commit 6ab7368ef2
21 changed files with 436 additions and 183 deletions

View File

@@ -125,10 +125,6 @@ func runCopy(cmd *Command, args []string) bool {
}
copy.masters = masters
copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
go copy.masterClient.KeepConnectedToMaster()
copy.masterClient.WaitUntilConnected()
if *cmdCopy.IsDebug {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
@@ -274,23 +270,35 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
mimeType := detectMimeType(f)
var chunks []*filer_pb.FileChunk
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
if task.fileSize > 0 {
// assign a volume
assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
Ttl: *worker.options.ttl,
err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
ParentPath: task.destinationUrlPath,
}
assignResult, assignError = client.AssignVolume(ctx, request)
if assignError != nil {
return fmt.Errorf("assign volume failure %v: %v", request, assignError)
}
return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, security.EncodedJwt(assignResult.Auth), *worker.options.compressionLevel)
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
@@ -300,7 +308,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
chunks = append(chunks, &filer_pb.FileChunk{
FileId: assignResult.Fid,
FileId: assignResult.FileId,
Offset: 0,
Size: uint64(uploadResult.Size),
Mtime: time.Now().UnixNano(),
@@ -352,6 +360,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
var wg sync.WaitGroup
var uploadError error
var collection, replication string
fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
@@ -363,22 +372,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
<-concurrentChunks
}()
// assign a volume
assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
Ttl: *worker.options.ttl,
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
ParentPath: task.destinationUrlPath,
}
assignResult, assignError = client.AssignVolume(ctx, request)
if assignError != nil {
return fmt.Errorf("assign volume failure %v: %v", request, assignError)
}
return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
if collection == "" {
collection = assignResult.Collection
}
if replication == "" {
replication = assignResult.Replication
}
uploadResult, err := operation.Upload(targetUrl,
fileName+"-"+strconv.FormatInt(i+1, 10),
io.NewSectionReader(f, i*chunkSize, chunkSize),
false, "", nil, assignResult.Auth)
false, "", nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -388,7 +417,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
return
}
chunksChan <- &filer_pb.FileChunk{
FileId: assignResult.Fid,
FileId: assignResult.FileId,
Offset: i * chunkSize,
Size: uint64(uploadResult.Size),
Mtime: time.Now().UnixNano(),
@@ -410,7 +439,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
return uploadError
}
@@ -427,8 +456,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
Replication: replication,
Collection: collection,
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
},
Chunks: chunks,