mirror of
				https://github.com/seaweedfs/seaweedfs.git
				synced 2025-10-21 04:57:24 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			377 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package shell
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 | |
| 	"math"
 | |
| 	"sort"
 | |
| 
 | |
| 	"github.com/chrislusf/seaweedfs/weed/glog"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/operation"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 | |
| 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 | |
| 	"google.golang.org/grpc"
 | |
| )
 | |
| 
 | |
| func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
 | |
| 
 | |
| 	copiedShardIds := []uint32{uint32(shardId)}
 | |
| 
 | |
| 	if applyBalancing {
 | |
| 
 | |
| 		// ask destination node to copy shard and the ecx file from source node, and mount it
 | |
| 		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		// unmount the to be deleted shards
 | |
| 		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		// ask source node to delete the shard, and maybe the ecx file
 | |
| 		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
 | |
| 	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
 | |
| 
 | |
| 	return nil
 | |
| 
 | |
| }
 | |
| 
 | |
| func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
 | |
| 	targetServer *EcNode, shardIdsToCopy []uint32,
 | |
| 	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
 | |
| 
 | |
| 	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
 | |
| 
 | |
| 	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 | |
| 
 | |
| 		if targetServer.info.Id != existingLocation {
 | |
| 
 | |
| 			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
 | |
| 			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
 | |
| 				VolumeId:       uint32(volumeId),
 | |
| 				Collection:     collection,
 | |
| 				ShardIds:       shardIdsToCopy,
 | |
| 				CopyEcxFile:    true,
 | |
| 				CopyEcjFile:    true,
 | |
| 				CopyVifFile:    true,
 | |
| 				SourceDataNode: existingLocation,
 | |
| 			})
 | |
| 			if copyErr != nil {
 | |
| 				return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
 | |
| 		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
 | |
| 			VolumeId:   uint32(volumeId),
 | |
| 			Collection: collection,
 | |
| 			ShardIds:   shardIdsToCopy,
 | |
| 		})
 | |
| 		if mountErr != nil {
 | |
| 			return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
 | |
| 		}
 | |
| 
 | |
| 		if targetServer.info.Id != existingLocation {
 | |
| 			copiedShardIds = shardIdsToCopy
 | |
| 			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
 | |
| 	for _, dc := range topo.DataCenterInfos {
 | |
| 		for _, rack := range dc.RackInfos {
 | |
| 			for _, dn := range rack.DataNodeInfos {
 | |
| 				fn(dc.Id, RackId(rack.Id), dn)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
 | |
| 	sort.Slice(ecNodes, func(i, j int) bool {
 | |
| 		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
 | |
| 	sort.Slice(ecNodes, func(i, j int) bool {
 | |
| 		return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
 | |
| 	})
 | |
| }
 | |
| 
 | |
// CandidateEcNode pairs an EcNode with a shard count. Slices of it are kept
// ordered by a caller-supplied comparison and re-sorted incrementally with
// ensureSortedEcNodes.
type CandidateEcNode struct {
	ecNode *EcNode
	// NOTE(review): presumably the number of shards (of the volume being
	// placed) on ecNode; it is not read in this file — confirm against callers.
	shardCount int
}
 | |
| 
 | |
| // if the index node changed the freeEcSlot, need to keep every EcNode still sorted
 | |
| func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
 | |
| 	for i := index - 1; i >= 0; i-- {
 | |
| 		if lessThan(i+1, i) {
 | |
| 			swap(data, i, i+1)
 | |
| 		} else {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	for i := index + 1; i < len(data); i++ {
 | |
| 		if lessThan(i, i-1) {
 | |
| 			swap(data, i, i-1)
 | |
| 		} else {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func swap(data []*CandidateEcNode, i, j int) {
 | |
| 	t := data[i]
 | |
| 	data[i] = data[j]
 | |
| 	data[j] = t
 | |
| }
 | |
| 
 | |
| func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
 | |
| 	for _, ecShardInfo := range ecShardInfos {
 | |
| 		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
 | |
| 		count += shardBits.ShardIdCount()
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
 | |
| 	if dn.DiskInfos == nil {
 | |
| 		return 0
 | |
| 	}
 | |
| 	diskInfo := dn.DiskInfos[string(diskType)]
 | |
| 	if diskInfo == nil {
 | |
| 		return 0
 | |
| 	}
 | |
| 	return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
 | |
| }
 | |
| 
 | |
type (
	// RackId is a rack identifier; eachDataNode builds it from the rack's Id
	// in the master topology.
	RackId string
	// EcNodeId identifies an EcNode; it keys the node map of an EcRack.
	EcNodeId string
)
 | |
| 
 | |
// EcNode is the shell's working view of one volume server while planning or
// applying EC shard placement.
type EcNode struct {
	// info is the data node as reported in the topology. Its hard-drive
	// DiskInfo is mutated in place by addEcVolumeShards/deleteEcVolumeShards.
	info *master_pb.DataNodeInfo
	// dc is the id of the data center containing this node.
	dc string
	// rack is the id of the rack containing this node.
	rack RackId
	// freeEcSlot is the remaining EC shard capacity. It is initialised from
	// countFreeShardSlots (hard-drive disks only) and adjusted as shards are
	// added or removed.
	freeEcSlot int
}
 | |
| 
 | |
| func (ecNode *EcNode) localShardIdCount(vid uint32) int {
 | |
| 	for _, diskInfo := range ecNode.info.DiskInfos {
 | |
| 		for _, ecShardInfo := range diskInfo.EcShardInfos {
 | |
| 			if vid == ecShardInfo.Id {
 | |
| 				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
 | |
| 				return shardBits.ShardIdCount()
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return 0
 | |
| }
 | |
| 
 | |
// EcRack groups EcNodes belonging to one rack.
// NOTE(review): not populated in this file; the semantics below are inferred
// from field names and should be confirmed against the code that builds it.
type EcRack struct {
	// ecNodes presumably holds the rack's nodes keyed by node id.
	ecNodes map[EcNodeId]*EcNode
	// freeEcSlot presumably aggregates the free EC slots of ecNodes.
	freeEcSlot int
}
 | |
| 
 | |
| func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
 | |
| 
 | |
| 	// list all possible locations
 | |
| 	// collect topology information
 | |
| 	topologyInfo, _, err := collectTopologyInfo(commandEnv)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// find out all volume servers with one slot left.
 | |
| 	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
 | |
| 
 | |
| 	sortEcNodesByFreeslotsDecending(ecNodes)
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
 | |
| 	eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
 | |
| 		if selectedDataCenter != "" && selectedDataCenter != dc {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
 | |
| 		ecNodes = append(ecNodes, &EcNode{
 | |
| 			info:       dn,
 | |
| 			dc:         dc,
 | |
| 			rack:       rack,
 | |
| 			freeEcSlot: int(freeEcSlots),
 | |
| 		})
 | |
| 		totalFreeEcSlots += freeEcSlots
 | |
| 	})
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
 | |
| 
 | |
| 	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
 | |
| 
 | |
| 	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 | |
| 		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
 | |
| 			VolumeId:   uint32(volumeId),
 | |
| 			Collection: collection,
 | |
| 			ShardIds:   toBeDeletedShardIds,
 | |
| 		})
 | |
| 		return deleteErr
 | |
| 	})
 | |
| 
 | |
| }
 | |
| 
 | |
| func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
 | |
| 
 | |
| 	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
 | |
| 
 | |
| 	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 | |
| 		_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
 | |
| 			VolumeId: uint32(volumeId),
 | |
| 			ShardIds: toBeUnmountedhardIds,
 | |
| 		})
 | |
| 		return deleteErr
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
 | |
| 
 | |
| 	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
 | |
| 
 | |
| 	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 | |
| 		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
 | |
| 			VolumeId:   uint32(volumeId),
 | |
| 			Collection: collection,
 | |
| 			ShardIds:   toBeMountedhardIds,
 | |
| 		})
 | |
| 		return mountErr
 | |
| 	})
 | |
| }
 | |
| 
 | |
// divide returns total/n computed in float64. Division by zero follows
// IEEE-754 (±Inf, or NaN for 0/0) instead of panicking.
func divide(total, n int) float64 {
	numerator, denominator := float64(total), float64(n)
	return numerator / denominator
}
 | |
| 
 | |
// ceilDivide returns total/n rounded up toward +Inf, computed via float64 and
// math.Ceil (so -7/2 gives -3).
//
// The result is exact only while the operands fit in float64's 53-bit
// mantissa. For n == 0 the float result is ±Inf or NaN, and converting that to
// int is implementation-dependent in Go, so callers must not pass 0.
func ceilDivide(total, n int) int {
	quotient := float64(total) / float64(n)
	return int(math.Ceil(quotient))
}
 | |
| 
 | |
| func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
 | |
| 
 | |
| 	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
 | |
| 		for _, shardInfo := range diskInfo.EcShardInfos {
 | |
| 			if needle.VolumeId(shardInfo.Id) == vid {
 | |
| 				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return 0
 | |
| }
 | |
| 
 | |
| func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
 | |
| 
 | |
| 	foundVolume := false
 | |
| 	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
 | |
| 	if found {
 | |
| 		for _, shardInfo := range diskInfo.EcShardInfos {
 | |
| 			if needle.VolumeId(shardInfo.Id) == vid {
 | |
| 				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
 | |
| 				newShardBits := oldShardBits
 | |
| 				for _, shardId := range shardIds {
 | |
| 					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
 | |
| 				}
 | |
| 				shardInfo.EcIndexBits = uint32(newShardBits)
 | |
| 				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
 | |
| 				foundVolume = true
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		diskInfo = &master_pb.DiskInfo{
 | |
| 			Type: string(types.HardDriveType),
 | |
| 		}
 | |
| 		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
 | |
| 	}
 | |
| 
 | |
| 	if !foundVolume {
 | |
| 		var newShardBits erasure_coding.ShardBits
 | |
| 		for _, shardId := range shardIds {
 | |
| 			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
 | |
| 		}
 | |
| 		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
 | |
| 			Id:          uint32(vid),
 | |
| 			Collection:  collection,
 | |
| 			EcIndexBits: uint32(newShardBits),
 | |
| 			DiskType:    string(types.HardDriveType),
 | |
| 		})
 | |
| 		ecNode.freeEcSlot -= len(shardIds)
 | |
| 	}
 | |
| 
 | |
| 	return ecNode
 | |
| }
 | |
| 
 | |
| func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
 | |
| 
 | |
| 	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
 | |
| 		for _, shardInfo := range diskInfo.EcShardInfos {
 | |
| 			if needle.VolumeId(shardInfo.Id) == vid {
 | |
| 				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
 | |
| 				newShardBits := oldShardBits
 | |
| 				for _, shardId := range shardIds {
 | |
| 					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
 | |
| 				}
 | |
| 				shardInfo.EcIndexBits = uint32(newShardBits)
 | |
| 				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return ecNode
 | |
| }
 | |
| 
 | |
| func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
 | |
| 	countMap := make(map[string]int)
 | |
| 	for _, d := range data {
 | |
| 		id, count := identifierFn(d)
 | |
| 		countMap[id] += count
 | |
| 	}
 | |
| 	return countMap
 | |
| }
 | |
| 
 | |
| func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
 | |
| 	groupMap := make(map[string][]*EcNode)
 | |
| 	for _, d := range data {
 | |
| 		id := identifierFn(d)
 | |
| 		groupMap[id] = append(groupMap[id], d)
 | |
| 	}
 | |
| 	return groupMap
 | |
| }
 | 
