Merge pull request #2349 from kmlebedev/fix_repl_volumes_per_step

Topology update for every Nth id volume
This commit is contained in:
Chris Lu
2021-10-01 12:05:51 -07:00
committed by GitHub
2 changed files with 101 additions and 36 deletions

View File

@@ -208,6 +208,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
} }
// LookupVolumeIds queries the master, through commandEnv.MasterClient, for the
// locations of the given volume (or file) ids.
//
// The result order is unusual: the error comes first, followed by the per-id
// location entries taken from the master's response. When the lookup fails the
// returned location slice is nil.
func LookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (err error, volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation) {
	var lookupResp *master_pb.LookupVolumeResponse
	lookupErr := commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		var callErr error
		lookupResp, callErr = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
		return callErr
	})
	if lookupErr != nil {
		return lookupErr, nil
	}
	return nil, lookupResp.VolumeIdLocations
}
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse var resp *master_pb.VolumeListResponse

View File

@@ -10,6 +10,8 @@ import (
"io" "io"
"path/filepath" "path/filepath"
"sort" "sort"
"strconv"
"time"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -56,6 +58,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
if err = volFixReplicationCommand.Parse(args); err != nil { if err = volFixReplicationCommand.Parse(args); err != nil {
return nil return nil
} }
@@ -66,44 +70,87 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
takeAction := !*skipChange takeAction := !*skipChange
// collect topology information underReplicatedVolumeIdsCount := 1
topologyInfo, _, err := collectTopologyInfo(commandEnv) for underReplicatedVolumeIdsCount > 0 {
if err != nil { fixedVolumeReplicas := map[string]int{}
return err
}
// find all volumes that needs replication // collect topology information
// collect all data nodes topologyInfo, _, err := collectTopologyInfo(commandEnv)
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) if err != nil {
return err
}
if len(allLocations) == 0 { // find all volumes that needs replication
return fmt.Errorf("no data nodes at all") // collect all data nodes
} volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
// find all under replicated volumes if len(allLocations) == 0 {
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 return fmt.Errorf("no data nodes at all")
for vid, replicas := range volumeReplicas { }
replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) // find all under replicated volumes
if replicaPlacement.GetCopyCount() > len(replicas) { var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) for vid, replicas := range volumeReplicas {
} else if replicaPlacement.GetCopyCount() < len(replicas) { replica := replicas[0]
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) if replicaPlacement.GetCopyCount() > len(replicas) {
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
} else if replicaPlacement.GetCopyCount() < len(replicas) {
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
if len(overReplicatedVolumeIds) > 0 {
if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil {
return err
}
}
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
if underReplicatedVolumeIdsCount > 0 {
// find the most under populated data nodes
err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
if err != nil {
return err
}
}
if *skipChange {
break
}
// check that the topology has been updated
if len(fixedVolumeReplicas) > 0 {
fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
for k, _ := range fixedVolumeReplicas {
fixedVolumes = append(fixedVolumes, k)
}
err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes)
if err != nil {
return err
}
for _, volumeIdLocation := range volumeIdLocations {
volumeId := volumeIdLocation.VolumeOrFileId
volumeIdLocationCount := len(volumeIdLocation.Locations)
i := 0
for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
time.Sleep(time.Duration(i+1) * time.Second * 7)
err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId})
if err != nil {
return err
}
volumeIdLocationCount = len(volumeLocIds[0].Locations)
if *retryCount > i {
return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
}
i += 1
}
}
} }
} }
return nil
if len(overReplicatedVolumeIds) > 0 {
return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
}
if len(underReplicatedVolumeIds) == 0 {
return nil
}
// find the most under populated data nodes
return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
} }
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
@@ -156,16 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil return nil
} }
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) {
fixedVolumes = map[string]int{}
if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
}
for _, vid := range underReplicatedVolumeIds { for _, vid := range underReplicatedVolumeIds {
for i := 0; i < retryCount+1; i++ { for i := 0; i < retryCount+1; i++ {
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
if takeAction {
fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
}
break break
} }
} }
} }
return return nil, fixedVolumes
} }
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {