mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 08:46:54 +08:00
Finds the most complete/reliable generation
This commit is contained in:
@@ -45,6 +45,7 @@ type VacuumPlan struct {
|
||||
}
|
||||
|
||||
// DetermineGenerationsFromParams extracts generation information from task parameters
|
||||
// Now supports multiple generations and finds the most complete one for vacuum
|
||||
func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.TaskParams) (sourceGen, targetGen uint32, err error) {
|
||||
if params == nil {
|
||||
return 0, 0, fmt.Errorf("task parameters cannot be nil")
|
||||
@@ -55,29 +56,27 @@ func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.Tas
|
||||
return 0, 1, nil
|
||||
}
|
||||
|
||||
// Use generation from first source (all sources should have same generation)
|
||||
if params.Sources[0].Generation > 0 {
|
||||
sourceGen = params.Sources[0].Generation
|
||||
targetGen = sourceGen + 1
|
||||
} else {
|
||||
// Generation 0 case
|
||||
sourceGen = 0
|
||||
targetGen = 1
|
||||
// Group sources by generation and analyze completeness
|
||||
generationAnalysis, err := logic.AnalyzeGenerationCompleteness(params)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("failed to analyze generation completeness: %w", err)
|
||||
}
|
||||
|
||||
// Validate consistency - all sources should have the same generation
|
||||
for i, source := range params.Sources {
|
||||
if source.Generation != sourceGen {
|
||||
return 0, 0, fmt.Errorf("inconsistent generations in sources: source[0]=%d, source[%d]=%d",
|
||||
sourceGen, i, source.Generation)
|
||||
}
|
||||
// Find the most complete generation that can be used for reconstruction
|
||||
mostCompleteGen, found := logic.FindMostCompleteGeneration(generationAnalysis)
|
||||
if !found {
|
||||
return 0, 0, fmt.Errorf("no generation has sufficient shards for reconstruction")
|
||||
}
|
||||
|
||||
return sourceGen, targetGen, nil
|
||||
// Target generation is max(all generations) + 1
|
||||
maxGen := logic.FindMaxGeneration(generationAnalysis)
|
||||
targetGen = maxGen + 1
|
||||
|
||||
return mostCompleteGen, targetGen, nil
|
||||
}
|
||||
|
||||
// ParseSourceNodes extracts source node information from task parameters
|
||||
func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[pb.ServerAddress]erasure_coding.ShardBits, error) {
|
||||
// ParseSourceNodes extracts source node information from task parameters for a specific generation
|
||||
func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams, targetGeneration uint32) (map[pb.ServerAddress]erasure_coding.ShardBits, error) {
|
||||
if params == nil {
|
||||
return nil, fmt.Errorf("task parameters cannot be nil")
|
||||
}
|
||||
@@ -85,7 +84,7 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[
|
||||
sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits)
|
||||
|
||||
for _, source := range params.Sources {
|
||||
if source.Node == "" {
|
||||
if source.Node == "" || source.Generation != targetGeneration {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -105,7 +104,7 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[
|
||||
}
|
||||
|
||||
if len(sourceNodes) == 0 {
|
||||
return nil, fmt.Errorf("no valid source nodes found: sources=%d", len(params.Sources))
|
||||
return nil, fmt.Errorf("no valid source nodes found for generation %d: sources=%d", targetGeneration, len(params.Sources))
|
||||
}
|
||||
|
||||
return sourceNodes, nil
|
||||
@@ -113,14 +112,14 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[
|
||||
|
||||
// CreateVacuumPlan creates a comprehensive plan for the EC vacuum operation
|
||||
func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string, params *worker_pb.TaskParams) (*VacuumPlan, error) {
|
||||
// Extract generations
|
||||
// Extract generations and analyze completeness
|
||||
sourceGen, targetGen, err := logic.DetermineGenerationsFromParams(params)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to determine generations: %w", err)
|
||||
}
|
||||
|
||||
// Parse source nodes
|
||||
sourceNodes, err := logic.ParseSourceNodes(params)
|
||||
// Parse source nodes from the selected generation
|
||||
sourceNodes, err := logic.ParseSourceNodes(params, sourceGen)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse source nodes: %w", err)
|
||||
}
|
||||
@@ -137,8 +136,18 @@ func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string,
|
||||
Nodes: sourceNodes, // Same nodes, new generation
|
||||
}
|
||||
|
||||
// Determine what to cleanup (simplified: just source generation)
|
||||
generationsToCleanup := []uint32{sourceGen}
|
||||
// Get all available generations for cleanup calculation
|
||||
generationAnalysis, err := logic.AnalyzeGenerationCompleteness(params)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to analyze generations for cleanup: %w", err)
|
||||
}
|
||||
|
||||
// All generations except target should be cleaned up
|
||||
var allGenerations []uint32
|
||||
for generation := range generationAnalysis {
|
||||
allGenerations = append(allGenerations, generation)
|
||||
}
|
||||
generationsToCleanup := logic.CalculateCleanupGenerations(sourceGen, targetGen, allGenerations)
|
||||
|
||||
// Generate safety checks
|
||||
safetyChecks := logic.generateSafetyChecks(sourceDistribution, targetGen)
|
||||
@@ -243,6 +252,100 @@ type CleanupImpact struct {
|
||||
ShardsToDelete int
|
||||
}
|
||||
|
||||
// GenerationAnalysis represents the analysis of shard completeness per generation
|
||||
type GenerationAnalysis struct {
|
||||
Generation uint32
|
||||
ShardBits erasure_coding.ShardBits
|
||||
ShardCount int
|
||||
Nodes map[pb.ServerAddress]erasure_coding.ShardBits
|
||||
CanReconstruct bool // Whether this generation has enough shards for reconstruction
|
||||
}
|
||||
|
||||
// AnalyzeGenerationCompleteness analyzes each generation's shard completeness
|
||||
func (logic *EcVacuumLogic) AnalyzeGenerationCompleteness(params *worker_pb.TaskParams) (map[uint32]*GenerationAnalysis, error) {
|
||||
if params == nil {
|
||||
return nil, fmt.Errorf("task parameters cannot be nil")
|
||||
}
|
||||
|
||||
generationMap := make(map[uint32]*GenerationAnalysis)
|
||||
|
||||
// Group sources by generation
|
||||
for _, source := range params.Sources {
|
||||
if source.Node == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
generation := source.Generation
|
||||
if _, exists := generationMap[generation]; !exists {
|
||||
generationMap[generation] = &GenerationAnalysis{
|
||||
Generation: generation,
|
||||
ShardBits: erasure_coding.ShardBits(0),
|
||||
Nodes: make(map[pb.ServerAddress]erasure_coding.ShardBits),
|
||||
}
|
||||
}
|
||||
|
||||
analysis := generationMap[generation]
|
||||
serverAddr := pb.ServerAddress(source.Node)
|
||||
var shardBits erasure_coding.ShardBits
|
||||
|
||||
// Convert shard IDs to ShardBits
|
||||
for _, shardId := range source.ShardIds {
|
||||
if shardId < erasure_coding.TotalShardsCount {
|
||||
shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId))
|
||||
}
|
||||
}
|
||||
|
||||
if shardBits.ShardIdCount() > 0 {
|
||||
analysis.Nodes[serverAddr] = shardBits
|
||||
analysis.ShardBits = analysis.ShardBits.Plus(shardBits)
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate completeness for each generation
|
||||
for _, analysis := range generationMap {
|
||||
analysis.ShardCount = analysis.ShardBits.ShardIdCount()
|
||||
analysis.CanReconstruct = analysis.ShardCount >= erasure_coding.DataShardsCount
|
||||
}
|
||||
|
||||
return generationMap, nil
|
||||
}
|
||||
|
||||
// FindMostCompleteGeneration finds the generation with the most complete set of shards
|
||||
// that can be used for reconstruction
|
||||
func (logic *EcVacuumLogic) FindMostCompleteGeneration(generationMap map[uint32]*GenerationAnalysis) (uint32, bool) {
|
||||
var bestGeneration uint32
|
||||
var bestShardCount int
|
||||
found := false
|
||||
|
||||
for generation, analysis := range generationMap {
|
||||
// Only consider generations that can reconstruct
|
||||
if !analysis.CanReconstruct {
|
||||
continue
|
||||
}
|
||||
|
||||
// Prefer the generation with the most shards, or if tied, the highest generation number
|
||||
if !found || analysis.ShardCount > bestShardCount ||
|
||||
(analysis.ShardCount == bestShardCount && generation > bestGeneration) {
|
||||
bestGeneration = generation
|
||||
bestShardCount = analysis.ShardCount
|
||||
found = true
|
||||
}
|
||||
}
|
||||
|
||||
return bestGeneration, found
|
||||
}
|
||||
|
||||
// FindMaxGeneration finds the highest generation number among all available generations
|
||||
func (logic *EcVacuumLogic) FindMaxGeneration(generationMap map[uint32]*GenerationAnalysis) uint32 {
|
||||
var maxGen uint32
|
||||
for generation := range generationMap {
|
||||
if generation > maxGen {
|
||||
maxGen = generation
|
||||
}
|
||||
}
|
||||
return maxGen
|
||||
}
|
||||
|
||||
// countShardsToDelete counts how many shard files will be deleted
|
||||
func (logic *EcVacuumLogic) countShardsToDelete(plan *VacuumPlan) int {
|
||||
totalShards := 0
|
||||
|
||||
@@ -35,7 +35,11 @@ func TestDetermineGenerationsFromParams(t *testing.T) {
|
||||
name: "generation 0 source",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{Generation: 0},
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 0,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectSrc: 0,
|
||||
@@ -45,7 +49,11 @@ func TestDetermineGenerationsFromParams(t *testing.T) {
|
||||
name: "generation 1 source",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{Generation: 1},
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectSrc: 1,
|
||||
@@ -55,29 +63,54 @@ func TestDetermineGenerationsFromParams(t *testing.T) {
|
||||
name: "generation 5 source",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{Generation: 5},
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 5,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectSrc: 5,
|
||||
expectTgt: 6,
|
||||
},
|
||||
{
|
||||
name: "inconsistent generations",
|
||||
name: "multiple generations - finds most complete",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{Generation: 1},
|
||||
{Generation: 2}, // Different generation!
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2}, // Only 3 shards - insufficient
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: true,
|
||||
expectSrc: 2, // Should pick generation 2 (most complete)
|
||||
expectTgt: 3, // Target should be max(1,2) + 1 = 3
|
||||
},
|
||||
{
|
||||
name: "multiple sources same generation",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{Generation: 3},
|
||||
{Generation: 3},
|
||||
{Generation: 3},
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 3,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4},
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 3,
|
||||
ShardIds: []uint32{5, 6, 7, 8, 9}, // Combined = 10 shards - sufficient
|
||||
},
|
||||
{
|
||||
Node: "node3:8080",
|
||||
Generation: 3,
|
||||
ShardIds: []uint32{10, 11, 12, 13},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectSrc: 3,
|
||||
@@ -118,6 +151,7 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
params *worker_pb.TaskParams
|
||||
generation uint32
|
||||
expectNodes int
|
||||
expectShards map[string][]int // node -> shard IDs
|
||||
expectError bool
|
||||
@@ -125,6 +159,7 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
{
|
||||
name: "nil params",
|
||||
params: nil,
|
||||
generation: 0,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
@@ -132,6 +167,7 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{},
|
||||
},
|
||||
generation: 0,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
@@ -139,11 +175,13 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5},
|
||||
Node: "node1:8080",
|
||||
Generation: 0,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
generation: 0,
|
||||
expectNodes: 1,
|
||||
expectShards: map[string][]int{
|
||||
"node1:8080": {0, 1, 2, 3, 4, 5},
|
||||
@@ -154,19 +192,23 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4},
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4},
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
ShardIds: []uint32{5, 6, 7, 8, 9},
|
||||
Node: "node2:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
Node: "node3:8080",
|
||||
ShardIds: []uint32{10, 11, 12, 13},
|
||||
Node: "node3:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{10, 11, 12, 13},
|
||||
},
|
||||
},
|
||||
},
|
||||
generation: 1,
|
||||
expectNodes: 3,
|
||||
expectShards: map[string][]int{
|
||||
"node1:8080": {0, 1, 2, 3, 4},
|
||||
@@ -179,15 +221,18 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
ShardIds: []uint32{0, 1, 2},
|
||||
Node: "node1:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 1, 2},
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes
|
||||
Node: "node2:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes
|
||||
},
|
||||
},
|
||||
},
|
||||
generation: 2,
|
||||
expectNodes: 2,
|
||||
expectShards: map[string][]int{
|
||||
"node1:8080": {0, 1, 2},
|
||||
@@ -199,15 +244,18 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "",
|
||||
ShardIds: []uint32{0, 1, 2},
|
||||
Node: "",
|
||||
Generation: 3,
|
||||
ShardIds: []uint32{0, 1, 2},
|
||||
},
|
||||
{
|
||||
Node: "node1:8080",
|
||||
ShardIds: []uint32{3, 4, 5},
|
||||
Node: "node1:8080",
|
||||
Generation: 3,
|
||||
ShardIds: []uint32{3, 4, 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
generation: 3,
|
||||
expectNodes: 1,
|
||||
expectShards: map[string][]int{
|
||||
"node1:8080": {3, 4, 5},
|
||||
@@ -218,21 +266,51 @@ func TestParseSourceNodes(t *testing.T) {
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid
|
||||
Node: "node1:8080",
|
||||
Generation: 4,
|
||||
ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid
|
||||
},
|
||||
},
|
||||
},
|
||||
generation: 4,
|
||||
expectNodes: 1,
|
||||
expectShards: map[string][]int{
|
||||
"node1:8080": {0, 1}, // Only valid shards
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "filter by generation - only matching generation",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2},
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 2, // Different generation - should be ignored
|
||||
ShardIds: []uint32{3, 4, 5},
|
||||
},
|
||||
{
|
||||
Node: "node3:8080",
|
||||
Generation: 1, // Same generation - should be included
|
||||
ShardIds: []uint32{6, 7, 8},
|
||||
},
|
||||
},
|
||||
},
|
||||
generation: 1,
|
||||
expectNodes: 2,
|
||||
expectShards: map[string][]int{
|
||||
"node1:8080": {0, 1, 2},
|
||||
"node3:8080": {6, 7, 8},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sourceNodes, err := logic.ParseSourceNodes(tt.params)
|
||||
sourceNodes, err := logic.ParseSourceNodes(tt.params, tt.generation)
|
||||
|
||||
if tt.expectError {
|
||||
if err == nil {
|
||||
@@ -685,3 +763,354 @@ func createRealisticTopologyTest(t *testing.T) {
|
||||
func TestRealisticTopologyScenarios(t *testing.T) {
|
||||
t.Run("3-node distributed shards", createRealisticTopologyTest)
|
||||
}
|
||||
|
||||
func TestAnalyzeGenerationCompleteness(t *testing.T) {
|
||||
logic := NewEcVacuumLogic()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
params *worker_pb.TaskParams
|
||||
expectedGenerations []uint32
|
||||
expectedCanReconstruct map[uint32]bool
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "nil params",
|
||||
params: nil,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "single generation sufficient shards",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards = sufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedGenerations: []uint32{1},
|
||||
expectedCanReconstruct: map[uint32]bool{1: true},
|
||||
},
|
||||
{
|
||||
name: "single generation insufficient shards",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2}, // Only 3 shards = insufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedGenerations: []uint32{1},
|
||||
expectedCanReconstruct: map[uint32]bool{1: false},
|
||||
},
|
||||
{
|
||||
name: "multiple generations mixed completeness",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2}, // 3 shards - insufficient
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
|
||||
},
|
||||
{
|
||||
Node: "node3:8080",
|
||||
Generation: 3,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5}, // 6 shards - insufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedGenerations: []uint32{1, 2, 3},
|
||||
expectedCanReconstruct: map[uint32]bool{1: false, 2: true, 3: false},
|
||||
},
|
||||
{
|
||||
name: "multiple nodes same generation",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4},
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{5, 6, 7, 8, 9}, // Together = 10 shards = sufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedGenerations: []uint32{1},
|
||||
expectedCanReconstruct: map[uint32]bool{1: true},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
analysis, err := logic.AnalyzeGenerationCompleteness(tt.params)
|
||||
|
||||
if tt.expectError {
|
||||
if err == nil {
|
||||
t.Errorf("expected error but got none")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Check we have the expected generations
|
||||
if len(analysis) != len(tt.expectedGenerations) {
|
||||
t.Errorf("generation count: expected %d, got %d", len(tt.expectedGenerations), len(analysis))
|
||||
return
|
||||
}
|
||||
|
||||
for _, expectedGen := range tt.expectedGenerations {
|
||||
genAnalysis, exists := analysis[expectedGen]
|
||||
if !exists {
|
||||
t.Errorf("expected generation %d not found", expectedGen)
|
||||
continue
|
||||
}
|
||||
|
||||
expectedCanReconstruct := tt.expectedCanReconstruct[expectedGen]
|
||||
if genAnalysis.CanReconstruct != expectedCanReconstruct {
|
||||
t.Errorf("generation %d CanReconstruct: expected %v, got %v",
|
||||
expectedGen, expectedCanReconstruct, genAnalysis.CanReconstruct)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindMostCompleteGeneration(t *testing.T) {
|
||||
logic := NewEcVacuumLogic()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
generationAnalysis map[uint32]*GenerationAnalysis
|
||||
expectedGeneration uint32
|
||||
expectedFound bool
|
||||
}{
|
||||
{
|
||||
name: "empty analysis",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{},
|
||||
expectedFound: false,
|
||||
},
|
||||
{
|
||||
name: "single reconstructable generation",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{
|
||||
1: {Generation: 1, ShardCount: 10, CanReconstruct: true},
|
||||
},
|
||||
expectedGeneration: 1,
|
||||
expectedFound: true,
|
||||
},
|
||||
{
|
||||
name: "no reconstructable generations",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{
|
||||
1: {Generation: 1, ShardCount: 5, CanReconstruct: false},
|
||||
2: {Generation: 2, ShardCount: 3, CanReconstruct: false},
|
||||
},
|
||||
expectedFound: false,
|
||||
},
|
||||
{
|
||||
name: "multiple reconstructable - picks most complete",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{
|
||||
1: {Generation: 1, ShardCount: 10, CanReconstruct: true},
|
||||
2: {Generation: 2, ShardCount: 14, CanReconstruct: true}, // Most complete
|
||||
3: {Generation: 3, ShardCount: 12, CanReconstruct: true},
|
||||
},
|
||||
expectedGeneration: 2,
|
||||
expectedFound: true,
|
||||
},
|
||||
{
|
||||
name: "tie in shard count - picks higher generation",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{
|
||||
1: {Generation: 1, ShardCount: 10, CanReconstruct: true},
|
||||
2: {Generation: 2, ShardCount: 10, CanReconstruct: true}, // Same count, higher generation
|
||||
},
|
||||
expectedGeneration: 2,
|
||||
expectedFound: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
generation, found := logic.FindMostCompleteGeneration(tt.generationAnalysis)
|
||||
|
||||
if found != tt.expectedFound {
|
||||
t.Errorf("found: expected %v, got %v", tt.expectedFound, found)
|
||||
return
|
||||
}
|
||||
|
||||
if tt.expectedFound && generation != tt.expectedGeneration {
|
||||
t.Errorf("generation: expected %d, got %d", tt.expectedGeneration, generation)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindMaxGeneration(t *testing.T) {
|
||||
logic := NewEcVacuumLogic()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
generationAnalysis map[uint32]*GenerationAnalysis
|
||||
expectedMax uint32
|
||||
}{
|
||||
{
|
||||
name: "empty analysis",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{},
|
||||
expectedMax: 0,
|
||||
},
|
||||
{
|
||||
name: "single generation",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{
|
||||
5: {Generation: 5},
|
||||
},
|
||||
expectedMax: 5,
|
||||
},
|
||||
{
|
||||
name: "multiple generations",
|
||||
generationAnalysis: map[uint32]*GenerationAnalysis{
|
||||
1: {Generation: 1},
|
||||
5: {Generation: 5},
|
||||
3: {Generation: 3},
|
||||
7: {Generation: 7}, // Highest
|
||||
},
|
||||
expectedMax: 7,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
maxGen := logic.FindMaxGeneration(tt.generationAnalysis)
|
||||
|
||||
if maxGen != tt.expectedMax {
|
||||
t.Errorf("max generation: expected %d, got %d", tt.expectedMax, maxGen)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultiGenerationVacuumScenarios(t *testing.T) {
|
||||
logic := NewEcVacuumLogic()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
params *worker_pb.TaskParams
|
||||
expectedSourceGen uint32
|
||||
expectedTargetGen uint32
|
||||
expectedCleanupCount int
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "corrupted generation 1, good generation 2",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2}, // Insufficient - corrupted data
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // Complete - good data
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedSourceGen: 2, // Should use generation 2
|
||||
expectedTargetGen: 3, // max(1,2) + 1 = 3
|
||||
expectedCleanupCount: 2, // Clean up generations 1 and 2
|
||||
},
|
||||
{
|
||||
name: "multiple old generations, one current good",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 0,
|
||||
ShardIds: []uint32{0, 1}, // Old incomplete
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2, 3}, // Old incomplete
|
||||
},
|
||||
{
|
||||
Node: "node3:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, // Complete - all shards
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedSourceGen: 2, // Should use generation 2 (most complete)
|
||||
expectedTargetGen: 3, // max(0,1,2) + 1 = 3
|
||||
expectedCleanupCount: 3, // Clean up generations 0, 1, and 2
|
||||
},
|
||||
{
|
||||
name: "no sufficient generations",
|
||||
params: &worker_pb.TaskParams{
|
||||
Sources: []*worker_pb.TaskSource{
|
||||
{
|
||||
Node: "node1:8080",
|
||||
Generation: 1,
|
||||
ShardIds: []uint32{0, 1, 2}, // Only 3 shards - insufficient
|
||||
},
|
||||
{
|
||||
Node: "node2:8080",
|
||||
Generation: 2,
|
||||
ShardIds: []uint32{0, 1}, // Only 2 shards - insufficient
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: true, // No generation has enough shards
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
plan, err := logic.CreateVacuumPlan(123, "test", tt.params)
|
||||
|
||||
if tt.expectError {
|
||||
if err == nil {
|
||||
t.Errorf("expected error but got none")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if plan.CurrentGeneration != tt.expectedSourceGen {
|
||||
t.Errorf("source generation: expected %d, got %d", tt.expectedSourceGen, plan.CurrentGeneration)
|
||||
}
|
||||
|
||||
if plan.TargetGeneration != tt.expectedTargetGen {
|
||||
t.Errorf("target generation: expected %d, got %d", tt.expectedTargetGen, plan.TargetGeneration)
|
||||
}
|
||||
|
||||
if len(plan.GenerationsToCleanup) != tt.expectedCleanupCount {
|
||||
t.Errorf("cleanup count: expected %d, got %d", tt.expectedCleanupCount, len(plan.GenerationsToCleanup))
|
||||
}
|
||||
|
||||
// Verify cleanup generations don't include target
|
||||
for _, gen := range plan.GenerationsToCleanup {
|
||||
if gen == plan.TargetGeneration {
|
||||
t.Errorf("cleanup generations should not include target generation %d", plan.TargetGeneration)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user