mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
detecting ec volumes
This commit is contained in:
@@ -1,32 +1,37 @@
|
||||
{
|
||||
"file_paths": [
|
||||
"/ec_test/large_file_1754854356_0.dat",
|
||||
"/ec_test/large_file_1754854356_1.dat",
|
||||
"/ec_test/large_file_1754854356_2.dat",
|
||||
"/ec_test/large_file_1754854356_3.dat",
|
||||
"/ec_test/large_file_1754854356_4.dat",
|
||||
"/ec_test/large_file_1754854356_5.dat",
|
||||
"/ec_test/large_file_1754854356_6.dat",
|
||||
"/ec_test/large_file_1754854356_7.dat",
|
||||
"/ec_test/large_file_1754854356_8.dat",
|
||||
"/ec_test/large_file_1754854356_9.dat",
|
||||
"/ec_test/large_file_1754854356_10.dat",
|
||||
"/ec_test/large_file_1754854356_11.dat",
|
||||
"/ec_test/large_file_1754854356_12.dat",
|
||||
"/ec_test/large_file_1754854356_13.dat",
|
||||
"/ec_test/large_file_1754854356_14.dat",
|
||||
"/ec_test/large_file_1754854356_15.dat",
|
||||
"/ec_test/large_file_1754854356_16.dat",
|
||||
"/ec_test/large_file_1754854356_17.dat",
|
||||
"/ec_test/large_file_1754854357_18.dat",
|
||||
"/ec_test/large_file_1754854357_19.dat",
|
||||
"/ec_test/large_file_1754854357_20.dat",
|
||||
"/ec_test/large_file_1754854357_21.dat",
|
||||
"/ec_test/large_file_1754854357_22.dat",
|
||||
"/ec_test/large_file_1754854357_23.dat",
|
||||
"/ec_test/large_file_1754854357_24.dat"
|
||||
"/ec_test/large_file_1754862003_0.dat",
|
||||
"/ec_test/large_file_1754862003_1.dat",
|
||||
"/ec_test/large_file_1754862003_2.dat",
|
||||
"/ec_test/large_file_1754862003_3.dat",
|
||||
"/ec_test/large_file_1754862003_4.dat",
|
||||
"/ec_test/large_file_1754862003_5.dat",
|
||||
"/ec_test/large_file_1754862003_6.dat",
|
||||
"/ec_test/large_file_1754862003_7.dat",
|
||||
"/ec_test/large_file_1754862003_8.dat",
|
||||
"/ec_test/large_file_1754862003_9.dat",
|
||||
"/ec_test/large_file_1754862003_10.dat",
|
||||
"/ec_test/large_file_1754862003_11.dat",
|
||||
"/ec_test/large_file_1754862003_12.dat",
|
||||
"/ec_test/large_file_1754862003_13.dat",
|
||||
"/ec_test/large_file_1754862003_14.dat",
|
||||
"/ec_test/large_file_1754862003_15.dat",
|
||||
"/ec_test/large_file_1754862003_16.dat",
|
||||
"/ec_test/large_file_1754862003_17.dat",
|
||||
"/ec_test/large_file_1754862003_18.dat",
|
||||
"/ec_test/large_file_1754862003_19.dat",
|
||||
"/ec_test/large_file_1754862003_20.dat",
|
||||
"/ec_test/large_file_1754862003_21.dat",
|
||||
"/ec_test/large_file_1754862003_22.dat",
|
||||
"/ec_test/large_file_1754862003_23.dat",
|
||||
"/ec_test/large_file_1754862003_24.dat",
|
||||
"/ec_test/large_file_1754862003_25.dat",
|
||||
"/ec_test/large_file_1754862003_26.dat",
|
||||
"/ec_test/large_file_1754862003_27.dat",
|
||||
"/ec_test/large_file_1754862003_28.dat",
|
||||
"/ec_test/large_file_1754862004_29.dat"
|
||||
],
|
||||
"timestamp": "2025-08-10T19:32:37.127812169Z",
|
||||
"file_count": 25,
|
||||
"timestamp": "2025-08-10T21:40:04.027796168Z",
|
||||
"file_count": 30,
|
||||
"file_size_kb": 3000
|
||||
}
|
||||
@@ -178,7 +178,7 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// enrichVolumeMetrics adds additional information like replica counts
|
||||
// enrichVolumeMetrics adds additional information like replica counts and EC volume identification
|
||||
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
|
||||
// Group volumes by ID to count replicas
|
||||
volumeGroups := make(map[uint32][]*VolumeHealthMetrics)
|
||||
@@ -195,8 +195,51 @@ func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics
|
||||
glog.V(3).Infof("Volume %d has %d replicas", volumeID, replicaCount)
|
||||
}
|
||||
|
||||
// TODO: Identify EC volumes by checking volume structure
|
||||
// This would require querying volume servers for EC shard information
|
||||
// Identify EC volumes by checking EC shard information from topology
|
||||
ecVolumeSet := ms.getECVolumeSet()
|
||||
for _, metric := range metrics {
|
||||
if ecVolumeSet[metric.VolumeID] {
|
||||
metric.IsECVolume = true
|
||||
glog.V(2).Infof("Volume %d identified as EC volume", metric.VolumeID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getECVolumeSet retrieves the set of volume IDs that exist as EC volumes in the cluster
|
||||
func (ms *MaintenanceScanner) getECVolumeSet() map[uint32]bool {
|
||||
ecVolumeSet := make(map[uint32]bool)
|
||||
|
||||
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
||||
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.TopologyInfo != nil {
|
||||
for _, dc := range resp.TopologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, node := range rack.DataNodeInfos {
|
||||
for _, diskInfo := range node.DiskInfos {
|
||||
// Check EC shards on this disk
|
||||
for _, ecShardInfo := range diskInfo.EcShardInfos {
|
||||
ecVolumeSet[ecShardInfo.Id] = true
|
||||
glog.V(3).Infof("Found EC volume %d on %s", ecShardInfo.Id, node.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get EC volume information from master: %v", err)
|
||||
return ecVolumeSet // Return empty set on error
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Found %d EC volumes in cluster topology", len(ecVolumeSet))
|
||||
return ecVolumeSet
|
||||
}
|
||||
|
||||
// convertToTaskMetrics converts existing volume metrics to task system format
|
||||
|
||||
@@ -34,9 +34,9 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// Collect EC volume information from topology
|
||||
ecVolumeInfo := collectEcVolumeInfo(info.ActiveTopology)
|
||||
glog.V(2).Infof("EC vacuum detection: found %d EC volumes in topology", len(ecVolumeInfo))
|
||||
// Collect EC volume information from metrics
|
||||
ecVolumeInfo := collectEcVolumeInfo(metrics)
|
||||
glog.V(2).Infof("EC vacuum detection: found %d EC volumes in metrics", len(ecVolumeInfo))
|
||||
|
||||
for volumeID, ecInfo := range ecVolumeInfo {
|
||||
// Apply filters
|
||||
@@ -104,15 +104,43 @@ type DeletionInfo struct {
|
||||
DeletionRatio float64
|
||||
}
|
||||
|
||||
// collectEcVolumeInfo extracts EC volume information from active topology
|
||||
func collectEcVolumeInfo(activeTopology interface{}) map[uint32]*EcVolumeInfo {
|
||||
// collectEcVolumeInfo extracts EC volume information from volume health metrics
|
||||
func collectEcVolumeInfo(metrics []*wtypes.VolumeHealthMetrics) map[uint32]*EcVolumeInfo {
|
||||
ecVolumes := make(map[uint32]*EcVolumeInfo)
|
||||
|
||||
// Simplified implementation for demonstration
|
||||
// In production, this would query the topology for actual EC volume information
|
||||
// For now, return empty map since we don't have direct access to topology data
|
||||
glog.V(3).Infof("EC vacuum detection: topology analysis not implemented, returning empty volume list")
|
||||
for _, metric := range metrics {
|
||||
// Only process EC volumes
|
||||
if !metric.IsECVolume {
|
||||
continue
|
||||
}
|
||||
|
||||
// Calculate deletion ratio from health metrics
|
||||
deletionRatio := 0.0
|
||||
if metric.Size > 0 {
|
||||
deletionRatio = float64(metric.DeletedBytes) / float64(metric.Size)
|
||||
}
|
||||
|
||||
// Create EC volume info from metrics
|
||||
ecVolumes[metric.VolumeID] = &EcVolumeInfo{
|
||||
VolumeID: metric.VolumeID,
|
||||
Collection: metric.Collection,
|
||||
Size: metric.Size,
|
||||
CreatedAt: time.Now().Add(-metric.Age),
|
||||
Age: metric.Age,
|
||||
PrimaryNode: metric.Server,
|
||||
ShardNodes: make(map[pb.ServerAddress]erasure_coding.ShardBits), // Will be populated if needed
|
||||
DeletionInfo: DeletionInfo{
|
||||
TotalEntries: int64(metric.Size / 1024), // Rough estimate
|
||||
DeletedEntries: int64(metric.DeletedBytes / 1024),
|
||||
DeletionRatio: deletionRatio,
|
||||
},
|
||||
}
|
||||
|
||||
glog.V(2).Infof("EC vacuum detection: found EC volume %d, size=%dMB, deleted=%dMB, ratio=%.1f%%",
|
||||
metric.VolumeID, metric.Size/(1024*1024), metric.DeletedBytes/(1024*1024), deletionRatio*100)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("EC vacuum detection: found %d EC volumes from %d metrics", len(ecVolumes), len(metrics))
|
||||
return ecVolumes
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user