diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 3b2c63cc7..79acebff1 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -18,12 +19,14 @@ func init() {
 }
 
 type ecRebuilder struct {
-	// TODO: add ErrorWaitGroup for parallelization
 	commandEnv   *CommandEnv
 	ecNodes      []*EcNode
 	writer       io.Writer
 	applyChanges bool
 	collections  []string
+
+	ewg       *ErrorWaitGroup
+	ecNodesMu sync.Mutex
}
 
 type commandEcRebuild struct {
@@ -36,7 +39,14 @@ func (c *commandEcRebuild) Name() string {
 func (c *commandEcRebuild) Help() string {
 	return `find and rebuild missing ec shards among volume servers
 
-	ec.rebuild [-c EACH_COLLECTION|] [-apply]
+	ec.rebuild [-collection EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
+
+	Options:
+		-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
+		-apply: actually perform the rebuild operations (default is dry-run mode)
+		-maxParallelization: number of volumes to rebuild concurrently (default: 10)
+			Increase for faster rebuilds with more system resources.
+			Decrease if experiencing resource contention or instability.
 
 	Algorithm:
 
@@ -71,13 +81,13 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 
 	fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+	maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 	applyChanges := fixCommand.Bool("apply", false, "apply the changes")
 	// TODO: remove this alias
 	applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
 	if err = fixCommand.Parse(args); err != nil {
 		return nil
 	}
-
 	handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
 	infoAboutSimulationMode(writer, *applyChanges, "-apply")
 
@@ -107,17 +117,16 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		writer:       writer,
 		applyChanges: *applyChanges,
 		collections:  collections,
+
+		ewg: NewErrorWaitGroup(*maxParallelization),
 	}
 
 	fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
 	for _, c := range collections {
-		fmt.Printf("rebuildEcVolumes collection %s\n", c)
-		if err = erb.rebuildEcVolumes(c); err != nil {
-			return err
-		}
+		erb.rebuildEcVolumes(c)
 	}
 
-	return nil
+	return erb.ewg.Wait()
 }
 
 func (erb *ecRebuilder) write(format string, a ...any) {
@@ -128,30 +137,87 @@ func (erb *ecRebuilder) isLocked() bool {
 	return erb.commandEnv.isLocked()
 }
 
-// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
-func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
+// countLocalShards returns the number of shards already present locally on the node for the given volume.
+func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int {
+	for _, diskInfo := range node.info.DiskInfos {
+		for _, ecShardInfo := range diskInfo.EcShardInfos {
+			if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
+				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+				return len(shardBits.ShardIds())
+			}
+		}
+	}
+	return 0
+}
+
+// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots
+// and reserves slots only for the non-local shards that need to be copied/generated.
+func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) {
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
+
 	if len(erb.ecNodes) == 0 {
-		return nil
+		return nil, 0, fmt.Errorf("no ec nodes available")
 	}
 
-	res := erb.ecNodes[0]
-	for i := 1; i < len(erb.ecNodes); i++ {
-		if erb.ecNodes[i].freeEcSlot > res.freeEcSlot {
-			res = erb.ecNodes[i]
+	// Find the node with the most free slots, considering local shards
+	var bestNode *EcNode
+	var bestSlotsNeeded int
+	var maxAvailableSlots int
+	var minSlotsNeeded int = erasure_coding.TotalShardsCount // Start with maximum possible
+	for _, node := range erb.ecNodes {
+		localShards := erb.countLocalShards(node, collection, volumeId)
+		slotsNeeded := erasure_coding.TotalShardsCount - localShards
+		if slotsNeeded < 0 {
+			slotsNeeded = 0
+		}
+
+		if node.freeEcSlot > maxAvailableSlots {
+			maxAvailableSlots = node.freeEcSlot
+		}
+
+		if slotsNeeded < minSlotsNeeded {
+			minSlotsNeeded = slotsNeeded
+		}
+
+		if node.freeEcSlot >= slotsNeeded {
+			if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot {
+				bestNode = node
+				bestSlotsNeeded = slotsNeeded
+			}
 		}
 	}
 
-	return res
+	if bestNode == nil {
+		return nil, 0, fmt.Errorf("no node has sufficient free slots for volume %d (need at least %d slots, max available: %d)",
+			volumeId, minSlotsNeeded, maxAvailableSlots)
+	}
+
+	// Reserve slots only for non-local shards
+	bestNode.freeEcSlot -= bestSlotsNeeded
+
+	return bestNode, bestSlotsNeeded, nil
 }
 
-func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
-	fmt.Printf("rebuildEcVolumes %s\n", collection)
+// releaseRebuilder releases the reserved slots back to the rebuilder node.
+func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) {
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
+
+	// Release slots by incrementing the free slot count
+	node.freeEcSlot += slotsToRelease
+}
+
+func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
+	fmt.Printf("rebuildEcVolumes for %q\n", collection)
 
 	// collect vid => each shard locations, similar to ecShardMap in topology.go
 	ecShardMap := make(EcShardMap)
+	erb.ecNodesMu.Lock()
 	for _, ecNode := range erb.ecNodes {
 		ecShardMap.registerEcNode(ecNode, collection)
 	}
+	erb.ecNodesMu.Unlock()
 
 	for vid, locations := range ecShardMap {
 		shardCount := locations.shardCount()
@@ -159,31 +225,37 @@
 			continue
 		}
 		if shardCount < erasure_coding.DataShardsCount {
-			return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+			// Capture variables for closure
+			vid := vid
+			shardCount := shardCount
+			erb.ewg.Add(func() error {
+				return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+			})
+			continue
 		}
 
-		if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
-			return err
-		}
+		// Capture variables for closure
+		vid := vid
+		locations := locations
+
+		erb.ewg.Add(func() error {
+			// Select rebuilder and reserve slots atomically per volume
+			rebuilder, slotsToReserve, err := erb.selectAndReserveRebuilder(collection, vid)
+			if err != nil {
+				return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err)
+			}
+			defer erb.releaseRebuilder(rebuilder, slotsToReserve)
+
+			return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
+		})
 	}
-
-	return nil
 }
 
-func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
+func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
 	if !erb.isLocked() {
 		return fmt.Errorf("lock is lost")
 	}
 
-	// TODO: fix this logic so it supports concurrent executions
-	rebuilder := erb.ecNodeWithMoreFreeSlots()
-	if rebuilder == nil {
-		return fmt.Errorf("no EC nodes available for rebuild")
-	}
-	if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
-		return fmt.Errorf("disk space is not enough")
-	}
-
 	fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
 
 	// collect shard files to rebuilder local disk
@@ -219,6 +291,9 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
 		return err
 	}
 
+	// ensure EcNode updates are atomic
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
 	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
 
 	return nil
diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go
index 5ab431137..b732e210a 100644
--- a/weed/shell/command_ec_rebuild_test.go
+++ b/weed/shell/command_ec_rebuild_test.go
@@ -79,69 +79,6 @@ func TestEcShardMapShardCount(t *testing.T) {
 	}
 }
 
-// TestEcRebuilderEcNodeWithMoreFreeSlots tests the free slot selection
-func TestEcRebuilderEcNodeWithMoreFreeSlots(t *testing.T) {
-	testCases := []struct {
-		name         string
-		nodes        []*EcNode
-		expectedNode string
-	}{
-		{
-			name: "single node",
-			nodes: []*EcNode{
-				newEcNode("dc1", "rack1", "node1", 100),
-			},
-			expectedNode: "node1",
-		},
-		{
-			name: "multiple nodes - select highest",
-			nodes: []*EcNode{
-				newEcNode("dc1", "rack1", "node1", 50),
-				newEcNode("dc1", "rack1", "node2", 150),
-				newEcNode("dc1", "rack1", "node3", 100),
-			},
-			expectedNode: "node2",
-		},
-		{
-			name: "multiple nodes - same slots",
-			nodes: []*EcNode{
-				newEcNode("dc1", "rack1", "node1", 100),
-				newEcNode("dc1", "rack1", "node2", 100),
-			},
-			expectedNode: "node1", // Should return first one
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			erb := &ecRebuilder{
-				ecNodes: tc.nodes,
-			}
-
-			node := erb.ecNodeWithMoreFreeSlots()
-			if node == nil {
-				t.Fatal("Expected a node, got nil")
-			}
-
-			if node.info.Id != tc.expectedNode {
-				t.Errorf("Expected node %s, got %s", tc.expectedNode, node.info.Id)
-			}
-		})
-	}
-}
-
-// TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty tests empty node list
-func TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty(t *testing.T) {
-	erb := &ecRebuilder{
-		ecNodes: []*EcNode{},
-	}
-
-	node := erb.ecNodeWithMoreFreeSlots()
-	if node != nil {
-		t.Errorf("Expected nil for empty node list, got %v", node)
-	}
-}
-
 // TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes
 func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
 	var logBuffer bytes.Buffer
@@ -155,15 +92,17 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
 			env:    make(map[string]string),
 			noLock: true, // Bypass lock check for unit test
 		},
+		ewg:     NewErrorWaitGroup(DefaultMaxParallelization),
 		ecNodes: []*EcNode{node1},
 		writer:  &logBuffer,
 	}
 
-	err := erb.rebuildEcVolumes("c1")
+	erb.rebuildEcVolumes("c1")
+	err := erb.ewg.Wait()
+
 	if err == nil {
 		t.Fatal("Expected error for insufficient shards, got nil")
 	}
-
 	if !strings.Contains(err.Error(), "unrepairable") {
 		t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error())
 	}
@@ -182,12 +121,15 @@ func TestRebuildEcVolumesCompleteVolume(t *testing.T) {
 			env:    make(map[string]string),
 			noLock: true, // Bypass lock check for unit test
 		},
+		ewg:          NewErrorWaitGroup(DefaultMaxParallelization),
 		ecNodes:      []*EcNode{node1},
 		writer:       &logBuffer,
 		applyChanges: false,
 	}
 
-	err := erb.rebuildEcVolumes("c1")
+	erb.rebuildEcVolumes("c1")
+	err := erb.ewg.Wait()
+
 	if err != nil {
 		t.Fatalf("Expected no error for complete volume, got: %v", err)
 	}
@@ -201,7 +143,9 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
 	var logBuffer bytes.Buffer
 
 	// Create a volume with missing shards but insufficient free slots
-	node1 := newEcNode("dc1", "rack1", "node1", 5). // Only 5 free slots, need 14
+	// Node has 10 local shards, missing 4 shards (10,11,12,13), so needs 4 free slots
+	// Set free slots to 3 (insufficient)
+	node1 := newEcNode("dc1", "rack1", "node1", 3). // Only 3 free slots, need 4
 		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
 
 	erb := &ecRebuilder{
@@ -209,18 +153,24 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
 			env:    make(map[string]string),
 			noLock: true, // Bypass lock check for unit test
 		},
+		ewg:          NewErrorWaitGroup(DefaultMaxParallelization),
 		ecNodes:      []*EcNode{node1},
 		writer:       &logBuffer,
 		applyChanges: false,
 	}
 
-	err := erb.rebuildEcVolumes("c1")
+	erb.rebuildEcVolumes("c1")
+	err := erb.ewg.Wait()
+
 	if err == nil {
 		t.Fatal("Expected error for insufficient disk space, got nil")
 	}
-
-	if !strings.Contains(err.Error(), "disk space is not enough") {
-		t.Errorf("Expected 'disk space' in error message, got: %s", err.Error())
+	if !strings.Contains(err.Error(), "no node has sufficient free slots") {
+		t.Errorf("Expected 'no node has sufficient free slots' in error message, got: %s", err.Error())
+	}
+	// Verify the enhanced error message includes diagnostic information
+	if !strings.Contains(err.Error(), "need") || !strings.Contains(err.Error(), "max available") {
+		t.Errorf("Expected diagnostic information in error message, got: %s", err.Error())
 	}
 }
 
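Note on the slot arithmetic behind the updated TestRebuildEcVolumesInsufficientSpace: with the standard 10+4 EC layout, erasure_coding.TotalShardsCount is 14. The test node already holds shards 0-9, so countLocalShards reports 10 and selectAndReserveRebuilder needs only 14 - 10 = 4 free slots for the missing shards 10-13. With freeEcSlot set to 3, selection fails with the new "no node has sufficient free slots" error, whereas the old code rejected any rebuilder with fewer than TotalShardsCount free slots even when most shards were already local.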
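The diff also leans on the ErrorWaitGroup helper and the DefaultMaxParallelization constant, which are defined elsewhere in the weed/shell package and are not shown in this patch. Below is a minimal sketch, assuming only the contract the rebuild code uses: NewErrorWaitGroup(n) caps concurrency at n, Add schedules a task, and Wait blocks until all tasks finish and returns their accumulated errors. It is illustrative only, not the repository's actual implementation.

package shell

import (
	"errors"
	"sync"
)

// ErrorWaitGroup (sketch) runs tasks with bounded parallelism and collects their errors.
type ErrorWaitGroup struct {
	sem  chan struct{} // channel capacity bounds the number of concurrently running tasks
	wg   sync.WaitGroup
	mu   sync.Mutex
	errs []error
}

func NewErrorWaitGroup(maxParallelization int) *ErrorWaitGroup {
	return &ErrorWaitGroup{sem: make(chan struct{}, maxParallelization)}
}

// Add schedules f to run as soon as a worker slot frees up.
func (ewg *ErrorWaitGroup) Add(f func() error) {
	ewg.wg.Add(1)
	go func() {
		defer ewg.wg.Done()
		ewg.sem <- struct{}{}        // acquire a slot
		defer func() { <-ewg.sem }() // release the slot
		if err := f(); err != nil {
			ewg.mu.Lock()
			ewg.errs = append(ewg.errs, err)
			ewg.mu.Unlock()
		}
	}()
}

// Wait blocks until every scheduled task has finished, then returns the
// collected errors joined together (nil if none failed).
func (ewg *ErrorWaitGroup) Wait() error {
	ewg.wg.Wait()
	return errors.Join(ewg.errs...)
}

Under this contract, rebuildEcVolumes no longer needs to return an error itself: each per-volume task reports failures through the pool, and Do surfaces them all once via erb.ewg.Wait().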