Mirror of https://github.com/seaweedfs/seaweedfs.git (synced 2025-09-20 09:17:58 +08:00)
Improve EC shards rebalancing logic across racks (#6270)
- Favor target racks with fewer preexisting shards, to ensure a fair distribution.
- Randomize the selection when multiple candidate racks are available.
- Add logic to account for replication settings when selecting target racks (currently disabled).
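For illustration, here is a minimal, self-contained sketch of the selection policy described above. It uses plain maps and placeholder names (pickRack, shardsPerRack, freeSlots) instead of the shell package's RackId/EcRack types; the actual implementation is pickRackToBalanceShardsInto in the diff below.

// Sketch of the rack-selection policy: among racks that still have free EC
// slots and sit below the per-rack average, pick one of the least loaded
// racks at random.
package main

import (
    "fmt"
    "math"
    "math/rand"
)

func pickRack(shardsPerRack map[string]int, freeSlots map[string]int, averageShardsPerRack int) string {
    var targets []string
    best := math.MaxInt

    for rack, slots := range freeSlots {
        shards := shardsPerRack[rack]
        if slots <= 0 || shards >= averageShardsPerRack {
            // No capacity left, or the rack already holds its fair share.
            continue
        }
        if shards < best {
            // A less loaded rack wins; drop previously collected candidates.
            best = shards
            targets = targets[:0]
        }
        if shards == best {
            targets = append(targets, rack)
        }
    }
    if len(targets) == 0 {
        return ""
    }
    // Randomize among equally loaded racks so repeated runs spread shards evenly.
    return targets[rand.Intn(len(targets))]
}

func main() {
    shards := map[string]int{"rack1": 5, "rack2": 1, "rack3": 1}
    slots := map[string]int{"rack1": 10, "rack2": 10, "rack3": 10}
    fmt.Println(pickRack(shards, slots, 4)) // prints rack2 or rack3, never rack1
}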
@@ -3,6 +3,7 @@ package shell
 import (
     "context"
     "fmt"
+    "math/rand"

     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/operation"
@@ -473,16 +474,19 @@ func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
     return nil
 }

-func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+    return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
+        shardBits := findEcVolumeShards(ecNode, vid)
+        return string(ecNode.rack), shardBits.ShardIdCount()
+    })
+}
+
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
     // calculate average number of shards an ec rack should have for one volume
     averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

     // see the volume's shards are in how many racks, and how many in each rack
-    rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-        shardBits := findEcVolumeShards(ecNode, vid)
-        return string(ecNode.rack), shardBits.ShardIdCount()
-    })
+    rackToShardCount := countShardsByRack(vid, locations)
     rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
         return string(ecNode.rack)
     })
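The new countShardsByRack helper only wraps the existing groupByCount call. As a rough, self-contained illustration of the result shape it produces (a map from rack id to the number of the volume's shards in that rack), assuming groupByCount sums per-item counts under a key; countBy and node below are placeholder names, not the package's types:

// Hypothetical, self-contained sketch of a groupByCount-style helper.
package main

import "fmt"

// countBy sums a per-item count under a per-item key.
func countBy[T any](items []T, fn func(T) (id string, count int)) map[string]int {
    out := make(map[string]int)
    for _, item := range items {
        id, count := fn(item)
        out[id] += count
    }
    return out
}

type node struct {
    rack   string
    shards int // number of this volume's shards on the node
}

func main() {
    nodes := []node{{"rack1", 3}, {"rack1", 2}, {"rack2", 4}}
    fmt.Println(countBy(nodes, func(n node) (string, int) { return n.rack, n.shards }))
    // map[rack1:5 rack2:4]
}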
@@ -490,16 +494,18 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
     // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
     ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
     for rackId, count := range rackToShardCount {
-        if count > averageShardsPerEcRack {
-            possibleEcNodes := rackEcNodesWithVid[rackId]
-            for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
-                ecShardsToMove[shardId] = ecNode
-            }
+        if count <= averageShardsPerEcRack {
+            continue
+        }
+        possibleEcNodes := rackEcNodesWithVid[rackId]
+        for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+            ecShardsToMove[shardId] = ecNode
         }
     }

     for shardId, ecNode := range ecShardsToMove {
-        rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
+        // TODO: consider volume replica info when balancing racks
+        rackId := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack)
         if rackId == "" {
             fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
             continue
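For intuition on the threshold used above: with SeaweedFS's default 10+4 EC layout, erasure_coding.TotalShardsCount is 14, so ceilDivide(14, len(racks)) is the per-rack ceiling and any rack above it contributes its surplus shards to ecShardsToMove. A small arithmetic sketch (the shard counts below are made up for illustration):

// Arithmetic sketch of the per-rack ceiling and the overflow computed above.
package main

import "fmt"

// ceilDivide mirrors the helper used in the diff: integer division rounded up.
func ceilDivide(a, b int) int {
    return (a + b - 1) / b
}

func main() {
    const totalShards = 14 // 10 data + 4 parity shards in the default EC layout
    racks := 4
    average := ceilDivide(totalShards, racks) // 4

    // Racks holding more than the average give up their surplus shards.
    rackToShardCount := map[string]int{"rack1": 7, "rack2": 5, "rack3": 2, "rack4": 0}
    for rack, count := range rackToShardCount {
        if count > average {
            fmt.Printf("%s overflows by %d shard(s)\n", rack, count-average)
        }
    }
    // prints that rack1 overflows by 3 and rack2 by 1 (map order may vary)
}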
@@ -521,23 +527,44 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
     return nil
 }

-func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
-
-    // TODO later may need to add some randomness
-
-    for rackId, rack := range rackToEcNodes {
-        if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
-            continue
-        }
-
-        if rack.freeEcSlot <= 0 {
-            continue
-        }
-
-        return rackId
-    }
-
-    return ""
-}
+func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
+    targets := []RackId{}
+    targetShards := -1
+    for _, shards := range rackToShardCount {
+        if shards > targetShards {
+            targetShards = shards
+        }
+    }
+
+    for rackId, rack := range rackToEcNodes {
+        shards := rackToShardCount[string(rackId)]
+
+        if rack.freeEcSlot <= 0 {
+            // No EC shard slots left :(
+            continue
+        }
+        if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
+            // Don't select racks with more EC shards for the target volume than the replication limit.
+            continue
+        }
+        if shards >= averageShardsPerEcRack {
+            // Keep EC shards across racks as balanced as possible.
+            continue
+        }
+        if shards < targetShards {
+            // Favor racks with fewer shards, to ensure a uniform distribution.
+            targets = nil
+            targetShards = shards
+        }
+        if shards == targetShards {
+            targets = append(targets, rackId)
+        }
+    }
+
+    if len(targets) == 0 {
+        return ""
+    }
+    return targets[rand.Intn(len(targets))]
+}

 func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
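One way the new picker could be exercised from a test in the same package is sketched below. It assumes only what the diff shows: the RackId string type, EcRack's freeEcSlot field, and the pickRackToBalanceShardsInto signature; the test name and fixture values are illustrative and not part of this commit.

package shell

import "testing"

// Illustrative test: with all racks having free slots and "r3" holding the
// fewest shards, the picker must return "r3".
func TestPickRackToBalanceShardsIntoSketch(t *testing.T) {
    racks := map[RackId]*EcRack{
        "r1": {freeEcSlot: 10},
        "r2": {freeEcSlot: 10},
        "r3": {freeEcSlot: 10},
    }
    rackToShardCount := map[string]int{"r1": 4, "r2": 3, "r3": 1}

    got := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, 4)
    if got != "r3" {
        t.Errorf("expected r3, got %q", got)
    }
}

With rack r3 holding the fewest shards, the result is deterministic; the rand.Intn tie-break only kicks in when several racks share the minimum.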
@@ -774,6 +801,7 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
     return vidLocations
 }

+// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default?
 func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
     for _, ecNode := range nodes {
         for _, diskInfo := range ecNode.info.DiskInfos {
@@ -789,6 +817,21 @@ func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_bl
     return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
 }

+func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
+    var resp *master_pb.GetMasterConfigurationResponse
+    var err error
+
+    err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+        resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+        return err
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
+}
+
 func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
     if len(collections) == 0 {
         return fmt.Errorf("no collections to balance")
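The replica-aware check in pickRackToBalanceShardsInto is currently disabled (its call site passes nil). A possible way to enable it later, sketched here as an assumption rather than part of this commit, is to resolve a volume's placement and fall back to the master's default; resolveReplicaPlacement below is a hypothetical helper built only from functions already shown in this diff.

package shell

import (
    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
    "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)

// resolveReplicaPlacement is a hypothetical helper (not part of this commit):
// use the volume's own replica placement if it can be resolved, otherwise fall
// back to the master's default replication setting.
func resolveReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
    if rp, err := volumeIdToReplicaPlacement(vid, nodes); err == nil {
        return rp, nil
    }
    // EC volumes may carry no replica placement info; rely on the master's default.
    return getDefaultReplicaPlacement(commandEnv)
}

The returned placement could then replace the nil argument at the pickRackToBalanceShardsInto call site.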