mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-09-18 23:07:56 +08:00
shell: volume balance follows replica placement
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
|
||||
"io"
|
||||
"os"
|
||||
"sort"
|
||||
@@ -83,6 +84,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
}
|
||||
|
||||
typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)
|
||||
volumeReplicas, _ := collectVolumeReplicaLocations(resp)
|
||||
|
||||
for maxVolumeCount, volumeServers := range typeToNodes {
|
||||
if len(volumeServers) < 2 {
|
||||
@@ -95,16 +97,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
return err
|
||||
}
|
||||
for _, c := range collections {
|
||||
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
|
||||
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else if *collection == "ALL_COLLECTIONS" {
|
||||
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
|
||||
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
|
||||
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -113,7 +115,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
||||
return nil
|
||||
}
|
||||
|
||||
func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
|
||||
func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
|
||||
|
||||
// balance writable volumes
|
||||
for _, n := range nodes {
|
||||
@@ -126,7 +128,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit
|
||||
return !v.ReadOnly && v.Size < volumeSizeLimit
|
||||
})
|
||||
}
|
||||
if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil {
|
||||
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -141,7 +143,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit
|
||||
return v.ReadOnly || v.Size >= volumeSizeLimit
|
||||
})
|
||||
}
|
||||
if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
|
||||
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -186,7 +188,7 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
|
||||
})
|
||||
}
|
||||
|
||||
func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
|
||||
func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
|
||||
selectedVolumeCount := 0
|
||||
for _, dn := range nodes {
|
||||
selectedVolumeCount += len(dn.selectedVolumes)
|
||||
@@ -216,7 +218,7 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates
|
||||
// no more volume servers with empty slots
|
||||
break
|
||||
}
|
||||
hasMoved, err = attemptToMoveOneVolume(commandEnv, fullNode, candidateVolumes, emptyNode, applyBalancing)
|
||||
hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -229,13 +231,12 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates
|
||||
return nil
|
||||
}
|
||||
|
||||
func attemptToMoveOneVolume(commandEnv *CommandEnv, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
|
||||
func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
|
||||
|
||||
for _, v := range candidateVolumes {
|
||||
if v.ReplicaPlacement > 0 {
|
||||
if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
|
||||
// TODO this logic is too simple, but should work most of the time
|
||||
// Need a correct algorithm to handle all different cases
|
||||
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(v.ReplicaPlacement))
|
||||
if !isGoodMove(replicaPlacement, volumeReplicas[v.Id], fullNode, emptyNode) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -273,3 +274,41 @@ func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) b
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool {
|
||||
for _, replica := range existingReplicas {
|
||||
if replica.location.dataNode.Id == targetNode.info.Id &&
|
||||
replica.location.rack == targetNode.rack &&
|
||||
replica.location.dc == targetNode.dc {
|
||||
// never move to existing nodes
|
||||
return false
|
||||
}
|
||||
}
|
||||
dcs, racks := make(map[string]bool), make(map[string]int)
|
||||
for _, replica := range existingReplicas {
|
||||
if replica.location.dataNode.Id != sourceNode.info.Id {
|
||||
dcs[replica.location.DataCenter()] = true
|
||||
racks[replica.location.Rack()]++
|
||||
}
|
||||
}
|
||||
|
||||
dcs[targetNode.dc] = true
|
||||
racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++
|
||||
|
||||
if len(dcs) > placement.DiffDataCenterCount+1 {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(racks) > placement.DiffRackCount+1 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, sameRackCount := range racks {
|
||||
if sameRackCount > placement.SameRackCount+1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user