mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-08-24 02:57:54 +08:00
weed/shell: Cleanup volume balance logic (#5241)
This commit is contained in:
parent
0f8e76bbd6
commit
1f08010ef0
@ -80,7 +80,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
|||||||
}
|
}
|
||||||
|
|
||||||
// collect topology information
|
// collect topology information
|
||||||
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 15*time.Second)
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -95,16 +95,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, c := range collections {
|
for _, c := range collections {
|
||||||
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
|
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, c, *applyBalancing); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if *collection == "ALL_COLLECTIONS" {
|
|
||||||
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
|
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, *collection, *applyBalancing); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -112,10 +108,10 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
|
func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
|
||||||
|
|
||||||
for _, diskType := range diskTypes {
|
for _, diskType := range diskTypes {
|
||||||
if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, volumeSizeLimit, collection, applyBalancing); err != nil {
|
if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, collection, applyBalancing); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -123,7 +119,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, vo
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
|
func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
|
||||||
|
|
||||||
for _, n := range nodes {
|
for _, n := range nodes {
|
||||||
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
|
||||||
@ -165,7 +161,7 @@ func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []types.DiskTy
|
|||||||
for _, dc := range t.DataCenterInfos {
|
for _, dc := range t.DataCenterInfos {
|
||||||
for _, r := range dc.RackInfos {
|
for _, r := range dc.RackInfos {
|
||||||
for _, dn := range r.DataNodeInfos {
|
for _, dn := range r.DataNodeInfos {
|
||||||
for diskType, _ := range dn.DiskInfos {
|
for diskType := range dn.DiskInfos {
|
||||||
if _, found := knownTypes[diskType]; !found {
|
if _, found := knownTypes[diskType]; !found {
|
||||||
knownTypes[diskType] = true
|
knownTypes[diskType] = true
|
||||||
}
|
}
|
||||||
@ -173,7 +169,7 @@ func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []types.DiskTy
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for diskType, _ := range knownTypes {
|
for diskType := range knownTypes {
|
||||||
diskTypes = append(diskTypes, types.ToDiskType(diskType))
|
diskTypes = append(diskTypes, types.ToDiskType(diskType))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -255,7 +255,7 @@ func TestBalance(t *testing.T) {
|
|||||||
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
|
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
|
||||||
diskTypes := collectVolumeDiskTypes(topologyInfo)
|
diskTypes := collectVolumeDiskTypes(topologyInfo)
|
||||||
|
|
||||||
if err := balanceVolumeServers(nil, diskTypes, volumeReplicas, volumeServers, 30*1024*1024*1024, "ALL_COLLECTIONS", false); err != nil {
|
if err := balanceVolumeServers(nil, diskTypes, volumeReplicas, volumeServers, "ALL_COLLECTIONS", false); err != nil {
|
||||||
t.Errorf("balance: %v", err)
|
t.Errorf("balance: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user