use ec shard size
@@ -61,15 +61,16 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI

 // Check quiet duration and fullness criteria
 if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+expectedShardSize := calculateExpectedShardSize(metric.Size)
 result := &types.TaskDetectionResult{
 TaskType: types.TaskTypeErasureCoding,
 VolumeID: metric.VolumeID,
 Server: metric.Server,
 Collection: metric.Collection,
 Priority: types.TaskPriorityLow, // EC is not urgent
-Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
+Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB), expected shard size=%.1fMB",
 metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
-float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
+float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB, float64(expectedShardSize)/(1024*1024)),
 ScheduleAt: now,
 }

@@ -97,8 +98,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 },
 }

-glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
-metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
+glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs, expected shard size: %.2fMB",
+metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs, float64(expectedShardSize)/(1024*1024))
 } else {
 glog.Warningf("No ActiveTopology available for destination planning in EC detection")
 continue // Skip this volume if no topology available
@@ -143,6 +144,10 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 // planECDestinations plans the destinations for erasure coding operation
 // This function implements EC destination planning logic directly in the detection phase
 func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+// Calculate expected shard size based on original volume size
+// EC data is split across data shards, so each shard gets roughly volume_size / data_shards
+expectedShardSize := calculateExpectedShardSize(metric.Size)
+
 // Get source node information from topology
 var sourceRack, sourceDC string

@@ -178,9 +183,9 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 }

 // Select best disks for EC placement with rack/DC diversity
-selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
-if len(selectedDisks) < minTotalDisks {
-return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
+selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount, expectedShardSize)
+if len(selectedDisks) < erasure_coding.MinTotalDisks {
+return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
 }

 var plans []*topology.DestinationPlan
@@ -193,8 +198,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 TargetDisk: disk.DiskID,
 TargetRack: disk.Rack,
 TargetDC: disk.DataCenter,
-ExpectedSize: 0, // EC shards don't have predetermined size
-PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
+ExpectedSize: expectedShardSize, // Use calculated shard size
+PlacementScore: calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize),
 Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
 }
 plans = append(plans, plan)
@@ -205,6 +210,9 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 dcCount[disk.DataCenter]++
 }

+glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs",
+metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount))
+
 return &topology.MultiDestinationPlan{
 Plans: plans,
 TotalShards: len(plans),
@@ -243,7 +251,7 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
 }

 // selectBestECDestinations selects multiple disks for EC shard placement with diversity
-func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
+func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int, expectedShardSize uint64) []*topology.DiskInfo {
 if len(disks) == 0 {
 return nil
 }
@@ -265,7 +273,7 @@ func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC s
 }

 // Select best disk from this rack
-bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
+bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC, expectedShardSize)
 if bestDisk != nil {
 selected = append(selected, bestDisk)
 usedRacks[rackKey] = true
@@ -298,7 +306,7 @@ func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC s
 }

 // selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
+func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) *topology.DiskInfo {
 if len(disks) == 0 {
 return nil
 }
@@ -311,7 +319,7 @@ func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string)
 continue
 }

-score := calculateECScore(disk, sourceRack, sourceDC)
+score := calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize)
 if score > bestScore {
 bestScore = score
 bestDisk = disk
@@ -351,6 +359,24 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
 return score
 }

+// calculateECScoreWithSize calculates placement score for EC operations with shard size consideration
+func calculateECScoreWithSize(disk *topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) float64 {
+baseScore := calculateECScore(disk, sourceRack, sourceDC)
+
+// Additional scoring based on available space vs expected shard size
+if disk.DiskInfo != nil && expectedShardSize > 0 {
+// Estimate available space (this is a rough estimate)
+availableSlots := disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount
+if availableSlots > 0 {
+// Bonus for having plenty of space for the expected shard
+// This is a heuristic - each volume slot can theoretically hold any size
+baseScore += float64(availableSlots) * 2.0 // Up to 2 points per available slot
+}
+}
+
+return baseScore
+}
+
 // isDiskSuitableForEC checks if a disk is suitable for EC placement
 func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
 if disk.DiskInfo == nil {
@@ -414,3 +440,23 @@ func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32

 return replicaServers
 }
+
+// calculateExpectedShardSize calculates the expected size of each EC shard based on the original volume size
+func calculateExpectedShardSize(originalVolumeSize uint64) uint64 {
+if originalVolumeSize == 0 {
+return 0
+}
+
+// In erasure coding, the original data is split across data shards
+// Each data shard gets approximately originalSize / dataShards
+// Parity shards are similar in size to data shards
+// Add some overhead for padding and metadata (typically ~5-10%)
+baseShardSize := originalVolumeSize / uint64(erasure_coding.DataShardsCount)
+overhead := baseShardSize / 10 // 10% overhead for padding and metadata
+expectedShardSize := baseShardSize + overhead
+
+glog.V(2).Infof("Calculated expected shard size: original=%d bytes, base_shard=%d bytes, with_overhead=%d bytes",
+originalVolumeSize, baseShardSize, expectedShardSize)
+
+return expectedShardSize
+}
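For context, here is a minimal standalone sketch of the arithmetic the new calculateExpectedShardSize helper performs, assuming SeaweedFS's default 10+4 EC layout (i.e. erasure_coding.DataShardsCount = 10); the constant and function names below are illustrative stand-ins, not part of this commit:

package main

import "fmt"

const dataShardsCount = 10 // assumed value of erasure_coding.DataShardsCount

// expectedShardSize mirrors the new helper: split the volume across the data
// shards, then add roughly 10% for padding and metadata.
func expectedShardSize(originalVolumeSize uint64) uint64 {
	if originalVolumeSize == 0 {
		return 0
	}
	base := originalVolumeSize / dataShardsCount
	return base + base/10
}

func main() {
	// A 30 GiB volume: 30 GiB / 10 data shards = 3 GiB per shard,
	// plus 10% overhead gives roughly 3.3 GiB per shard.
	const volumeSize = 30 << 30
	fmt.Printf("expected shard size: %.1f MB\n",
		float64(expectedShardSize(volumeSize))/(1024*1024))
}

Under these assumptions a 30 GiB volume yields about 3379.2 MB per shard, which is the figure that would now appear in the detection Reason string and the destination-planning logs.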
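A quick table-driven check one might add alongside the helper; this is only a sketch, not part of the commit, and the expected values assume DataShardsCount = 10 plus the 10% overhead shown above. It would live in the same package as the detection code (package name here is a placeholder):

package ec // placeholder for the package that holds calculateExpectedShardSize

import "testing"

func TestCalculateExpectedShardSize(t *testing.T) {
	cases := []struct {
		name     string
		original uint64
		want     uint64
	}{
		{"zero-size volume", 0, 0},
		// 1000 bytes / 10 data shards = 100, plus 10% overhead = 110
		{"small volume", 1000, 110},
		// 30 GiB / 10 = 3 GiB, plus 10% overhead
		{"30 GiB volume", 30 << 30, (30<<30)/10 + (30<<30)/10/10},
	}
	for _, c := range cases {
		if got := calculateExpectedShardSize(c.original); got != c.want {
			t.Errorf("%s: calculateExpectedShardSize(%d) = %d, want %d", c.name, c.original, got, c.want)
		}
	}
}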