mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
Add activation step to EC vacuum: call ActivateEcGeneration when G+1 ready
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
@@ -27,6 +28,7 @@ type EcVacuumTask struct {
|
||||
targetGeneration uint32 // generation to create (G+1)
|
||||
tempDir string
|
||||
grpcDialOption grpc.DialOption
|
||||
masterAddress pb.ServerAddress // master server address for activation RPC
|
||||
}
|
||||
|
||||
// NewEcVacuumTask creates a new EC vacuum task instance
|
||||
@@ -78,7 +80,12 @@ func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams
|
||||
return fmt.Errorf("failed to distribute new EC shards: %w", err)
|
||||
}
|
||||
|
||||
// Step 6: Clean up old EC shards
|
||||
// Step 6: Activate new generation (atomic switch from G to G+1)
|
||||
if err := t.activateNewGeneration(); err != nil {
|
||||
return fmt.Errorf("failed to activate new generation: %w", err)
|
||||
}
|
||||
|
||||
// Step 7: Clean up old EC shards
|
||||
if err := t.cleanupOldEcShards(); err != nil {
|
||||
t.LogWarning("Failed to clean up old EC shards", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
@@ -291,6 +298,42 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// activateNewGeneration atomically switches the master to use the new generation
|
||||
func (t *EcVacuumTask) activateNewGeneration() error {
|
||||
t.LogInfo("Activating new generation", map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"source_generation": t.sourceGeneration,
|
||||
"target_generation": t.targetGeneration,
|
||||
"master_address": t.masterAddress,
|
||||
})
|
||||
|
||||
if t.masterAddress == "" {
|
||||
t.LogWarning("Master address not set - skipping automatic generation activation", map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"target_generation": t.targetGeneration,
|
||||
"note": "Generation activation must be done manually via master API",
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.ActivateEcGeneration(context.Background(), &master_pb.ActivateEcGenerationRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
Generation: t.targetGeneration,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to activate generation %d for volume %d: %w", t.targetGeneration, t.volumeID, err)
|
||||
}
|
||||
|
||||
t.LogInfo("Successfully activated new generation", map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"active_generation": t.targetGeneration,
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// cleanupOldEcShards removes the original volume after successful vacuum
|
||||
func (t *EcVacuumTask) cleanupOldEcShards() error {
|
||||
t.LogInfo("Cleaning up original volume", map[string]interface{}{
|
||||
@@ -353,3 +396,8 @@ func (t *EcVacuumTask) GetCollection() string {
|
||||
func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) {
|
||||
t.grpcDialOption = option
|
||||
}
|
||||
|
||||
// SetMasterAddress sets the master server address for generation activation
|
||||
func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) {
|
||||
t.masterAddress = address
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user