ActivateEcGeneration RPC implemented

This commit is contained in:
chrislu
2025-08-10 13:11:47 -07:00
parent 870b2ffe07
commit 50cc17e8fa
2 changed files with 98 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/topology"
@@ -291,6 +292,77 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
return resp, nil
}
func (ms *MasterServer) ActivateEcGeneration(ctx context.Context, req *master_pb.ActivateEcGenerationRequest) (*master_pb.ActivateEcGenerationResponse, error) {
if !ms.Topo.IsLeader() {
return &master_pb.ActivateEcGenerationResponse{
Success: false,
Error: "not leader",
}, nil
}
// Basic request validation
if req.VolumeId == 0 {
return &master_pb.ActivateEcGenerationResponse{
Success: false,
Error: "invalid volume ID: cannot be 0",
}, nil
}
volumeId := needle.VolumeId(req.VolumeId)
targetGeneration := req.Generation
glog.V(1).Infof("ActivateEcGeneration: Activating generation %d for EC volume %d in collection %s",
targetGeneration, req.VolumeId, req.Collection)
// Validate that the target generation exists and has sufficient shards
ready, availableShards, err := ms.Topo.ValidateEcGenerationReadiness(volumeId, targetGeneration)
if err != nil {
errMsg := err.Error()
glog.Warningf("ActivateEcGeneration: %s", errMsg)
return &master_pb.ActivateEcGenerationResponse{
Success: false,
Error: errMsg,
}, nil
}
if !ready {
errMsg := fmt.Sprintf("generation %d for EC volume %d not ready: has %d shards, needs %d",
targetGeneration, req.VolumeId, availableShards, erasure_coding.DataShardsCount)
glog.Warningf("ActivateEcGeneration: %s", errMsg)
return &master_pb.ActivateEcGenerationResponse{
Success: false,
Error: errMsg,
}, nil
}
glog.V(2).Infof("ActivateEcGeneration: Generation %d for volume %d is ready with %d available shards",
targetGeneration, req.VolumeId, availableShards)
// Check current active generation for logging
var currentActiveGeneration uint32
if current, exists := ms.Topo.GetEcActiveGeneration(volumeId); exists {
currentActiveGeneration = current
if current == targetGeneration {
glog.V(2).Infof("ActivateEcGeneration: Generation %d is already active for volume %d", targetGeneration, req.VolumeId)
return &master_pb.ActivateEcGenerationResponse{
Success: true,
Error: "",
}, nil
}
}
// Perform the atomic activation
ms.Topo.SetEcActiveGeneration(volumeId, targetGeneration)
glog.V(0).Infof("ActivateEcGeneration: Successfully activated generation %d for EC volume %d (was: %d)",
targetGeneration, req.VolumeId, currentActiveGeneration)
return &master_pb.ActivateEcGenerationResponse{
Success: true,
Error: "",
}, nil
}
func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) {
if !ms.Topo.IsLeader() {

View File

@@ -338,3 +338,29 @@ func (t *Topology) LookupEcShardsWithFallback(vid needle.VolumeId, requestedGene
return nil, 0, false
}
// ValidateEcGenerationReadiness checks if an EC generation has sufficient shards for activation
// Returns true if the generation has at least erasure_coding.DataShardsCount shards available
func (t *Topology) ValidateEcGenerationReadiness(vid needle.VolumeId, generation uint32) (ready bool, availableShards int, err error) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation}
ecLocations, found := t.ecShardMap[key]
if !found {
return false, 0, fmt.Errorf("generation %d not found for EC volume %d", generation, vid)
}
// Count available shards
availableShards = 0
for _, locations := range ecLocations.Locations {
if len(locations) > 0 {
availableShards++
}
}
// Need at least DataShardsCount shards to reconstruct data
ready = availableShards >= erasure_coding.DataShardsCount
return ready, availableShards, nil
}