mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-09 05:06:14 +08:00
ec shard delete also check ec volumes, in addition to volumes
This commit is contained in:
@@ -69,7 +69,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
|
||||
}
|
||||
|
||||
// apply to all volumes in the collection
|
||||
volumeIds, err := collectVolumeForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
|
||||
volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -143,7 +143,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
|
||||
}
|
||||
|
||||
// ask the source volume server to clean up copied ec shards
|
||||
err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds)
|
||||
err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
|
||||
}
|
||||
@@ -177,7 +177,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
|
||||
go func(server *EcNode, startFromShardId uint32, shardCount int) {
|
||||
defer wg.Done()
|
||||
copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
|
||||
startFromShardId, shardCount, volumeId, collection, existingLocation)
|
||||
startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
|
||||
if copyErr != nil {
|
||||
err = copyErr
|
||||
} else {
|
||||
@@ -203,23 +203,23 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
|
||||
|
||||
func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
|
||||
targetServer *EcNode, startFromShardId uint32, shardCount int,
|
||||
volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
|
||||
volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
|
||||
|
||||
var shardIdsToCopy []uint32
|
||||
for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
|
||||
fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id)
|
||||
fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id)
|
||||
shardIdsToCopy = append(shardIdsToCopy, shardId)
|
||||
}
|
||||
|
||||
err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
if targetServer.info.Id != existingLocation.Url {
|
||||
if targetServer.info.Id != existingLocation {
|
||||
|
||||
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
Collection: collection,
|
||||
ShardIds: shardIdsToCopy,
|
||||
SourceDataNode: existingLocation.Url,
|
||||
SourceDataNode: existingLocation,
|
||||
})
|
||||
if copyErr != nil {
|
||||
return copyErr
|
||||
@@ -235,9 +235,9 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
|
||||
return mountErr
|
||||
}
|
||||
|
||||
if targetServer.info.Id != existingLocation.Url {
|
||||
if targetServer.info.Id != existingLocation {
|
||||
copiedShardIds = shardIdsToCopy
|
||||
glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds)
|
||||
glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -251,11 +251,11 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
|
||||
}
|
||||
|
||||
func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
|
||||
volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error {
|
||||
volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
|
||||
|
||||
shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
|
||||
|
||||
return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
ShardIds: toBeDeletedShardIds,
|
||||
@@ -349,7 +349,7 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN
|
||||
return
|
||||
}
|
||||
|
||||
func collectVolumeForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
|
||||
func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
|
||||
|
||||
var resp *master_pb.VolumeListResponse
|
||||
err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
|
||||
|
||||
Reference in New Issue
Block a user