mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-12-17 09:42:29 +08:00
adds VolumeEcGenerateSlices, VolumeEcCopy
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@@ -49,37 +50,13 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
volumeFileName = storage.VolumeFileName(volFileInfoResp.Collection, location.Directory, int(req.VolumeId))
|
||||
|
||||
// println("source:", volFileInfoResp.String())
|
||||
|
||||
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: req.VolumeId,
|
||||
Ext: ".idx",
|
||||
CompactionRevision: volFileInfoResp.CompactionRevision,
|
||||
StopOffset: volFileInfoResp.IdxFileSize,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
|
||||
// copy ecx file
|
||||
if err:=vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err!=nil{
|
||||
return err
|
||||
}
|
||||
|
||||
idxFileName = volumeFileName + ".idx"
|
||||
err = writeToFile(copyFileClient, idxFileName, util.NewWriteThrottler(vs.compactionBytePerSecond))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: req.VolumeId,
|
||||
Ext: ".dat",
|
||||
CompactionRevision: volFileInfoResp.CompactionRevision,
|
||||
StopOffset: volFileInfoResp.DatFileSize,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
datFileName = volumeFileName + ".dat"
|
||||
err = writeToFile(copyFileClient, datFileName, util.NewWriteThrottler(vs.compactionBytePerSecond))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
|
||||
if err:=vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err!=nil{
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -109,6 +86,28 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
}, err
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, vid uint32,
|
||||
compactRevision uint32, stopOffset uint64, baseFileName, ext string) error {
|
||||
|
||||
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: vid,
|
||||
Ext: ext,
|
||||
CompactionRevision: compactRevision,
|
||||
StopOffset: stopOffset,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
|
||||
}
|
||||
|
||||
err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume %d %s file: %v", vid, ext, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
only check the difference of the file sizes
|
||||
todo: maybe should check the received count and deleted count of the volume
|
||||
@@ -175,6 +174,9 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// CopyFile client pulls the volume related file from the source server.
|
||||
// if req.CompactionRevision != math.MaxUint32, it ensures the compact revision is as expected
|
||||
// The copying will stop at req.StopOffset, but you can set it to math.MaxUint64 in order to read all data.
|
||||
func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
@@ -182,7 +184,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
|
||||
return fmt.Errorf("not found volume id %d", req.VolumeId)
|
||||
}
|
||||
|
||||
if uint32(v.CompactionRevision) != req.CompactionRevision {
|
||||
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
|
||||
return fmt.Errorf("volume %d is compacted", req.VolumeId)
|
||||
}
|
||||
|
||||
|
||||
82
weed/server/volume_grpc_erasure_coding.go
Normal file
82
weed/server/volume_grpc_erasure_coding.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
/*
|
||||
|
||||
Steps to apply erasure coding to .dat .idx files
|
||||
0. ensure the volume is readonly
|
||||
1. client call VolumeEcGenerateSlices to generate the .ecx and .ec01~.ec14 files
|
||||
2. client ask master for possible servers to hold the ec files, at least 4 servers
|
||||
3. client call VolumeEcCopy on above target servers to copy ec files from the source server
|
||||
4. target servers report the new ec files to the master
|
||||
5. master stores vid -> [14]*DataNode
|
||||
6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
|
||||
|
||||
*/
|
||||
|
||||
// VolumeEcGenerateSlices generates the .ecx and .ec01 ~ .ec14 files
|
||||
func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_server_pb.VolumeEcGenerateSlicesRequest) (*volume_server_pb.VolumeEcGenerateSlicesResponse, error) {
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
baseFileName := v.FileName()
|
||||
|
||||
// write .ecx file
|
||||
if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
|
||||
return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
|
||||
}
|
||||
|
||||
// write .ec01 ~ .ec14 files
|
||||
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
|
||||
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
|
||||
}
|
||||
|
||||
|
||||
return &volume_server_pb.VolumeEcGenerateSlicesResponse{}, nil
|
||||
}
|
||||
|
||||
// VolumeEcCopy copy the .ecx and some ec data slices
|
||||
func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb.VolumeEcCopyRequest) (*volume_server_pb.VolumeEcCopyResponse, error) {
|
||||
|
||||
location := vs.store.FindFreeLocation()
|
||||
if location == nil {
|
||||
return nil, fmt.Errorf("no space left")
|
||||
}
|
||||
|
||||
baseFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
|
||||
|
||||
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
// copy ecx file
|
||||
if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, ".ecx"); err!=nil{
|
||||
return err
|
||||
}
|
||||
|
||||
// copy ec data slices
|
||||
for _, ecIndex := range req.EcIndexes {
|
||||
if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err!=nil{
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("VolumeEcCopy volume %d: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
return &volume_server_pb.VolumeEcCopyResponse{}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user