generate, balance, delete copied shards, delete old volume

This commit is contained in:
Chris Lu
2019-05-25 14:02:06 -07:00
parent f0e6574d5e
commit 41e8ae61f8
4 changed files with 405 additions and 220 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"os"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -16,17 +17,17 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
1. client call VolumeEcGenerateSlices to generate the .ecx and .ec01~.ec14 files
1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
2. client ask master for possible servers to hold the ec files, at least 4 servers
3. client call VolumeEcCopy on above target servers to copy ec files from the source server
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
5. master stores vid -> [14]*DataNode
6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files
*/
*/
// VolumeEcGenerateSlices generates the .ecx and .ec01 ~ .ec14 files
func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_server_pb.VolumeEcGenerateSlicesRequest) (*volume_server_pb.VolumeEcGenerateSlicesResponse, error) {
// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
@@ -34,6 +35,10 @@ func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_
}
baseFileName := v.FileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
// write .ecx file
if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
@@ -44,12 +49,11 @@ func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
return &volume_server_pb.VolumeEcGenerateSlicesResponse{}, nil
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
// VolumeEcCopy copy the .ecx and some ec data slices
func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb.VolumeEcCopyRequest) (*volume_server_pb.VolumeEcCopyResponse, error) {
// VolumeEcShardsCopy copy the .ecx and some ec data slices
func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
location := vs.store.FindFreeLocation()
if location == nil {
@@ -61,13 +65,13 @@ func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb.
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ecx file
if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, ".ecx"); err!=nil{
if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
return err
}
// copy ec data slices
for _, ecIndex := range req.EcIndexes {
if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err!=nil{
if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err != nil {
return err
}
}
@@ -75,8 +79,32 @@ func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb.
return nil
})
if err != nil {
return nil, fmt.Errorf("VolumeEcCopy volume %d: %v", req.VolumeId, err)
return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
}
return &volume_server_pb.VolumeEcCopyResponse{}, nil
return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
}
// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
}
baseFileName := v.FileName()
for _, shardIndex := range req.EcIndexes {
if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardIndex))); err != nil {
return nil, err
}
}
if req.ShouldDeleteEcx {
if err := os.Remove(baseFileName + ".ecx"); err != nil {
return nil, err
}
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}