Implement volume-aware task conflict checking

MAJOR IMPROVEMENT: Tasks now conflict by volume ID, not globally by task type

Changes:
- PRIMARY RULE: Tasks on the same volume ID always conflict (prevents race conditions)
- SECONDARY RULE: Minimal global task type conflicts (currently none)
- Add isDiskAvailableForVolume() for volume-specific availability checking
- Add GetAvailableDisksForVolume() and GetDisksWithEffectiveCapacityForVolume()
- Remove overly restrictive global task type conflicts
- Update planning functions to focus on capacity, not conflicts

Benefits:
- Multiple vacuum tasks can run on different volumes simultaneously
- Balance and erasure coding can run on different volumes
- Still prevents dangerous concurrent operations on the same volume
- Much more efficient resource utilization
- Maintains data integrity and prevents race conditions

This addresses the user feedback that task conflicts should be volume-specific,
not global task type restrictions.
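
For illustration, a minimal standalone sketch of the two rules above, using simplified stand-in types rather than the actual SeaweedFS structs; the real check is ActiveTopology.areTasksConflicting in the diff below.

package main

import "fmt"

type TaskType string

type runningTask struct {
    Type     TaskType
    VolumeID uint32
}

// conflicts applies the two rules: a task on the same volume always conflicts;
// the global task-type conflict table is currently empty, so nothing else does.
func conflicts(existing runningTask, newType TaskType, newVolumeID uint32) bool {
    if existing.VolumeID == newVolumeID {
        return true // PRIMARY RULE: same volume
    }
    return false // SECONDARY RULE: no global task type conflicts defined today
}

func main() {
    running := runningTask{Type: "vacuum", VolumeID: 101}
    fmt.Println(conflicts(running, "vacuum", 102))  // false: a different volume can proceed in parallel
    fmt.Println(conflicts(running, "balance", 101)) // true: same volume is blocked
}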
chrislu
2025-08-10 18:02:42 -07:00
parent 5c1e6e904d
commit 751cfac7d7
3 changed files with 111 additions and 31 deletions

View File

@@ -227,13 +227,9 @@ func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType
return false
}
// Check for conflicting task types in active tasks only
for _, task := range disk.assignedTasks {
if at.areTaskTypesConflicting(task.TaskType, taskType) {
return false
}
}
// For planning purposes, we only check capacity constraints
// Volume-specific conflicts will be checked when the actual task is scheduled
// with knowledge of the specific volume ID
return true
}
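
A standalone sketch of the planning/scheduling split described in the comments above (stand-in signatures, not the real ActiveTopology methods): planning filters on capacity only, and the same-volume check happens at scheduling time once the volume ID is known.

package main

import "fmt"

// availableForPlanning mirrors the planning-time check: only load/capacity matters,
// because no volume ID is known yet.
func availableForPlanning(load, maxLoad int) bool {
    return load < maxLoad
}

// availableForVolume mirrors the scheduling-time check: the concrete volume ID is
// known, so a same-volume conflict blocks the disk.
func availableForVolume(load, maxLoad int, volumesWithTasks map[uint32]bool, volumeID uint32) bool {
    return availableForPlanning(load, maxLoad) && !volumesWithTasks[volumeID]
}

func main() {
    busy := map[uint32]bool{101: true} // volume 101 already has an assigned task on this disk
    fmt.Println(availableForPlanning(1, 2))          // true: planning only checks capacity
    fmt.Println(availableForVolume(1, 2, busy, 101)) // false: same-volume conflict at scheduling time
    fmt.Println(availableForVolume(1, 2, busy, 102)) // true: a different volume proceeds
}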
@@ -298,3 +294,52 @@ func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk)
ShardSlots: -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
}
}
// GetDisksWithEffectiveCapacityForVolume returns disks with effective capacity for a specific volume
// Uses volume-aware conflict checking to prevent race conditions on the same volume
func (at *ActiveTopology) GetDisksWithEffectiveCapacityForVolume(taskType TaskType, volumeID uint32, excludeNodeID string, minCapacity int64) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
var available []*DiskInfo
for _, disk := range at.disks {
if disk.NodeID == excludeNodeID {
continue // Skip excluded node
}
if at.isDiskAvailableForVolume(disk, taskType, volumeID) {
effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
// Only include disks that meet minimum capacity requirement
if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
// Create a new DiskInfo with current capacity information
diskCopy := DiskInfo{
NodeID: disk.DiskInfo.NodeID,
DiskID: disk.DiskInfo.DiskID,
DiskType: disk.DiskInfo.DiskType,
DataCenter: disk.DiskInfo.DataCenter,
Rack: disk.DiskInfo.Rack,
LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
}
// Create a new protobuf DiskInfo to avoid modifying the original
diskInfoCopy := &master_pb.DiskInfo{
DiskId: disk.DiskInfo.DiskInfo.DiskId,
MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount,
VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos,
EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos,
RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount,
}
diskCopy.DiskInfo = diskInfoCopy
available = append(available, &diskCopy)
}
}
}
return available
}
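
A quick worked example of the VolumeCount arithmetic in the copy above, with made-up numbers:

package main

import "fmt"

func main() {
    // Made-up numbers: the returned DiskInfo copy reports VolumeCount as
    // MaxVolumeCount minus the effective available volume slots, so capacity
    // already reserved by pending/assigned tasks shows up as used.
    maxVolumeCount := int64(100)      // disk's MaxVolumeCount
    effectiveVolumeSlots := int64(30) // effective available slots after task impact
    fmt.Println(maxVolumeCount - effectiveVolumeSlots) // reported VolumeCount: 70
}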

View File

@@ -64,7 +64,7 @@ func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
}
}
// isDiskAvailable checks if a disk can accept new tasks
// isDiskAvailable checks if a disk can accept new tasks (general availability)
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
// Check if disk has too many pending and active tasks
activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
@@ -72,9 +72,21 @@ func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) b
return false
}
// Check for conflicting task types
// For general availability, only check disk capacity
// Volume-specific conflicts are checked in isDiskAvailableForVolume
return true
}
// isDiskAvailableForVolume checks if a disk can accept a new task for a specific volume
func (at *ActiveTopology) isDiskAvailableForVolume(disk *activeDisk, taskType TaskType, volumeID uint32) bool {
// Check basic availability first
if !at.isDiskAvailable(disk, taskType) {
return false
}
// Check for volume-specific conflicts
for _, task := range disk.assignedTasks {
if at.areTaskTypesConflicting(task.TaskType, taskType) {
if at.areTasksConflicting(task, taskType, volumeID) {
return false
}
}
@@ -82,29 +94,28 @@ func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) b
return true
}
// areTaskTypesConflicting checks if two task types conflict
func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
// Define conflicting task types to prevent dangerous concurrent operations
// These conflicts prevent race conditions and data integrity issues
conflictMap := map[TaskType][]TaskType{
// Vacuum conflicts with balance and erasure coding (and ec_vacuum)
TaskType("vacuum"): {TaskType("balance"), TaskType("erasure_coding"), TaskType("ec_vacuum")},
// Balance conflicts with vacuum and erasure coding operations
TaskType("balance"): {TaskType("vacuum"), TaskType("erasure_coding"), TaskType("ec_vacuum")},
// Erasure coding conflicts with vacuum and balance operations
TaskType("erasure_coding"): {TaskType("vacuum"), TaskType("balance"), TaskType("ec_vacuum")},
// EC vacuum conflicts with all other maintenance operations on same volume
TaskType("ec_vacuum"): {TaskType("vacuum"), TaskType("balance"), TaskType("erasure_coding")},
// Replication generally should not conflict with read-only operations
// but should conflict with destructive operations
TaskType("replication"): {TaskType("vacuum"), TaskType("balance")},
// areTasksConflicting checks if a new task conflicts with an existing task
func (at *ActiveTopology) areTasksConflicting(existingTask *taskState, newTaskType TaskType, newVolumeID uint32) bool {
// PRIMARY RULE: Tasks on the same volume always conflict (prevents race conditions)
if existingTask.VolumeID == newVolumeID {
return true
}
if conflicts, exists := conflictMap[existing]; exists {
// SECONDARY RULE: Some task types may have global conflicts (rare cases)
return at.areTaskTypesGloballyConflicting(existingTask.TaskType, newTaskType)
}
// areTaskTypesGloballyConflicting checks for rare global task type conflicts
// These should be minimal - most conflicts should be volume-specific
func (at *ActiveTopology) areTaskTypesGloballyConflicting(existing, new TaskType) bool {
// Define very limited global conflicts (cross-volume conflicts)
// Most conflicts should be volume-based, not global
globalConflictMap := map[TaskType][]TaskType{
// Example: Some hypothetical global resource conflicts could go here
// Currently empty - volume-based conflicts are sufficient
}
if conflicts, exists := globalConflictMap[existing]; exists {
for _, conflictType := range conflicts {
if conflictType == new {
return true

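The global conflict map above is intentionally left empty. Purely as a hypothetical, an entry would look like the sketch below; the task type name is invented for illustration and does not exist in SeaweedFS.

package main

import "fmt"

type TaskType string

func main() {
    // Hypothetical only: nothing like this exists in the actual code, where the
    // map stays empty because volume-based conflicts are sufficient.
    globalConflictMap := map[TaskType][]TaskType{
        TaskType("cluster_rebalance_all"): {TaskType("erasure_coding")},
    }
    fmt.Println(len(globalConflictMap)) // 1 in this hypothetical; 0 in the real map
}
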
View File

@@ -89,6 +89,30 @@ func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID str
return available
}
// GetAvailableDisksForVolume returns disks that can accept a task for a specific volume
// This method uses volume-aware conflict checking to prevent race conditions
func (at *ActiveTopology) GetAvailableDisksForVolume(taskType TaskType, volumeID uint32, excludeNodeID string) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
var available []*DiskInfo
for _, disk := range at.disks {
if disk.NodeID == excludeNodeID {
continue // Skip excluded node
}
if at.isDiskAvailableForVolume(disk, taskType, volumeID) {
// Create a copy with current load count
diskCopy := *disk.DiskInfo
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
available = append(available, &diskCopy)
}
}
return available
}
// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
at.mutex.RLock()