diff --git a/weed/topology/capacity_reservation_test.go b/weed/topology/capacity_reservation_test.go new file mode 100644 index 000000000..38cb14c50 --- /dev/null +++ b/weed/topology/capacity_reservation_test.go @@ -0,0 +1,215 @@ +package topology + +import ( + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestCapacityReservations_BasicOperations(t *testing.T) { + cr := newCapacityReservations() + diskType := types.HardDriveType + + // Test initial state + if count := cr.getReservedCount(diskType); count != 0 { + t.Errorf("Expected 0 reserved count initially, got %d", count) + } + + // Test add reservation + reservationId := cr.addReservation(diskType, 5) + if reservationId == "" { + t.Error("Expected non-empty reservation ID") + } + + if count := cr.getReservedCount(diskType); count != 5 { + t.Errorf("Expected 5 reserved count, got %d", count) + } + + // Test multiple reservations + cr.addReservation(diskType, 3) + if count := cr.getReservedCount(diskType); count != 8 { + t.Errorf("Expected 8 reserved count after second reservation, got %d", count) + } + + // Test remove reservation + success := cr.removeReservation(reservationId) + if !success { + t.Error("Expected successful removal of existing reservation") + } + + if count := cr.getReservedCount(diskType); count != 3 { + t.Errorf("Expected 3 reserved count after removal, got %d", count) + } + + // Test remove non-existent reservation + success = cr.removeReservation("non-existent-id") + if success { + t.Error("Expected failure when removing non-existent reservation") + } +} + +func TestCapacityReservations_ExpiredCleaning(t *testing.T) { + cr := newCapacityReservations() + diskType := types.HardDriveType + + // Add reservations and manipulate their creation time + reservationId1 := cr.addReservation(diskType, 3) + reservationId2 := cr.addReservation(diskType, 2) + + // Make one reservation "old" + cr.Lock() + if reservation, exists := cr.reservations[reservationId1]; exists { + reservation.createdAt = time.Now().Add(-10 * time.Minute) // 10 minutes ago + } + cr.Unlock() + + // Clean expired reservations (5 minute expiration) + cr.cleanExpiredReservations(5 * time.Minute) + + // Only the non-expired reservation should remain + if count := cr.getReservedCount(diskType); count != 2 { + t.Errorf("Expected 2 reserved count after cleaning, got %d", count) + } + + // Verify the right reservation was kept + if !cr.removeReservation(reservationId2) { + t.Error("Expected recent reservation to still exist") + } + + if cr.removeReservation(reservationId1) { + t.Error("Expected old reservation to be cleaned up") + } +} + +func TestCapacityReservations_DifferentDiskTypes(t *testing.T) { + cr := newCapacityReservations() + + // Add reservations for different disk types + cr.addReservation(types.HardDriveType, 5) + cr.addReservation(types.SsdType, 3) + + // Check counts are separate + if count := cr.getReservedCount(types.HardDriveType); count != 5 { + t.Errorf("Expected 5 HDD reserved count, got %d", count) + } + + if count := cr.getReservedCount(types.SsdType); count != 3 { + t.Errorf("Expected 3 SSD reserved count, got %d", count) + } +} + +func TestNodeImpl_ReservationMethods(t *testing.T) { + // Create a test data node + dn := NewDataNode("test-node") + diskType := types.HardDriveType + + // Set up some capacity + diskUsage := dn.diskUsages.getOrCreateDisk(diskType) + diskUsage.maxVolumeCount = 10 + diskUsage.volumeCount = 5 // 5 volumes free initially + + option := &VolumeGrowOption{DiskType: 
diskType} + + // Test available space calculation + available := dn.AvailableSpaceFor(option) + if available != 5 { + t.Errorf("Expected 5 available slots, got %d", available) + } + + availableForReservation := dn.AvailableSpaceForReservation(option) + if availableForReservation != 5 { + t.Errorf("Expected 5 available slots for reservation, got %d", availableForReservation) + } + + // Test successful reservation + reservationId, success := dn.TryReserveCapacity(diskType, 3) + if !success { + t.Error("Expected successful reservation") + } + if reservationId == "" { + t.Error("Expected non-empty reservation ID") + } + + // Available space should be reduced by reservations + availableForReservation = dn.AvailableSpaceForReservation(option) + if availableForReservation != 2 { + t.Errorf("Expected 2 available slots after reservation, got %d", availableForReservation) + } + + // Base available space should remain unchanged + available = dn.AvailableSpaceFor(option) + if available != 5 { + t.Errorf("Expected base available to remain 5, got %d", available) + } + + // Test reservation failure when insufficient capacity + _, success = dn.TryReserveCapacity(diskType, 3) + if success { + t.Error("Expected reservation failure due to insufficient capacity") + } + + // Test release reservation + dn.ReleaseReservedCapacity(reservationId) + availableForReservation = dn.AvailableSpaceForReservation(option) + if availableForReservation != 5 { + t.Errorf("Expected 5 available slots after release, got %d", availableForReservation) + } +} + +func TestNodeImpl_ConcurrentReservations(t *testing.T) { + dn := NewDataNode("test-node") + diskType := types.HardDriveType + + // Set up capacity + diskUsage := dn.diskUsages.getOrCreateDisk(diskType) + diskUsage.maxVolumeCount = 10 + diskUsage.volumeCount = 0 // 10 volumes free initially + + // Test concurrent reservations using goroutines + var wg sync.WaitGroup + var reservationIds sync.Map + concurrentRequests := 10 + wg.Add(concurrentRequests) + + for i := 0; i < concurrentRequests; i++ { + go func(i int) { + defer wg.Done() + if reservationId, success := dn.TryReserveCapacity(diskType, 1); success { + reservationIds.Store(reservationId, true) + t.Logf("goroutine %d: Successfully reserved %s", i, reservationId) + } else { + t.Errorf("goroutine %d: Expected successful reservation", i) + } + }(i) + } + + wg.Wait() + + // Should have no more capacity + option := &VolumeGrowOption{DiskType: diskType} + if available := dn.AvailableSpaceForReservation(option); available != 0 { + t.Errorf("Expected 0 available slots after all reservations, got %d", available) + // Debug: check total reserved + reservedCount := dn.capacityReservations.getReservedCount(diskType) + t.Logf("Debug: Total reserved count: %d", reservedCount) + } + + // Next reservation should fail + _, success := dn.TryReserveCapacity(diskType, 1) + if success { + t.Error("Expected reservation failure when at capacity") + } + + // Release all reservations + reservationIds.Range(func(key, value interface{}) bool { + dn.ReleaseReservedCapacity(key.(string)) + return true + }) + + // Should have full capacity back + if available := dn.AvailableSpaceForReservation(option); available != 10 { + t.Errorf("Expected 10 available slots after releasing all, got %d", available) + } +} diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go index 03fe20c10..e036621b4 100644 --- a/weed/topology/data_center.go +++ b/weed/topology/data_center.go @@ -1,9 +1,10 @@ package topology import ( - 
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "slices" "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) type DataCenter struct { @@ -16,6 +17,7 @@ func NewDataCenter(id string) *DataCenter { dc.nodeType = "DataCenter" dc.diskUsages = newDiskUsages() dc.children = make(map[NodeId]Node) + dc.capacityReservations = newCapacityReservations() dc.NodeImpl.value = dc return dc } diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 3103dc207..4f2dbe464 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -30,6 +30,7 @@ func NewDataNode(id string) *DataNode { dn.nodeType = "DataNode" dn.diskUsages = newDiskUsages() dn.children = make(map[NodeId]Node) + dn.capacityReservations = newCapacityReservations() dn.NodeImpl.value = dn return dn } diff --git a/weed/topology/node.go b/weed/topology/node.go index aa178b561..60e7427af 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -2,6 +2,7 @@ package topology import ( "errors" + "fmt" "math/rand/v2" "strings" "sync" @@ -16,15 +17,124 @@ import ( ) type NodeId string + +// CapacityReservation represents a temporary reservation of capacity +type CapacityReservation struct { + reservationId string + diskType types.DiskType + count int64 + createdAt time.Time +} + +// CapacityReservations manages capacity reservations for a node +type CapacityReservations struct { + sync.RWMutex + reservations map[string]*CapacityReservation + reservedCounts map[types.DiskType]int64 +} + +func newCapacityReservations() *CapacityReservations { + return &CapacityReservations{ + reservations: make(map[string]*CapacityReservation), + reservedCounts: make(map[types.DiskType]int64), + } +} + +func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string { + cr.Lock() + defer cr.Unlock() + + return cr.doAddReservation(diskType, count) +} + +func (cr *CapacityReservations) removeReservation(reservationId string) bool { + cr.Lock() + defer cr.Unlock() + + if reservation, exists := cr.reservations[reservationId]; exists { + delete(cr.reservations, reservationId) + cr.decrementCount(reservation.diskType, reservation.count) + return true + } + return false +} + +func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 { + cr.RLock() + defer cr.RUnlock() + + return cr.reservedCounts[diskType] +} + +// decrementCount is a helper to decrement reserved count and clean up zero entries +func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) { + cr.reservedCounts[diskType] -= count + // Clean up zero counts to prevent map growth + if cr.reservedCounts[diskType] <= 0 { + delete(cr.reservedCounts, diskType) + } +} + +// doAddReservation is a helper to add a reservation, assuming the lock is already held +func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string { + now := time.Now() + reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64()) + cr.reservations[reservationId] = &CapacityReservation{ + reservationId: reservationId, + diskType: diskType, + count: count, + createdAt: now, + } + cr.reservedCounts[diskType] += count + return reservationId +} + +// tryReserveAtomic atomically checks available space and reserves if possible +func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) { + cr.Lock() + defer cr.Unlock() + + // Check available space under lock + 
currentReserved := cr.reservedCounts[diskType] + availableSpace := availableSpaceFunc() - currentReserved + + if availableSpace >= count { + // Create and add reservation atomically + return cr.doAddReservation(diskType, count), true + } + + return "", false +} + +func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) { + cr.Lock() + defer cr.Unlock() + + now := time.Now() + for id, reservation := range cr.reservations { + if now.Sub(reservation.createdAt) > expirationDuration { + delete(cr.reservations, id) + cr.decrementCount(reservation.diskType, reservation.count) + glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id) + } + } +} + type Node interface { Id() NodeId String() string AvailableSpaceFor(option *VolumeGrowOption) int64 ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) + ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) UpAdjustMaxVolumeId(vid needle.VolumeId) GetDiskUsages() *DiskUsages + // Capacity reservation methods for avoiding race conditions + TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) + ReleaseReservedCapacity(reservationId string) + AvailableSpaceForReservation(option *VolumeGrowOption) int64 + GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -52,6 +162,9 @@ type NodeImpl struct { //for rack, data center, topology nodeType string value interface{} + + // capacity reservations to prevent race conditions during volume creation + capacityReservations *CapacityReservations } func (n *NodeImpl) GetDiskUsages() *DiskUsages { @@ -164,6 +277,42 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { } return freeVolumeSlotCount } + +// AvailableSpaceForReservation returns available space considering existing reservations +func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 { + baseAvailable := n.AvailableSpaceFor(option) + reservedCount := n.capacityReservations.getReservedCount(option.DiskType) + return baseAvailable - reservedCount +} + +// TryReserveCapacity attempts to atomically reserve capacity for volume creation +func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) { + const reservationTimeout = 5 * time.Minute // TODO: make this configurable + + // Clean up any expired reservations first + n.capacityReservations.cleanExpiredReservations(reservationTimeout) + + // Atomically check and reserve space + option := &VolumeGrowOption{DiskType: diskType} + reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 { + return n.AvailableSpaceFor(option) + }) + + if success { + glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId) + } + + return reservationId, success +} + +// ReleaseReservedCapacity releases a previously reserved capacity +func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) { + if n.capacityReservations.removeReservation(reservationId) { + glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId) + } else { + glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId) + } +} func (n *NodeImpl) SetParent(node Node) { n.parent = node } @@ -186,10 +335,24 @@ func (n *NodeImpl) GetValue() interface{} { } func (n *NodeImpl) 
ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { + return n.reserveOneVolumeInternal(r, option, false) +} + +// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks +func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { + return n.reserveOneVolumeInternal(r, option, true) +} + +func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { - freeSpace := node.AvailableSpaceFor(option) + var freeSpace int64 + if useReservations { + freeSpace = node.AvailableSpaceForReservation(option) + } else { + freeSpace = node.AvailableSpaceFor(option) + } // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue @@ -197,7 +360,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned if r >= freeSpace { r -= freeSpace } else { - if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 { + var hasSpace bool + if useReservations { + hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0 + } else { + hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0 + } + if hasSpace { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) dn := node.(*DataNode) if dn.IsTerminating { @@ -205,7 +374,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned } return dn, nil } - assignedNode, err = node.ReserveOneVolume(r, option) + if useReservations { + assignedNode, err = node.ReserveOneVolumeForReservation(r, option) + } else { + assignedNode, err = node.ReserveOneVolume(r, option) + } if err == nil { return } diff --git a/weed/topology/race_condition_stress_test.go b/weed/topology/race_condition_stress_test.go new file mode 100644 index 000000000..a60f0a32a --- /dev/null +++ b/weed/topology/race_condition_stress_test.go @@ -0,0 +1,306 @@ +package topology + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// TestRaceConditionStress simulates the original issue scenario: +// High concurrent writes causing capacity misjudgment +func TestRaceConditionStress(t *testing.T) { + // Create a cluster similar to the issue description: + // 3 volume servers, 200GB each, 5GB volume limit = 40 volumes max per server + const ( + numServers = 3 + volumeLimitMB = 5000 // 5GB in MB + storagePerServerGB = 200 // 200GB per server + maxVolumesPerServer = storagePerServerGB * 1024 / volumeLimitMB // 200*1024/5000 = 40 + concurrentRequests = 50 // High concurrency like the issue + ) + + // Create test topology + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), uint64(volumeLimitMB)*1024*1024, 5, false) + + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + // Create 3 volume servers with realistic capacity + servers := make([]*DataNode, numServers) + for i := 0; i < numServers; i++ { + dn := NewDataNode(fmt.Sprintf("server%d", i+1)) + rack.LinkChildNode(dn) + + // Set up disk with capacity for 40 volumes + disk := NewDisk(types.HardDriveType.String()) + disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 
maxVolumesPerServer + dn.LinkChildNode(disk) + + servers[i] = dn + } + + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("000") // Single replica like the issue + + option := &VolumeGrowOption{ + Collection: "test-bucket-large", // Same collection name as issue + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // Track results + var successfulAllocations int64 + var failedAllocations int64 + var totalVolumesCreated int64 + + var wg sync.WaitGroup + + // Launch concurrent volume creation requests + startTime := time.Now() + for i := 0; i < concurrentRequests; i++ { + wg.Add(1) + go func(requestId int) { + defer wg.Done() + + // This is the critical test: multiple threads trying to allocate simultaneously + servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true) + + if err != nil { + atomic.AddInt64(&failedAllocations, 1) + t.Logf("Request %d failed: %v", requestId, err) + return + } + + // Simulate volume creation delay (like in real scenario) + time.Sleep(time.Millisecond * 50) + + // Simulate successful volume creation + for _, server := range servers { + disk := server.children[NodeId(types.HardDriveType.String())].(*Disk) + deltaDiskUsage := &DiskUsageCounts{ + volumeCount: 1, + } + disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage) + atomic.AddInt64(&totalVolumesCreated, 1) + } + + // Release reservations (simulates successful registration) + reservation.releaseAllReservations() + atomic.AddInt64(&successfulAllocations, 1) + + }(i) + } + + wg.Wait() + duration := time.Since(startTime) + + // Verify results + t.Logf("Test completed in %v", duration) + t.Logf("Successful allocations: %d", successfulAllocations) + t.Logf("Failed allocations: %d", failedAllocations) + t.Logf("Total volumes created: %d", totalVolumesCreated) + + // Check capacity limits are respected + totalCapacityUsed := int64(0) + for i, server := range servers { + disk := server.children[NodeId(types.HardDriveType.String())].(*Disk) + volumeCount := disk.diskUsages.getOrCreateDisk(types.HardDriveType).volumeCount + totalCapacityUsed += volumeCount + + t.Logf("Server %d: %d volumes (max: %d)", i+1, volumeCount, maxVolumesPerServer) + + // Critical test: No server should exceed its capacity + if volumeCount > maxVolumesPerServer { + t.Errorf("RACE CONDITION DETECTED: Server %d exceeded capacity: %d > %d", + i+1, volumeCount, maxVolumesPerServer) + } + } + + // Verify totals make sense + if totalVolumesCreated != totalCapacityUsed { + t.Errorf("Volume count mismatch: created=%d, actual=%d", totalVolumesCreated, totalCapacityUsed) + } + + // The total should never exceed the cluster capacity (120 volumes for 3 servers × 40 each) + maxClusterCapacity := int64(numServers * maxVolumesPerServer) + if totalCapacityUsed > maxClusterCapacity { + t.Errorf("RACE CONDITION DETECTED: Cluster capacity exceeded: %d > %d", + totalCapacityUsed, maxClusterCapacity) + } + + // With reservations, we should have controlled allocation + // Total requests = successful + failed should equal concurrentRequests + if successfulAllocations+failedAllocations != concurrentRequests { + t.Errorf("Request count mismatch: success=%d + failed=%d != total=%d", + successfulAllocations, failedAllocations, concurrentRequests) + } + + t.Logf("✅ Race condition test passed: Capacity limits respected with %d concurrent requests", + concurrentRequests) +} + +// TestCapacityJudgmentAccuracy verifies that the capacity calculation is accurate +// under various load conditions +func 
TestCapacityJudgmentAccuracy(t *testing.T) { + // Create a single server with known capacity + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 5*1024*1024*1024, 5, false) + + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + dn := NewDataNode("server1") + rack.LinkChildNode(dn) + + // Server with capacity for exactly 10 volumes + disk := NewDisk(types.HardDriveType.String()) + diskUsage := disk.diskUsages.getOrCreateDisk(types.HardDriveType) + diskUsage.maxVolumeCount = 10 + dn.LinkChildNode(disk) + + // Also set max volume count on the DataNode level (gets propagated up) + dn.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 10 + + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("000") + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // Test accurate capacity reporting at each step + for i := 0; i < 10; i++ { + // Check available space before reservation + availableBefore := dn.AvailableSpaceFor(option) + availableForReservation := dn.AvailableSpaceForReservation(option) + + expectedAvailable := int64(10 - i) + if availableBefore != expectedAvailable { + t.Errorf("Step %d: Expected %d available, got %d", i, expectedAvailable, availableBefore) + } + + if availableForReservation != expectedAvailable { + t.Errorf("Step %d: Expected %d available for reservation, got %d", i, expectedAvailable, availableForReservation) + } + + // Try to reserve and allocate + _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err != nil { + t.Fatalf("Step %d: Unexpected reservation failure: %v", i, err) + } + + // Check that available space for reservation decreased + availableAfterReservation := dn.AvailableSpaceForReservation(option) + if availableAfterReservation != expectedAvailable-1 { + t.Errorf("Step %d: Expected %d available after reservation, got %d", + i, expectedAvailable-1, availableAfterReservation) + } + + // Simulate successful volume creation by properly updating disk usage hierarchy + disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk) + + // Create a volume usage delta to simulate volume creation + deltaDiskUsage := &DiskUsageCounts{ + volumeCount: 1, + } + + // Properly propagate the usage up the hierarchy + disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage) + + // Debug: Check the volume count after update + diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType) + currentVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount) + t.Logf("Step %d: Volume count after update: %d", i, currentVolumeCount) + + // Release reservation + reservation.releaseAllReservations() + + // Verify final state + availableAfter := dn.AvailableSpaceFor(option) + expectedAfter := int64(10 - i - 1) + if availableAfter != expectedAfter { + t.Errorf("Step %d: Expected %d available after creation, got %d", + i, expectedAfter, availableAfter) + // More debugging + diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType) + maxVolumes := atomic.LoadInt64(&diskUsageOnNode.maxVolumeCount) + remoteVolumes := atomic.LoadInt64(&diskUsageOnNode.remoteVolumeCount) + actualVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount) + t.Logf("Debug Step %d: max=%d, volume=%d, remote=%d", i, maxVolumes, actualVolumeCount, remoteVolumes) + } + } + + // At this point, no more reservations should succeed + _, _, err := vg.findEmptySlotsForOneVolume(topo, 
option, true) + if err == nil { + t.Error("Expected reservation to fail when at capacity") + } + + t.Logf("✅ Capacity judgment accuracy test passed") +} + +// TestReservationSystemPerformance measures the performance impact of reservations +func TestReservationSystemPerformance(t *testing.T) { + // Create topology + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + dn := NewDataNode("server1") + rack.LinkChildNode(dn) + + disk := NewDisk(types.HardDriveType.String()) + disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 1000 + dn.LinkChildNode(disk) + + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("000") + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // Benchmark reservation operations + const iterations = 1000 + + startTime := time.Now() + for i := 0; i < iterations; i++ { + _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err != nil { + t.Fatalf("Iteration %d failed: %v", i, err) + } + reservation.releaseAllReservations() + + // Simulate volume creation + diskUsage := dn.diskUsages.getOrCreateDisk(types.HardDriveType) + atomic.AddInt64(&diskUsage.volumeCount, 1) + } + duration := time.Since(startTime) + + avgDuration := duration / iterations + t.Logf("Performance: %d reservations in %v (avg: %v per reservation)", + iterations, duration, avgDuration) + + // Performance should be reasonable (less than 1ms per reservation on average) + if avgDuration > time.Millisecond { + t.Errorf("Reservation system performance concern: %v per reservation", avgDuration) + } else { + t.Logf("✅ Performance test passed: %v per reservation", avgDuration) + } +} diff --git a/weed/topology/rack.go b/weed/topology/rack.go index d82ef7986..f526cd84d 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -1,12 +1,13 @@ package topology import ( - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util" "slices" "strings" "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" ) type Rack struct { @@ -19,6 +20,7 @@ func NewRack(id string) *Rack { r.nodeType = "Rack" r.diskUsages = newDiskUsages() r.children = make(map[NodeId]Node) + r.capacityReservations = newCapacityReservations() r.NodeImpl.value = r return r } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 750c00ea2..bbae97d72 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -67,6 +67,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.NodeImpl.value = t t.diskUsages = newDiskUsages() t.children = make(map[NodeId]Node) + t.capacityReservations = newCapacityReservations() t.collectionMap = util.NewConcurrentReadMap() t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index c62fd72a0..f7af4e0a5 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -74,6 +74,22 @@ type VolumeGrowth struct { accessLock sync.Mutex } +// VolumeGrowReservation tracks capacity reservations for a volume creation operation +type VolumeGrowReservation struct { + 
servers []*DataNode + reservationIds []string + diskType types.DiskType +} + +// releaseAllReservations releases all reservations in this volume grow operation +func (vgr *VolumeGrowReservation) releaseAllReservations() { + for i, server := range vgr.servers { + if i < len(vgr.reservationIds) && vgr.reservationIds[i] != "" { + server.ReleaseReservedCapacity(vgr.reservationIds[i]) + } + } +} + func (o *VolumeGrowOption) String() string { blob, _ := json.Marshal(o) return string(blob) @@ -125,10 +141,17 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe } func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) { - servers, e := vg.findEmptySlotsForOneVolume(topo, option) + servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations if e != nil { return nil, e } + // Ensure reservations are released if anything goes wrong + defer func() { + if err != nil && reservation != nil { + reservation.releaseAllReservations() + } + }() + for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) { glog.V(0).Infof("wait for volume servers to join back") time.Sleep(constants.VolumePulseSeconds / 2) @@ -137,7 +160,7 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo if raftErr != nil { return nil, raftErr } - if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil { + if err = vg.grow(grpcDialOption, topo, vid, option, reservation, servers...); err == nil { for _, server := range servers { result = append(result, &master_pb.VolumeLocation{ Url: server.Url(), @@ -156,9 +179,37 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo // 2.2 collect all racks that have rp.SameRackCount+1 // 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1 // 2. 
find rest data nodes -func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { +// If useReservations is true, reserves capacity on each server and returns reservation info +func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement + + // Select appropriate functions based on useReservations flag + var availableSpaceFunc func(Node, *VolumeGrowOption) int64 + var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error) + + if useReservations { + availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 { + return node.AvailableSpaceForReservation(option) + } + reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) { + return node.ReserveOneVolumeForReservation(r, option) + } + } else { + availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 { + return node.AvailableSpaceFor(option) + } + reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) { + return node.ReserveOneVolume(r, option) + } + } + + // Ensure cleanup of partial reservations on error + defer func() { + if err != nil && reservation != nil { + reservation.releaseAllReservations() + } + }() mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) @@ -166,14 +217,14 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1) + if availableSpaceFunc(node, option) < int64(rp.DiffRackCount+rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 for _, rack := range node.Children() { possibleDataNodesCount := 0 for _, n := range rack.Children() { - if n.AvailableSpaceFor(option) >= 1 { + if availableSpaceFunc(n, option) >= 1 { possibleDataNodesCount++ } } @@ -187,7 +238,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return nil }) if dc_err != nil { - return nil, dc_err + return nil, nil, dc_err } //find main rack and other racks @@ -195,8 +246,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1) + if availableSpaceFunc(node, option) < int64(rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { // a bit faster way to test free racks @@ -204,7 +255,7 @@ func 
(vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 	}
 	possibleDataNodesCount := 0
 	for _, n := range node.Children() {
-		if n.AvailableSpaceFor(option) >= 1 {
+		if availableSpaceFunc(n, option) >= 1 {
 			possibleDataNodesCount++
 		}
 	}
@@ -214,7 +265,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolum
 		return nil
 	})
 	if rackErr != nil {
-		return nil, rackErr
+		return nil, nil, rackErr
 	}
 
 	//find main server and other servers
@@ -222,13 +273,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolum
 		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
 			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
 		}
-		if node.AvailableSpaceFor(option) < 1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1)
+		if availableSpaceFunc(node, option) < 1 {
+			return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1)
 		}
 		return nil
 	})
 	if serverErr != nil {
-		return nil, serverErr
+		return nil, nil, serverErr
 	}
 
 	servers = append(servers, mainServer.(*DataNode))
@@ -236,25 +287,47 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolum
 		servers = append(servers, server.(*DataNode))
 	}
 	for _, rack := range otherRacks {
-		r := rand.Int64N(rack.AvailableSpaceFor(option))
-		if server, e := rack.ReserveOneVolume(r, option); e == nil {
+		r := rand.Int64N(availableSpaceFunc(rack, option))
+		if server, e := reserveOneVolumeFunc(rack, r, option); e == nil {
 			servers = append(servers, server)
 		} else {
-			return servers, e
+			return servers, nil, e
 		}
 	}
 	for _, datacenter := range otherDataCenters {
-		r := rand.Int64N(datacenter.AvailableSpaceFor(option))
-		if server, e := datacenter.ReserveOneVolume(r, option); e == nil {
+		r := rand.Int64N(availableSpaceFunc(datacenter, option))
+		if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil {
 			servers = append(servers, server)
 		} else {
-			return servers, e
+			return servers, nil, e
 		}
 	}
-	return
+
+	// If reservations are requested, try to reserve capacity on each server
+	if useReservations {
+		reservation = &VolumeGrowReservation{
+			servers:        servers,
+			reservationIds: make([]string, len(servers)),
+			diskType:       option.DiskType,
+		}
+
+		for i, server := range servers {
+			reservationId, success := server.TryReserveCapacity(option.DiskType, 1)
+			if !success {
+				reservation.releaseAllReservations() // roll back reservations already made on earlier servers
+				return servers, nil, fmt.Errorf("failed to reserve capacity on server %s", server.Id())
+			}
+			reservation.reservationIds[i] = reservationId
+		}
+
+		glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers))
+	}
+
+	return servers, reservation, nil
 }
 
-func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) (growErr error) {
+// grow creates volumes on the provided servers, optionally managing capacity reservations
+func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, reservation *VolumeGrowReservation, servers ...*DataNode) (growErr error) {
 	var createdVolumes []storage.VolumeInfo
 	for _, server := range servers {
 		if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
@@ -283,6 +356,10 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid
 			topo.RegisterVolumeLayout(vi, server)
 			glog.V(0).Infof("Registered Volume %d on %s", vid, 
server.NodeImpl.String()) } + // Release reservations on success since volumes are now registered + if reservation != nil { + reservation.releaseAllReservations() + } } else { // cleaning up created volume replicas for i, vi := range createdVolumes { @@ -291,6 +368,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String()) } } + // Reservations will be released by the caller in case of failure } return growErr diff --git a/weed/topology/volume_growth_reservation_test.go b/weed/topology/volume_growth_reservation_test.go new file mode 100644 index 000000000..7b06e626d --- /dev/null +++ b/weed/topology/volume_growth_reservation_test.go @@ -0,0 +1,276 @@ +package topology + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// MockGrpcDialOption simulates grpc connection for testing +type MockGrpcDialOption struct{} + +// simulateVolumeAllocation mocks the volume allocation process +func simulateVolumeAllocation(server *DataNode, vid needle.VolumeId, option *VolumeGrowOption) error { + // Simulate some processing time + time.Sleep(time.Millisecond * 10) + return nil +} + +func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) { + // Create test topology with single server for predictable behavior + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + + // Create data center and rack + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + // Create single data node with limited capacity + dn := NewDataNode("server1") + rack.LinkChildNode(dn) + + // Set up disk with limited capacity (only 5 volumes) + disk := NewDisk(types.HardDriveType.String()) + disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5 + dn.LinkChildNode(disk) + + // Test volume growth with reservation + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas) + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // Try to create volumes and verify reservations work + for i := 0; i < 5; i++ { + servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err != nil { + t.Errorf("Failed to find slots with reservation on iteration %d: %v", i, err) + continue + } + + if len(servers) != 1 { + t.Errorf("Expected 1 server for replica placement 000, got %d", len(servers)) + } + + if len(reservation.reservationIds) != 1 { + t.Errorf("Expected 1 reservation ID, got %d", len(reservation.reservationIds)) + } + + // Verify the reservation is on our expected server + server := servers[0] + if server != dn { + t.Errorf("Expected volume to be allocated on server1, got %s", server.Id()) + } + + // Check available space before and after reservation + availableBeforeCreation := server.AvailableSpaceFor(option) + expectedBefore := int64(5 - i) + if availableBeforeCreation != expectedBefore { + t.Errorf("Iteration %d: Expected %d base available space, got %d", i, expectedBefore, availableBeforeCreation) + } + + // Simulate successful volume creation + disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk) + deltaDiskUsage := &DiskUsageCounts{ + 
volumeCount: 1,
+		}
+		disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+
+		// Release reservation after successful creation
+		reservation.releaseAllReservations()
+
+		// Verify available space after creation
+		availableAfterCreation := server.AvailableSpaceFor(option)
+		expectedAfter := int64(5 - i - 1)
+		if availableAfterCreation != expectedAfter {
+			t.Errorf("Iteration %d: Expected %d available space after creation, got %d", i, expectedAfter, availableAfterCreation)
+		}
+	}
+
+	// After 5 volumes, should have no more capacity
+	_, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+	if err == nil {
+		t.Error("Expected volume allocation to fail when server is at capacity")
+	}
+}
+
+func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) {
+	// Create test topology with very limited capacity
+	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+	dc := NewDataCenter("dc1")
+	topo.LinkChildNode(dc)
+	rack := NewRack("rack1")
+	dc.LinkChildNode(rack)
+
+	// Single data node with capacity for only 5 volumes
+	dn := NewDataNode("server1")
+	rack.LinkChildNode(dn)
+
+	disk := NewDisk(types.HardDriveType.String())
+	disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+	dn.LinkChildNode(disk)
+
+	vg := NewDefaultVolumeGrowth()
+	rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
+
+	option := &VolumeGrowOption{
+		Collection:       "test",
+		ReplicaPlacement: rp,
+		DiskType:         types.HardDriveType,
+	}
+
+	// Simulate concurrent volume creation attempts
+	const concurrentRequests = 10
+	var wg sync.WaitGroup
+	var successCount, failureCount atomic.Int32
+
+	for i := 0; i < concurrentRequests; i++ {
+		wg.Add(1)
+		go func(requestId int) {
+			defer wg.Done()
+
+			_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+
+			if err != nil {
+				failureCount.Add(1)
+				t.Logf("Request %d failed as expected: %v", requestId, err)
+			} else {
+				successCount.Add(1)
+				t.Logf("Request %d succeeded, got reservation", requestId)
+
+				// Simulate volume creation before releasing the reservation,
+				// so the freed slot is already backed by a real volume
+				if reservation != nil {
+					disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
+					deltaDiskUsage := &DiskUsageCounts{
+						volumeCount: 1,
+					}
+					disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+					reservation.releaseAllReservations()
+				}
+			}
+		}(i)
+	}
+
+	wg.Wait()
+
+	// With reservation system, only 5 requests should succeed (capacity limit)
+	// The rest should fail due to insufficient capacity
+	if successCount.Load() != 5 {
+		t.Errorf("Expected exactly 5 successful reservations, got %d", successCount.Load())
+	}
+
+	if failureCount.Load() != 5 {
+		t.Errorf("Expected exactly 5 failed reservations, got %d", failureCount.Load())
+	}
+
+	// Verify final state
+	finalAvailable := dn.AvailableSpaceFor(option)
+	if finalAvailable != 0 {
+		t.Errorf("Expected 0 available space after all allocations, got %d", finalAvailable)
+	}
+
+	t.Logf("Concurrent test completed: %d successes, %d failures", successCount.Load(), failureCount.Load())
+}
+
+func TestVolumeGrowth_ReservationFailureRollback(t *testing.T) {
+	// Create topology with multiple servers, but limited total capacity
+	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+	dc := NewDataCenter("dc1")
+	topo.LinkChildNode(dc)
+	rack := NewRack("rack1")
+	dc.LinkChildNode(rack)
+
+	// Create two servers with different available capacity
+	dn1 := 
NewDataNode("server1") + dn2 := NewDataNode("server2") + rack.LinkChildNode(dn1) + rack.LinkChildNode(dn2) + + // Server 1: 5 available slots + disk1 := NewDisk(types.HardDriveType.String()) + disk1.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5 + dn1.LinkChildNode(disk1) + + // Server 2: 0 available slots (full) + disk2 := NewDisk(types.HardDriveType.String()) + diskUsage2 := disk2.diskUsages.getOrCreateDisk(types.HardDriveType) + diskUsage2.maxVolumeCount = 5 + diskUsage2.volumeCount = 5 + dn2.LinkChildNode(disk2) + + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("010") // requires 2 replicas + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // This should fail because we can't satisfy replica requirements + // (need 2 servers but only 1 has space) + _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err == nil { + t.Error("Expected reservation to fail due to insufficient replica capacity") + } + + // Verify no reservations are left hanging + available1 := dn1.AvailableSpaceForReservation(option) + if available1 != 5 { + t.Errorf("Expected server1 to have all capacity available after failed reservation, got %d", available1) + } + + available2 := dn2.AvailableSpaceForReservation(option) + if available2 != 0 { + t.Errorf("Expected server2 to have no capacity available, got %d", available2) + } +} + +func TestVolumeGrowth_ReservationTimeout(t *testing.T) { + dn := NewDataNode("server1") + diskType := types.HardDriveType + + // Set up capacity + diskUsage := dn.diskUsages.getOrCreateDisk(diskType) + diskUsage.maxVolumeCount = 5 + + // Create a reservation + reservationId, success := dn.TryReserveCapacity(diskType, 2) + if !success { + t.Fatal("Expected successful reservation") + } + + // Manually set the reservation time to simulate old reservation + dn.capacityReservations.Lock() + if reservation, exists := dn.capacityReservations.reservations[reservationId]; exists { + reservation.createdAt = time.Now().Add(-10 * time.Minute) + } + dn.capacityReservations.Unlock() + + // Try another reservation - this should trigger cleanup and succeed + _, success = dn.TryReserveCapacity(diskType, 3) + if !success { + t.Error("Expected reservation to succeed after cleanup of expired reservation") + } + + // Original reservation should be cleaned up + option := &VolumeGrowOption{DiskType: diskType} + available := dn.AvailableSpaceForReservation(option) + if available != 2 { // 5 - 3 = 2 + t.Errorf("Expected 2 available slots after cleanup and new reservation, got %d", available) + } +} diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index 286289148..9bf3f3747 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -145,7 +145,7 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) { Rack: "", DataNode: "", } - servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + servers, _, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, false) if err != nil { fmt.Println("finding empty slots error :", err) t.Fail() @@ -267,7 +267,7 @@ func TestReplication011(t *testing.T) { Rack: "", DataNode: "", } - servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + servers, _, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, false) if err != nil { fmt.Println("finding empty slots error :", err) t.Fail() @@ -345,7 +345,7 @@ func 
TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) { distribution := map[NodeId]int{} // assign 1000 volumes for i := 0; i < 1000; i++ { - servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + servers, _, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, false) if err != nil { fmt.Println("finding empty slots error :", err) t.Fail()
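
Taken together, the changes above turn volume growth into a two-phase operation: findEmptySlotsForOneVolume picks servers and atomically reserves one slot on each, and grow creates the volumes while those slots stay reserved, releasing the reservations only once the new volumes are registered (or rolling them back on failure). The following in-package sketch summarizes that call sequence; it is illustrative only, and createVolumeOn is a hypothetical stand-in for AllocateVolume plus registration, not an API from this diff:

// sketchGrowOneVolume illustrates the reserve -> create -> release flow
// implemented by findAndGrow/grow above. Illustrative only.
func sketchGrowOneVolume(topo *Topology, vg *VolumeGrowth, option *VolumeGrowOption,
	createVolumeOn func(*DataNode) error) error {

	// Phase 1: pick servers and atomically reserve one slot on each, so that
	// concurrent growers cannot be promised the same free slot twice.
	servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
	if err != nil {
		return err // nothing is left reserved on failure
	}

	// Phase 2: create the replicas while the slots stay reserved.
	for _, server := range servers {
		if err := createVolumeOn(server); err != nil {
			reservation.releaseAllReservations() // roll back on failure
			return err
		}
	}

	// Phase 3: the new volumes are now counted in the disk usage totals, so
	// the temporary reservations can be dropped without opening a window in
	// which the capacity looks free.
	reservation.releaseAllReservations()
	return nil
}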
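
The capacity arithmetic behind the reservation-aware checks reduces to one identity: AvailableSpaceForReservation = AvailableSpaceFor - reserved slots, where AvailableSpaceFor is (roughly, ignoring remote volumes and other adjustments) maxVolumeCount - volumeCount. A small sketch with assumed numbers (maxVolumeCount=10, volumeCount=5) shows how the two views diverge while a reservation is held:

// sketchAvailability shows the two availability views on a node; the counts
// in the comments assume maxVolumeCount=10 and volumeCount=5.
func sketchAvailability(dn *DataNode, option *VolumeGrowOption) {
	_ = dn.AvailableSpaceFor(option)            // 10 - 5 = 5
	_ = dn.AvailableSpaceForReservation(option) // 10 - 5 - 0 = 5, nothing reserved yet

	id, ok := dn.TryReserveCapacity(option.DiskType, 2)
	if !ok {
		return
	}
	_ = dn.AvailableSpaceFor(option)            // still 5: real usage is unchanged
	_ = dn.AvailableSpaceForReservation(option) // 10 - 5 - 2 = 3

	dn.ReleaseReservedCapacity(id) // both views read 5 again
}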