use the EcVacuumLogic

This commit is contained in:
chrislu
2025-08-12 09:48:17 -07:00
parent 425a54019b
commit 8213a4a2b6

View File

@@ -40,6 +40,10 @@ type EcVacuumTask struct {
// Runtime-determined during execution // Runtime-determined during execution
sourceGeneration uint32 // generation to vacuum from (determined at runtime) sourceGeneration uint32 // generation to vacuum from (determined at runtime)
targetGeneration uint32 // generation to create (determined at runtime) targetGeneration uint32 // generation to create (determined at runtime)
// Core business logic
logic *EcVacuumLogic
plan *VacuumPlan // Generated plan for this vacuum operation
} }
// NewEcVacuumTask creates a new EC vacuum task instance // NewEcVacuumTask creates a new EC vacuum task instance
@@ -50,6 +54,7 @@ func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes
collection: collection, collection: collection,
sourceNodes: sourceNodes, sourceNodes: sourceNodes,
cleanupGracePeriod: 1 * time.Minute, // 1 minute grace period for faster cleanup cleanupGracePeriod: 1 * time.Minute, // 1 minute grace period for faster cleanup
logic: NewEcVacuumLogic(), // Initialize business logic
// sourceGeneration and targetGeneration will be determined during execution // sourceGeneration and targetGeneration will be determined during execution
} }
} }
@@ -66,21 +71,39 @@ func (t *EcVacuumTask) GetTopologyTaskID() string {
// Execute performs the EC vacuum operation // Execute performs the EC vacuum operation
func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
// Initialize generations from TaskSource (determined during detection phase) // Step 0: Create comprehensive vacuum plan using the logic layer
if len(params.Sources) > 0 && params.Sources[0].Generation > 0 { plan, err := t.logic.CreateVacuumPlan(t.volumeID, t.collection, params)
t.sourceGeneration = params.Sources[0].Generation if err != nil {
t.targetGeneration = t.sourceGeneration + 1 return fmt.Errorf("failed to create vacuum plan: %w", err)
} else { }
// Fallback to safe defaults for backward compatibility t.plan = plan
t.sourceGeneration = 0
t.targetGeneration = 1 // Extract generations from the plan
t.sourceGeneration = plan.CurrentGeneration
t.targetGeneration = plan.TargetGeneration
t.LogInfo("Vacuum plan created successfully", map[string]interface{}{
"volume_id": plan.VolumeID,
"collection": plan.Collection,
"source_generation": plan.CurrentGeneration,
"target_generation": plan.TargetGeneration,
"cleanup_generations": plan.GenerationsToCleanup,
"nodes_involved": len(plan.SourceDistribution.Nodes),
"safety_checks": len(plan.SafetyChecks),
})
// Validate the plan is safe to execute
if err := t.logic.ValidateShardDistribution(plan.SourceDistribution); err != nil {
return fmt.Errorf("vacuum plan validation failed: %w", err)
} }
t.LogInfo("Generations determined from TaskSource", map[string]interface{}{ t.LogInfo("Plan validation successful", map[string]interface{}{
"source_generation": t.sourceGeneration, "safety_checks": plan.SafetyChecks,
"target_generation": t.targetGeneration,
}) })
// Ensure sourceNodes is consistent with the plan
t.sourceNodes = plan.SourceDistribution.Nodes
// Log task information // Log task information
logFields := map[string]interface{}{ logFields := map[string]interface{}{
"volume_id": t.volumeID, "volume_id": t.volumeID,
@@ -950,20 +973,20 @@ func (t *EcVacuumTask) cleanupOldEcShards() error {
return fmt.Errorf("safety checks failed: %w", err) return fmt.Errorf("safety checks failed: %w", err)
} }
// Step 3: Clean up old generations (simplified - clean up source generation only) // Step 3: Use cleanup generations from the vacuum plan
// Generation discovery is now handled during detection phase in EcVolumeInfo generationsToCleanup := t.plan.GenerationsToCleanup
generationsToCleanup := []uint32{t.sourceGeneration}
t.LogInfo("Identified generations for cleanup", map[string]interface{}{ t.LogInfo("Using cleanup generations from vacuum plan", map[string]interface{}{
"volume_id": t.volumeID, "volume_id": t.volumeID,
"target_generation": t.targetGeneration, "target_generation": t.targetGeneration,
"source_generation": t.sourceGeneration, "source_generation": t.sourceGeneration,
"generations_to_cleanup": generationsToCleanup, "generations_to_cleanup": generationsToCleanup,
"plan_validated": true,
}) })
// Step 4: Unmount and delete old generation shards from each node // Step 4: Unmount and delete old generation shards from each node
var cleanupErrors []string var cleanupErrors []string
for node := range t.sourceNodes { for node := range t.plan.SourceDistribution.Nodes {
for _, generation := range generationsToCleanup { for _, generation := range generationsToCleanup {
if err := t.cleanupGenerationFromNode(node, generation); err != nil { if err := t.cleanupGenerationFromNode(node, generation); err != nil {
cleanupErrors = append(cleanupErrors, fmt.Sprintf("node %s generation %d: %v", node, generation, err)) cleanupErrors = append(cleanupErrors, fmt.Sprintf("node %s generation %d: %v", node, generation, err))