diff --git a/weed/storage/store.go b/weed/storage/store.go index 7c41f1c35..cc07f8702 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -292,7 +292,17 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId - maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount) + effectiveMaxCount := location.MaxVolumeCount + if location.isDiskSpaceLow { + usedSlots := int32(location.LocalVolumesLen()) + ecShardCount := location.EcShardCount() + usedSlots += int32((ecShardCount + erasure_coding.DataShardsCount - 1) / erasure_coding.DataShardsCount) + effectiveMaxCount = usedSlots + } + if effectiveMaxCount < 0 { + effectiveMaxCount = 0 + } + maxVolumeCounts[string(location.DiskType)] += uint32(effectiveMaxCount) location.volumesLock.RLock() for _, v := range location.volumes { curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() diff --git a/weed/storage/store_disk_space_test.go b/weed/storage/store_disk_space_test.go index 284657e3c..884b8dda1 100644 --- a/weed/storage/store_disk_space_test.go +++ b/weed/storage/store_disk_space_test.go @@ -3,7 +3,9 @@ package storage import ( "testing" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func TestHasFreeDiskLocation(t *testing.T) { @@ -92,3 +94,101 @@ func TestHasFreeDiskLocation(t *testing.T) { }) } } + +func newTestLocation(maxCount int32, isDiskLow bool, volCount int) *DiskLocation { + location := &DiskLocation{ + volumes: make(map[needle.VolumeId]*Volume), + ecVolumes: make(map[needle.VolumeId]*erasure_coding.EcVolume), + MaxVolumeCount: maxCount, + DiskType: types.ToDiskType("hdd"), + isDiskSpaceLow: isDiskLow, + } + for i := 1; i <= volCount; i++ { + location.volumes[needle.VolumeId(i)] = &Volume{} + } + return location +} + +func TestCollectHeartbeatRespectsLowDiskSpace(t *testing.T) { + diskType := types.ToDiskType("hdd") + + t.Run("low disk space", func(t *testing.T) { + location := newTestLocation(10, true, 3) + store := &Store{Locations: []*DiskLocation{location}} + + hb := store.CollectHeartbeat() + if got := hb.MaxVolumeCounts[string(diskType)]; got != 3 { + t.Errorf("expected low disk space to cap max volume count to used slots, got %d", got) + } + }) + + t.Run("normal disk space", func(t *testing.T) { + location := newTestLocation(10, false, 3) + store := &Store{Locations: []*DiskLocation{location}} + + hb := store.CollectHeartbeat() + if got := hb.MaxVolumeCounts[string(diskType)]; got != 10 { + t.Errorf("expected normal disk space to report configured max volume count, got %d", got) + } + }) + + t.Run("low disk space zero volumes", func(t *testing.T) { + location := newTestLocation(10, true, 0) + store := &Store{Locations: []*DiskLocation{location}} + + hb := store.CollectHeartbeat() + if got := hb.MaxVolumeCounts[string(diskType)]; got != 0 { + t.Errorf("expected zero volumes to report zero capacity, got %d", got) + } + }) + + t.Run("low disk space with ec shards", func(t *testing.T) { + location := newTestLocation(10, true, 3) + + ecVolume := &erasure_coding.EcVolume{VolumeId: 1} + const shardCount = 15 + for i := 0; i < shardCount; i++ { + ecVolume.Shards = append(ecVolume.Shards, &erasure_coding.EcVolumeShard{ + ShardId: erasure_coding.ShardId(i), + }) + } + location.ecVolumes[ecVolume.VolumeId] = ecVolume + store := &Store{Locations: []*DiskLocation{location}} + + hb := store.CollectHeartbeat() + expectedSlots := len(location.volumes) + (shardCount+erasure_coding.DataShardsCount-1)/erasure_coding.DataShardsCount + if got := hb.MaxVolumeCounts[string(diskType)]; got != uint32(expectedSlots) { + t.Errorf("expected low disk space to include ec shard contribution, got %d want %d", got, expectedSlots) + } + }) + + t.Run("low disk space with multiple ec volumes", func(t *testing.T) { + location := newTestLocation(10, true, 2) + + totalShardCount := 0 + + addEcVolume := func(vid needle.VolumeId, shardCount int) { + ecVolume := &erasure_coding.EcVolume{VolumeId: vid} + for i := 0; i < shardCount; i++ { + ecVolume.Shards = append(ecVolume.Shards, &erasure_coding.EcVolumeShard{ + ShardId: erasure_coding.ShardId(i), + }) + } + location.ecVolumes[vid] = ecVolume + totalShardCount += shardCount + } + + addEcVolume(1, 12) + addEcVolume(2, 6) + + store := &Store{Locations: []*DiskLocation{location}} + + hb := store.CollectHeartbeat() + expectedSlots := len(location.volumes) + expectedSlots += (totalShardCount + erasure_coding.DataShardsCount - 1) / erasure_coding.DataShardsCount + + if got := hb.MaxVolumeCounts[string(diskType)]; got != uint32(expectedSlots) { + t.Errorf("expected multiple ec volumes to be counted, got %d want %d", got, expectedSlots) + } + }) +}