mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-21 17:37:23 +08:00
tiered storage: can copy to s3, read from s3
master not aware tiered volume yet, file assigning is not working yet
This commit is contained in:
@@ -11,15 +11,12 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/stats"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
_ "github.com/chrislusf/seaweedfs/weed/statik"
|
||||
"github.com/gorilla/mux"
|
||||
|
@@ -1,9 +1,9 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/backend"
|
||||
@@ -11,28 +11,83 @@ import (
|
||||
)
|
||||
|
||||
// VolumeTierCopyDatToRemote copy dat file to a remote tier
|
||||
func (vs *VolumeServer) VolumeTierCopyDatToRemote(ctx context.Context, req *volume_server_pb.VolumeTierCopyDatToRemoteRequest) (*volume_server_pb.VolumeTierCopyDatToRemoteResponse, error) {
|
||||
func (vs *VolumeServer) VolumeTierCopyDatToRemote(req *volume_server_pb.VolumeTierCopyDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierCopyDatToRemoteServer) error {
|
||||
|
||||
// find existing volume
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
return fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
|
||||
// verify the collection
|
||||
if v.Collection != req.Collection {
|
||||
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
|
||||
return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
|
||||
}
|
||||
|
||||
// locate the disk file
|
||||
diskFile, ok := v.DataBackend.(*backend.DiskFile)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("volume %d is not on local disk", req.VolumeId)
|
||||
return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
|
||||
}
|
||||
err := uploadFileToRemote(ctx, req, diskFile.File)
|
||||
|
||||
return &volume_server_pb.VolumeTierCopyDatToRemoteResponse{}, err
|
||||
}
|
||||
// check valid storage backend type
|
||||
backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
|
||||
if !found {
|
||||
var keys []string
|
||||
for key := range backend.BackendStorages {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
|
||||
}
|
||||
|
||||
func uploadFileToRemote(ctx context.Context, req *volume_server_pb.VolumeTierCopyDatToRemoteRequest, f *os.File) error {
|
||||
println("copying dat file of", f.Name(), "to remote")
|
||||
// check whether the existing backend storage is the same as requested
|
||||
// if same, skip
|
||||
backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
|
||||
for _, remoteFile := range v.GetVolumeTierInfo().GetFiles() {
|
||||
if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
|
||||
return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
|
||||
}
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
fn := func(progressed int64, percentage float32) error {
|
||||
now := time.Now()
|
||||
if now.Sub(startTime) < time.Second {
|
||||
return nil
|
||||
}
|
||||
startTime = now
|
||||
return stream.Send(&volume_server_pb.VolumeTierCopyDatToRemoteResponse{
|
||||
Processed: progressed,
|
||||
ProcessedPercentage: percentage,
|
||||
})
|
||||
}
|
||||
// copy the data file
|
||||
key, size, err := backendStorage.CopyFile(diskFile.File, fn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.String(), err)
|
||||
}
|
||||
|
||||
// save the remote file to volume tier info
|
||||
v.GetVolumeTierInfo().Files = append(v.GetVolumeTierInfo().GetFiles(), &volume_server_pb.RemoteFile{
|
||||
BackendType: backendType,
|
||||
BackendId: backendId,
|
||||
Key: key,
|
||||
Offset: 0,
|
||||
FileSize: uint64(size),
|
||||
ModifiedTime: uint64(time.Now().Unix()),
|
||||
})
|
||||
|
||||
if err := v.SaveVolumeTierInfo(); err != nil {
|
||||
return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
|
||||
}
|
||||
|
||||
if err := v.LoadRemoteFile(); err != nil {
|
||||
return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
|
||||
}
|
||||
|
||||
if !req.KeepLocalDatFile {
|
||||
os.Remove(v.FileName() + ".dat")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user