Admin: refactoring active topology (#7073)

* refactoring

* add ec shard size

* address comments

* passing task id

There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way.

This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature.

A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution.
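A rough sketch of that propagation, with hypothetical field names and ID scheme (the real TaskDetectionResult and worker_pb.TaskParams definitions live in weed/worker and weed/pb):

// Hypothetical TaskID field carrying the ID that was registered in ActiveTopology.
type TaskDetectionResult struct {
	TaskID   string
	TaskType TaskType
	VolumeID uint32
	Server   string
}

// Detection side: generate the ID once, register the pending task, return the ID with the result.
taskID := fmt.Sprintf("ec-%d-%d", volumeID, time.Now().UnixNano()) // illustrative ID scheme
// activeTopology.AddPendingECShardTask(taskID, ...) as in the existing detection code
result := &TaskDetectionResult{TaskID: taskID, TaskType: TaskTypeErasureCoding, VolumeID: volumeID}

// Execution side: copy the same ID into the worker parameters, e.g.
// params := &worker_pb.TaskParams{TaskId: result.TaskID, VolumeId: result.VolumeID}
// so the worker can later call AssignTask(result.TaskID) and CompleteTask(result.TaskID).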

* 1 source multiple destinations

* task supports multi source and destination

* ec needs to clean up previous shards

* use erasure coding constants

* getPlanningCapacityUnsafe and getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation

* use CanAccommodate to calculate

* remove dead code

* address comments

* fix Mutex Copying in Protobuf Structs

* use constants

* fix estimatedSize

The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize.
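A sketch of the corrected accumulation, assuming the loop shape described above:

estimatedSize := int64(0)
for _, source := range task.Sources {
	estimatedSize += source.EstimatedSize
}
for _, dest := range task.Destinations {
	estimatedSize += dest.EstimatedSize // previously omitted; StorageChange alone does not capture data size
}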

* at.assignTaskToDisk(task)

* refactoring

* Update weed/admin/topology/internal.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fail fast

* fix compilation

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* indexes for volume and shard locations

* dedup with ToVolumeSlots

* return an additional boolean to indicate success, or an error

* Update abstract_sql_store.go

* fix

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/task_management.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* faster findVolumeDisk

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor

* simplify

* remove unused GetDiskStorageImpact function

* refactor

* add comments

* Update weed/admin/topology/storage_impact.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update storage_impact.go

* AddPendingTask

The unified AddPendingTask function is now the single entry point for all task creation, consolidating the previously separate functions while preserving their behavior and improving code organization.
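For example, a single-source balance task is now created like this (mirroring the updated tests below):

err := activeTopology.AddPendingTask(TaskSpec{
	TaskID:       "balance-001",
	TaskType:     TaskTypeBalance,
	VolumeID:     1001,
	VolumeSize:   1024 * 1024 * 1024,
	Sources:      []TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
	Destinations: []TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 1}},
})
if err != nil {
	// AddPendingTask fails fast when no sources or destinations are given
}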

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Chris Lu
2025-08-03 01:35:38 -07:00
committed by GitHub
parent 315fcc70b2
commit 0ecb466eda
18 changed files with 2579 additions and 467 deletions


@@ -1,98 +1,5 @@
package topology
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// TaskType represents different types of maintenance operations
type TaskType string
// TaskStatus represents the current status of a task
type TaskStatus string
// Common task type constants
const (
TaskTypeVacuum TaskType = "vacuum"
TaskTypeBalance TaskType = "balance"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeReplication TaskType = "replication"
)
// Common task status constants
const (
TaskStatusPending TaskStatus = "pending"
TaskStatusInProgress TaskStatus = "in_progress"
TaskStatusCompleted TaskStatus = "completed"
)
// taskState represents the current state of tasks affecting the topology (internal)
type taskState struct {
VolumeID uint32 `json:"volume_id"`
TaskType TaskType `json:"task_type"`
SourceServer string `json:"source_server"`
SourceDisk uint32 `json:"source_disk"`
TargetServer string `json:"target_server,omitempty"`
TargetDisk uint32 `json:"target_disk,omitempty"`
Status TaskStatus `json:"status"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
}
// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
type DiskInfo struct {
NodeID string `json:"node_id"`
DiskID uint32 `json:"disk_id"`
DiskType string `json:"disk_type"`
DataCenter string `json:"data_center"`
Rack string `json:"rack"`
DiskInfo *master_pb.DiskInfo `json:"disk_info"`
LoadCount int `json:"load_count"` // Number of active tasks
}
// activeDisk represents internal disk state (private)
type activeDisk struct {
*DiskInfo
pendingTasks []*taskState
assignedTasks []*taskState
recentTasks []*taskState // Completed in last N seconds
}
// activeNode represents a node with its disks (private)
type activeNode struct {
nodeID string
dataCenter string
rack string
nodeInfo *master_pb.DataNodeInfo
disks map[uint32]*activeDisk // DiskID -> activeDisk
}
// ActiveTopology provides a real-time view of cluster state with task awareness
type ActiveTopology struct {
// Core topology from master
topologyInfo *master_pb.TopologyInfo
lastUpdated time.Time
// Structured topology for easy access (private)
nodes map[string]*activeNode // NodeID -> activeNode
disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
// Task states affecting the topology (private)
pendingTasks map[string]*taskState
assignedTasks map[string]*taskState
recentTasks map[string]*taskState
// Configuration
recentTaskWindowSeconds int
// Synchronization
mutex sync.RWMutex
}
// NewActiveTopology creates a new ActiveTopology instance
func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
if recentTaskWindowSeconds <= 0 {
@@ -102,339 +9,11 @@ func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
return &ActiveTopology{
nodes: make(map[string]*activeNode),
disks: make(map[string]*activeDisk),
volumeIndex: make(map[uint32][]string),
ecShardIndex: make(map[uint32][]string),
pendingTasks: make(map[string]*taskState),
assignedTasks: make(map[string]*taskState),
recentTasks: make(map[string]*taskState),
recentTaskWindowSeconds: recentTaskWindowSeconds,
}
}
// UpdateTopology updates the topology information from master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
at.mutex.Lock()
defer at.mutex.Unlock()
at.topologyInfo = topologyInfo
at.lastUpdated = time.Now()
// Rebuild structured topology
at.nodes = make(map[string]*activeNode)
at.disks = make(map[string]*activeDisk)
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, nodeInfo := range rack.DataNodeInfos {
node := &activeNode{
nodeID: nodeInfo.Id,
dataCenter: dc.Id,
rack: rack.Id,
nodeInfo: nodeInfo,
disks: make(map[uint32]*activeDisk),
}
// Add disks for this node
for diskType, diskInfo := range nodeInfo.DiskInfos {
disk := &activeDisk{
DiskInfo: &DiskInfo{
NodeID: nodeInfo.Id,
DiskID: diskInfo.DiskId,
DiskType: diskType,
DataCenter: dc.Id,
Rack: rack.Id,
DiskInfo: diskInfo,
},
}
diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
node.disks[diskInfo.DiskId] = disk
at.disks[diskKey] = disk
}
at.nodes[nodeInfo.Id] = node
}
}
}
// Reassign task states to updated topology
at.reassignTaskStates()
glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks))
return nil
}
// AddPendingTask adds a pending task to the topology
func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32,
sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) {
at.mutex.Lock()
defer at.mutex.Unlock()
task := &taskState{
VolumeID: volumeID,
TaskType: taskType,
SourceServer: sourceServer,
SourceDisk: sourceDisk,
TargetServer: targetServer,
TargetDisk: targetDisk,
Status: TaskStatusPending,
StartedAt: time.Now(),
}
at.pendingTasks[taskID] = task
at.assignTaskToDisk(task)
}
// AssignTask moves a task from pending to assigned
func (at *ActiveTopology) AssignTask(taskID string) error {
at.mutex.Lock()
defer at.mutex.Unlock()
task, exists := at.pendingTasks[taskID]
if !exists {
return fmt.Errorf("pending task %s not found", taskID)
}
delete(at.pendingTasks, taskID)
task.Status = TaskStatusInProgress
at.assignedTasks[taskID] = task
at.reassignTaskStates()
return nil
}
// CompleteTask moves a task from assigned to recent
func (at *ActiveTopology) CompleteTask(taskID string) error {
at.mutex.Lock()
defer at.mutex.Unlock()
task, exists := at.assignedTasks[taskID]
if !exists {
return fmt.Errorf("assigned task %s not found", taskID)
}
delete(at.assignedTasks, taskID)
task.Status = TaskStatusCompleted
task.CompletedAt = time.Now()
at.recentTasks[taskID] = task
at.reassignTaskStates()
// Clean up old recent tasks
at.cleanupRecentTasks()
return nil
}
// GetAvailableDisks returns disks that can accept new tasks of the given type
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
var available []*DiskInfo
for _, disk := range at.disks {
if disk.NodeID == excludeNodeID {
continue // Skip excluded node
}
if at.isDiskAvailable(disk, taskType) {
// Create a copy with current load count
diskCopy := *disk.DiskInfo
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
available = append(available, &diskCopy)
}
}
return available
}
// GetDiskLoad returns the current load on a disk (number of active tasks)
func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
disk, exists := at.disks[diskKey]
if !exists {
return 0
}
return len(disk.pendingTasks) + len(disk.assignedTasks)
}
// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
at.mutex.RLock()
defer at.mutex.RUnlock()
for _, task := range at.recentTasks {
if task.VolumeID == volumeID && task.TaskType == taskType {
return true
}
}
return false
}
// GetAllNodes returns information about all nodes (public interface)
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
result := make(map[string]*master_pb.DataNodeInfo)
for nodeID, node := range at.nodes {
result[nodeID] = node.nodeInfo
}
return result
}
// GetTopologyInfo returns the current topology information (read-only access)
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
return at.topologyInfo
}
// GetNodeDisks returns all disks for a specific node
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
node, exists := at.nodes[nodeID]
if !exists {
return nil
}
var disks []*DiskInfo
for _, disk := range node.disks {
diskCopy := *disk.DiskInfo
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
disks = append(disks, &diskCopy)
}
return disks
}
// DestinationPlan represents a planned destination for a volume/shard operation
type DestinationPlan struct {
TargetNode string `json:"target_node"`
TargetDisk uint32 `json:"target_disk"`
TargetRack string `json:"target_rack"`
TargetDC string `json:"target_dc"`
ExpectedSize uint64 `json:"expected_size"`
PlacementScore float64 `json:"placement_score"`
Conflicts []string `json:"conflicts"`
}
// MultiDestinationPlan represents multiple planned destinations for operations like EC
type MultiDestinationPlan struct {
Plans []*DestinationPlan `json:"plans"`
TotalShards int `json:"total_shards"`
SuccessfulRack int `json:"successful_racks"`
SuccessfulDCs int `json:"successful_dcs"`
}
// Private methods
// reassignTaskStates assigns tasks to the appropriate disks
func (at *ActiveTopology) reassignTaskStates() {
// Clear existing task assignments
for _, disk := range at.disks {
disk.pendingTasks = nil
disk.assignedTasks = nil
disk.recentTasks = nil
}
// Reassign pending tasks
for _, task := range at.pendingTasks {
at.assignTaskToDisk(task)
}
// Reassign assigned tasks
for _, task := range at.assignedTasks {
at.assignTaskToDisk(task)
}
// Reassign recent tasks
for _, task := range at.recentTasks {
at.assignTaskToDisk(task)
}
}
// assignTaskToDisk assigns a task to the appropriate disk(s)
func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
// Assign to source disk
sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk)
if sourceDisk, exists := at.disks[sourceKey]; exists {
switch task.Status {
case TaskStatusPending:
sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task)
case TaskStatusInProgress:
sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task)
case TaskStatusCompleted:
sourceDisk.recentTasks = append(sourceDisk.recentTasks, task)
}
}
// Assign to target disk if it exists and is different from source
if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) {
targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk)
if targetDisk, exists := at.disks[targetKey]; exists {
switch task.Status {
case TaskStatusPending:
targetDisk.pendingTasks = append(targetDisk.pendingTasks, task)
case TaskStatusInProgress:
targetDisk.assignedTasks = append(targetDisk.assignedTasks, task)
case TaskStatusCompleted:
targetDisk.recentTasks = append(targetDisk.recentTasks, task)
}
}
}
}
// isDiskAvailable checks if a disk can accept new tasks
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
// Check if disk has too many active tasks
activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
if activeLoad >= 2 { // Max 2 concurrent tasks per disk
return false
}
// Check for conflicting task types
for _, task := range disk.assignedTasks {
if at.areTaskTypesConflicting(task.TaskType, taskType) {
return false
}
}
return true
}
// areTaskTypesConflicting checks if two task types conflict
func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
// Examples of conflicting task types
conflictMap := map[TaskType][]TaskType{
TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
}
if conflicts, exists := conflictMap[existing]; exists {
for _, conflictType := range conflicts {
if conflictType == new {
return true
}
}
}
return false
}
// cleanupRecentTasks removes old recent tasks
func (at *ActiveTopology) cleanupRecentTasks() {
cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
for taskID, task := range at.recentTasks {
if task.CompletedAt.Before(cutoff) {
delete(at.recentTasks, taskID)
}
}
}


@@ -1,6 +1,7 @@
package topology
import (
"fmt"
"testing"
"time"
@@ -9,6 +10,16 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// Helper function to find a disk by ID for testing - reduces code duplication
func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo {
for _, disk := range disks {
if disk.DiskID == diskID {
return disk
}
}
return nil
}
// TestActiveTopologyBasicOperations tests basic topology management
func TestActiveTopologyBasicOperations(t *testing.T) {
topology := NewActiveTopology(10)
@@ -58,8 +69,19 @@ func TestTaskLifecycle(t *testing.T) {
taskID := "balance-001" taskID := "balance-001"
// 1. Add pending task // 1. Add pending task
topology.AddPendingTask(taskID, TaskTypeBalance, 1001, err := topology.AddPendingTask(TaskSpec{
"10.0.0.1:8080", 0, "10.0.0.2:8080", 1) TaskID: taskID,
TaskType: TaskTypeBalance,
VolumeID: 1001,
VolumeSize: 1024 * 1024 * 1024,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 0},
},
Destinations: []TaskDestinationSpec{
{ServerID: "10.0.0.2:8080", DiskID: 1},
},
})
assert.NoError(t, err, "Should add pending task successfully")
// Verify pending state
assert.Equal(t, 1, len(topology.pendingTasks))
@@ -77,7 +99,7 @@ func TestTaskLifecycle(t *testing.T) {
assert.Equal(t, 1, len(targetDisk.pendingTasks))
// 2. Assign task
- err := topology.AssignTask(taskID)
err = topology.AssignTask(taskID)
require.NoError(t, err)
// Verify assigned state
@@ -258,8 +280,7 @@ func TestTargetSelectionScenarios(t *testing.T) {
assert.NotEqual(t, tt.excludeNode, disk.NodeID,
"Available disk should not be on excluded node")
- load := tt.topology.GetDiskLoad(disk.NodeID, disk.DiskID)
- assert.Less(t, load, 2, "Disk load should be less than 2")
assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2")
}
})
}
@@ -271,37 +292,65 @@ func TestDiskLoadCalculation(t *testing.T) {
topology.UpdateTopology(createSampleTopology())
// Initially no load
- load := topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 0, load)
disks := topology.GetNodeDisks("10.0.0.1:8080")
targetDisk := findDiskByID(disks, 0)
require.NotNil(t, targetDisk, "Should find disk with ID 0")
assert.Equal(t, 0, targetDisk.LoadCount)
// Add pending task
- topology.AddPendingTask("task1", TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
err := topology.AddPendingTask(TaskSpec{
TaskID: "task1",
TaskType: TaskTypeBalance,
VolumeID: 1001,
VolumeSize: 1024 * 1024 * 1024,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 0},
},
Destinations: []TaskDestinationSpec{
{ServerID: "10.0.0.2:8080", DiskID: 1},
},
})
assert.NoError(t, err, "Should add pending task successfully")
// Check load increased
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 1, load)
disks = topology.GetNodeDisks("10.0.0.1:8080")
targetDisk = findDiskByID(disks, 0)
assert.Equal(t, 1, targetDisk.LoadCount)
// Add another task to same disk
- topology.AddPendingTask("task2", TaskTypeVacuum, 1002,
- "10.0.0.1:8080", 0, "", 0)
err = topology.AddPendingTask(TaskSpec{
TaskID: "task2",
TaskType: TaskTypeVacuum,
VolumeID: 1002,
VolumeSize: 0,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 0},
},
Destinations: []TaskDestinationSpec{
{ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
},
})
assert.NoError(t, err, "Should add vacuum task successfully")
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 2, load)
disks = topology.GetNodeDisks("10.0.0.1:8080")
targetDisk = findDiskByID(disks, 0)
assert.Equal(t, 2, targetDisk.LoadCount)
// Move one task to assigned
topology.AssignTask("task1")
// Load should still be 2 (1 pending + 1 assigned)
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 2, load)
disks = topology.GetNodeDisks("10.0.0.1:8080")
targetDisk = findDiskByID(disks, 0)
assert.Equal(t, 2, targetDisk.LoadCount)
// Complete one task
topology.CompleteTask("task1")
// Load should decrease to 1
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 1, load)
disks = topology.GetNodeDisks("10.0.0.1:8080")
targetDisk = findDiskByID(disks, 0)
assert.Equal(t, 1, targetDisk.LoadCount)
}
// TestTaskConflictDetection tests task conflict detection // TestTaskConflictDetection tests task conflict detection
@@ -310,8 +359,19 @@ func TestTaskConflictDetection(t *testing.T) {
topology.UpdateTopology(createSampleTopology())
// Add a balance task
- topology.AddPendingTask("balance1", TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
err := topology.AddPendingTask(TaskSpec{
TaskID: "balance1",
TaskType: TaskTypeBalance,
VolumeID: 1001,
VolumeSize: 1024 * 1024 * 1024,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 0},
},
Destinations: []TaskDestinationSpec{
{ServerID: "10.0.0.2:8080", DiskID: 1},
},
})
assert.NoError(t, err, "Should add balance task successfully")
topology.AssignTask("balance1") topology.AssignTask("balance1")
// Try to get available disks for vacuum (conflicts with balance) // Try to get available disks for vacuum (conflicts with balance)
@@ -448,8 +508,22 @@ func createTopologyWithLoad() *ActiveTopology {
topology.UpdateTopology(createSampleTopology())
// Add some existing tasks to create load
- topology.AddPendingTask("existing1", TaskTypeVacuum, 2001,
- "10.0.0.1:8080", 0, "", 0)
err := topology.AddPendingTask(TaskSpec{
TaskID: "existing1",
TaskType: TaskTypeVacuum,
VolumeID: 2001,
VolumeSize: 0,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 0},
},
Destinations: []TaskDestinationSpec{
{ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
},
})
if err != nil {
// In test helper function, just log error instead of failing
fmt.Printf("Warning: Failed to add existing task: %v\n", err)
}
topology.AssignTask("existing1") topology.AssignTask("existing1")
return topology return topology
@@ -466,12 +540,38 @@ func createTopologyWithConflicts() *ActiveTopology {
topology.UpdateTopology(createSampleTopology())
// Add conflicting tasks
- topology.AddPendingTask("balance1", TaskTypeBalance, 3001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 0)
err := topology.AddPendingTask(TaskSpec{
TaskID: "balance1",
TaskType: TaskTypeBalance,
VolumeID: 3001,
VolumeSize: 1024 * 1024 * 1024,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 0},
},
Destinations: []TaskDestinationSpec{
{ServerID: "10.0.0.2:8080", DiskID: 0},
},
})
if err != nil {
fmt.Printf("Warning: Failed to add balance task: %v\n", err)
}
topology.AssignTask("balance1") topology.AssignTask("balance1")
topology.AddPendingTask("ec1", TaskTypeErasureCoding, 3002, err = topology.AddPendingTask(TaskSpec{
"10.0.0.1:8080", 1, "", 0) TaskID: "ec1",
TaskType: TaskTypeErasureCoding,
VolumeID: 3002,
VolumeSize: 1024 * 1024 * 1024,
Sources: []TaskSourceSpec{
{ServerID: "10.0.0.1:8080", DiskID: 1},
},
Destinations: []TaskDestinationSpec{
{ServerID: "", DiskID: 0}, // EC doesn't have single destination
},
})
if err != nil {
fmt.Printf("Warning: Failed to add EC task: %v\n", err)
}
topology.AssignTask("ec1") topology.AssignTask("ec1")
return topology return topology


@@ -0,0 +1,300 @@
package topology
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// GetEffectiveAvailableCapacity returns the effective available capacity for a disk
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
// - Pending tasks: Reserve capacity immediately when added
// - Assigned tasks: Continue to reserve capacity during execution
// - Recently completed tasks are NOT counted against capacity
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
disk, exists := at.disks[diskKey]
if !exists {
return 0
}
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return 0
}
// Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking
capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
return int64(capacity.VolumeSlots)
}
// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange
// This provides granular information about available volume slots and shard slots
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
disk, exists := at.disks[diskKey]
if !exists {
return StorageSlotChange{}
}
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return StorageSlotChange{}
}
return at.getEffectiveAvailableCapacityUnsafe(disk)
}
// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
// This shows the net impact from all pending and assigned tasks
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
disk, exists := at.disks[diskKey]
if !exists {
return StorageSlotChange{}
}
return at.getEffectiveCapacityUnsafe(disk)
}
// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
// - taskType: type of task to check compatibility for
// - excludeNodeID: node to exclude from results
// - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
var available []*DiskInfo
for _, disk := range at.disks {
if disk.NodeID == excludeNodeID {
continue // Skip excluded node
}
if at.isDiskAvailable(disk, taskType) {
effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
// Only include disks that meet minimum capacity requirement
if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
// Create a new DiskInfo with current capacity information
diskCopy := DiskInfo{
NodeID: disk.DiskInfo.NodeID,
DiskID: disk.DiskInfo.DiskID,
DiskType: disk.DiskInfo.DiskType,
DataCenter: disk.DiskInfo.DataCenter,
Rack: disk.DiskInfo.Rack,
LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
}
// Create a new protobuf DiskInfo to avoid modifying the original
diskInfoCopy := &master_pb.DiskInfo{
DiskId: disk.DiskInfo.DiskInfo.DiskId,
MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount,
VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos,
EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos,
RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount,
}
diskCopy.DiskInfo = diskInfoCopy
available = append(available, &diskCopy)
}
}
}
return available
}
// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions
// This helps avoid over-scheduling tasks to the same disk
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
var available []*DiskInfo
for _, disk := range at.disks {
if disk.NodeID == excludeNodeID {
continue // Skip excluded node
}
// Consider both pending and active tasks for scheduling decisions
if at.isDiskAvailableForPlanning(disk, taskType) {
// Check if disk can accommodate new task considering pending tasks
planningCapacity := at.getPlanningCapacityUnsafe(disk)
if int64(planningCapacity.VolumeSlots) >= minCapacity {
// Create a new DiskInfo with planning information
diskCopy := DiskInfo{
NodeID: disk.DiskInfo.NodeID,
DiskID: disk.DiskInfo.DiskID,
DiskType: disk.DiskInfo.DiskType,
DataCenter: disk.DiskInfo.DataCenter,
Rack: disk.DiskInfo.Rack,
LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks),
}
// Create a new protobuf DiskInfo to avoid modifying the original
diskInfoCopy := &master_pb.DiskInfo{
DiskId: disk.DiskInfo.DiskInfo.DiskId,
MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount,
VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos,
EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos,
RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount,
}
diskCopy.DiskInfo = diskInfoCopy
available = append(available, &diskCopy)
}
}
}
return available
}
// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
disk, exists := at.disks[diskKey]
if !exists {
return false
}
// Check basic availability
if !at.isDiskAvailable(disk, taskType) {
return false
}
// Check effective capacity
effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}
// getPlanningCapacityUnsafe considers both pending and active tasks for planning
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return StorageSlotChange{}
}
baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
// Use the centralized helper function to calculate task storage impact
totalImpact := at.calculateTaskStorageImpact(disk)
// Calculate available capacity considering impact (negative impact reduces availability)
availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
if availableVolumeSlots < 0 {
availableVolumeSlots = 0
}
// Return detailed capacity information
return StorageSlotChange{
VolumeSlots: int32(availableVolumeSlots),
ShardSlots: -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
}
}
// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
// Check total load including pending tasks
totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
if totalLoad >= MaxTotalTaskLoadPerDisk {
return false
}
// Check for conflicting task types in active tasks only
for _, task := range disk.assignedTasks {
if at.areTaskTypesConflicting(task.TaskType, taskType) {
return false
}
}
return true
}
// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return StorageSlotChange{}
}
totalImpact := StorageSlotChange{}
// Process both pending and assigned tasks with identical logic
taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
for _, taskList := range taskLists {
for _, task := range taskList {
// Calculate impact for all source locations
for _, source := range task.Sources {
if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
totalImpact.AddInPlace(source.StorageChange)
}
}
// Calculate impact for all destination locations
for _, dest := range task.Destinations {
if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
totalImpact.AddInPlace(dest.StorageChange)
}
}
}
}
return totalImpact
}
// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use)
// Returns StorageSlotChange representing the net impact from all tasks
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
return at.calculateTaskStorageImpact(disk)
}
// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return StorageSlotChange{}
}
baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
netImpact := at.getEffectiveCapacityUnsafe(disk)
// Calculate available volume slots (negative impact reduces availability)
availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
if availableVolumeSlots < 0 {
availableVolumeSlots = 0
}
// Return detailed capacity information
return StorageSlotChange{
VolumeSlots: int32(availableVolumeSlots),
ShardSlots: -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
}
}


@@ -0,0 +1,114 @@
package topology
import (
"fmt"
"time"
)
// reassignTaskStates assigns tasks to the appropriate disks
func (at *ActiveTopology) reassignTaskStates() {
// Clear existing task assignments
for _, disk := range at.disks {
disk.pendingTasks = nil
disk.assignedTasks = nil
disk.recentTasks = nil
}
// Reassign pending tasks
for _, task := range at.pendingTasks {
at.assignTaskToDisk(task)
}
// Reassign assigned tasks
for _, task := range at.assignedTasks {
at.assignTaskToDisk(task)
}
// Reassign recent tasks
for _, task := range at.recentTasks {
at.assignTaskToDisk(task)
}
}
// assignTaskToDisk assigns a task to the appropriate disk(s)
func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
addedDisks := make(map[string]bool)
// Local helper function to assign task to a disk and avoid code duplication
assign := func(server string, diskID uint32) {
key := fmt.Sprintf("%s:%d", server, diskID)
if server == "" || addedDisks[key] {
return
}
if disk, exists := at.disks[key]; exists {
switch task.Status {
case TaskStatusPending:
disk.pendingTasks = append(disk.pendingTasks, task)
case TaskStatusInProgress:
disk.assignedTasks = append(disk.assignedTasks, task)
case TaskStatusCompleted:
disk.recentTasks = append(disk.recentTasks, task)
}
addedDisks[key] = true
}
}
// Assign to all source disks
for _, source := range task.Sources {
assign(source.SourceServer, source.SourceDisk)
}
// Assign to all destination disks (duplicates automatically avoided by helper)
for _, dest := range task.Destinations {
assign(dest.TargetServer, dest.TargetDisk)
}
}
// isDiskAvailable checks if a disk can accept new tasks
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
// Check if disk has too many pending and active tasks
activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
if activeLoad >= MaxConcurrentTasksPerDisk {
return false
}
// Check for conflicting task types
for _, task := range disk.assignedTasks {
if at.areTaskTypesConflicting(task.TaskType, taskType) {
return false
}
}
return true
}
// areTaskTypesConflicting checks if two task types conflict
func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
// Examples of conflicting task types
conflictMap := map[TaskType][]TaskType{
TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
}
if conflicts, exists := conflictMap[existing]; exists {
for _, conflictType := range conflicts {
if conflictType == new {
return true
}
}
}
return false
}
// cleanupRecentTasks removes old recent tasks
func (at *ActiveTopology) cleanupRecentTasks() {
cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
for taskID, task := range at.recentTasks {
if task.CompletedAt.Before(cutoff) {
delete(at.recentTasks, taskID)
}
}
}


@@ -0,0 +1,50 @@
package topology
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
// CalculateTaskStorageImpact calculates storage impact for different task types
func CalculateTaskStorageImpact(taskType TaskType, volumeSize int64) (sourceChange, targetChange StorageSlotChange) {
switch taskType {
case TaskTypeErasureCoding:
// EC task: distributes shards to MULTIPLE targets, source reserves with zero impact
// Source reserves capacity but with zero StorageSlotChange (no actual capacity consumption during planning)
// WARNING: EC has multiple targets! Use AddPendingTask with multiple destinations for proper multi-target handling
// This simplified function returns zero impact; real EC requires specialized multi-destination calculation
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
case TaskTypeBalance:
// Balance task: moves volume from source to target
// Source loses 1 volume, target gains 1 volume
return StorageSlotChange{VolumeSlots: -1, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
case TaskTypeVacuum:
// Vacuum task: frees space by removing deleted entries, no slot change
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
case TaskTypeReplication:
// Replication task: creates new replica on target
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
default:
// Unknown task type, assume minimal impact
glog.Warningf("unhandled task type %s in CalculateTaskStorageImpact, assuming default impact", taskType)
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
}
}
// CalculateECShardStorageImpact calculates storage impact for EC shards specifically
func CalculateECShardStorageImpact(shardCount int32, expectedShardSize int64) StorageSlotChange {
// EC shards are typically much smaller than full volumes
// Use shard-level tracking for granular capacity planning
return StorageSlotChange{VolumeSlots: 0, ShardSlots: shardCount}
}
// CalculateECShardCleanupImpact calculates storage impact for cleaning up existing EC shards
func CalculateECShardCleanupImpact(originalVolumeSize int64) StorageSlotChange {
// Cleaning up existing EC shards frees shard slots
// Use the actual EC configuration constants for accurate shard count
return StorageSlotChange{VolumeSlots: 0, ShardSlots: -int32(erasure_coding.TotalShardsCount)} // Negative = freed capacity
}

File diff suppressed because it is too large.


@@ -0,0 +1,120 @@
package topology
import (
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// TaskSource represents a single source in a multi-source task (for replicated volume cleanup)
type TaskSource struct {
SourceServer string `json:"source_server"`
SourceDisk uint32 `json:"source_disk"`
StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this source
EstimatedSize int64 `json:"estimated_size"` // Estimated size for this source
}
// TaskDestination represents a single destination in a multi-destination task
type TaskDestination struct {
TargetServer string `json:"target_server"`
TargetDisk uint32 `json:"target_disk"`
StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this destination
EstimatedSize int64 `json:"estimated_size"` // Estimated size for this destination
}
// taskState represents the current state of tasks affecting the topology (internal)
// Uses unified multi-source/multi-destination design:
// - Single-source tasks (balance, vacuum, replication): 1 source, 1 destination
// - Multi-source EC tasks (replicated volumes): N sources, M destinations
type taskState struct {
VolumeID uint32 `json:"volume_id"`
TaskType TaskType `json:"task_type"`
Status TaskStatus `json:"status"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
EstimatedSize int64 `json:"estimated_size"` // Total estimated size of task
// Unified source and destination arrays (always used)
Sources []TaskSource `json:"sources"` // Source locations (1+ for all task types)
Destinations []TaskDestination `json:"destinations"` // Destination locations (1+ for all task types)
}
// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
type DiskInfo struct {
NodeID string `json:"node_id"`
DiskID uint32 `json:"disk_id"`
DiskType string `json:"disk_type"`
DataCenter string `json:"data_center"`
Rack string `json:"rack"`
DiskInfo *master_pb.DiskInfo `json:"disk_info"`
LoadCount int `json:"load_count"` // Number of active tasks
}
// activeDisk represents internal disk state (private)
type activeDisk struct {
*DiskInfo
pendingTasks []*taskState
assignedTasks []*taskState
recentTasks []*taskState // Completed in last N seconds
}
// activeNode represents a node with its disks (private)
type activeNode struct {
nodeID string
dataCenter string
rack string
nodeInfo *master_pb.DataNodeInfo
disks map[uint32]*activeDisk // DiskID -> activeDisk
}
// ActiveTopology provides a real-time view of cluster state with task awareness
type ActiveTopology struct {
// Core topology from master
topologyInfo *master_pb.TopologyInfo
lastUpdated time.Time
// Structured topology for easy access (private)
nodes map[string]*activeNode // NodeID -> activeNode
disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
// Performance indexes for O(1) lookups (private)
volumeIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where volume replicas exist
ecShardIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where EC shards exist
// Task states affecting the topology (private)
pendingTasks map[string]*taskState
assignedTasks map[string]*taskState
recentTasks map[string]*taskState
// Configuration
recentTaskWindowSeconds int
// Synchronization
mutex sync.RWMutex
}
// DestinationPlan represents a planned destination for a volume/shard operation
type DestinationPlan struct {
TargetNode string `json:"target_node"`
TargetDisk uint32 `json:"target_disk"`
TargetRack string `json:"target_rack"`
TargetDC string `json:"target_dc"`
ExpectedSize uint64 `json:"expected_size"`
PlacementScore float64 `json:"placement_score"`
Conflicts []string `json:"conflicts"`
}
// MultiDestinationPlan represents multiple planned destinations for operations like EC
type MultiDestinationPlan struct {
Plans []*DestinationPlan `json:"plans"`
TotalShards int `json:"total_shards"`
SuccessfulRack int `json:"successful_racks"`
SuccessfulDCs int `json:"successful_dcs"`
}
// VolumeReplica represents a replica location with server and disk information
type VolumeReplica struct {
ServerID string `json:"server_id"`
DiskID uint32 `json:"disk_id"`
}


@@ -0,0 +1,264 @@
package topology
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
// AssignTask moves a task from pending to assigned and reserves capacity
func (at *ActiveTopology) AssignTask(taskID string) error {
at.mutex.Lock()
defer at.mutex.Unlock()
task, exists := at.pendingTasks[taskID]
if !exists {
return fmt.Errorf("pending task %s not found", taskID)
}
// Check if all destination disks have sufficient capacity to reserve
for _, dest := range task.Destinations {
targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
if targetDisk, exists := at.disks[targetKey]; exists {
availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
// Check if we have enough total capacity using the improved unified comparison
if !availableCapacity.CanAccommodate(dest.StorageChange) {
return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
}
} else if dest.TargetServer != "" {
// Fail fast if destination disk is not found in topology
return fmt.Errorf("destination disk %s not found in topology", targetKey)
}
}
// Move task to assigned and reserve capacity
delete(at.pendingTasks, taskID)
task.Status = TaskStatusInProgress
at.assignedTasks[taskID] = task
at.reassignTaskStates()
// Log capacity reservation information for all sources and destinations
totalSourceImpact := StorageSlotChange{}
totalDestImpact := StorageSlotChange{}
for _, source := range task.Sources {
totalSourceImpact.AddInPlace(source.StorageChange)
}
for _, dest := range task.Destinations {
totalDestImpact.AddInPlace(dest.StorageChange)
}
glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
return nil
}
// CompleteTask moves a task from assigned to recent and releases reserved capacity
// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
// should be handled by the master when it receives the task completion notification.
func (at *ActiveTopology) CompleteTask(taskID string) error {
at.mutex.Lock()
defer at.mutex.Unlock()
task, exists := at.assignedTasks[taskID]
if !exists {
return fmt.Errorf("assigned task %s not found", taskID)
}
// Release reserved capacity by moving task to completed state
delete(at.assignedTasks, taskID)
task.Status = TaskStatusCompleted
task.CompletedAt = time.Now()
at.recentTasks[taskID] = task
at.reassignTaskStates()
// Log capacity release information for all sources and destinations
totalSourceImpact := StorageSlotChange{}
totalDestImpact := StorageSlotChange{}
for _, source := range task.Sources {
totalSourceImpact.AddInPlace(source.StorageChange)
}
for _, dest := range task.Destinations {
totalDestImpact.AddInPlace(dest.StorageChange)
}
glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
// Clean up old recent tasks
at.cleanupRecentTasks()
return nil
}
// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
// This should be called when the master updates the topology with new VolumeCount information
func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
at.mutex.Lock()
defer at.mutex.Unlock()
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
oldCount := disk.DiskInfo.DiskInfo.VolumeCount
disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
}
}
// AddPendingTask is the unified function that handles both simple and complex task creation
func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
// Validation
if len(spec.Sources) == 0 {
return fmt.Errorf("at least one source is required")
}
if len(spec.Destinations) == 0 {
return fmt.Errorf("at least one destination is required")
}
at.mutex.Lock()
defer at.mutex.Unlock()
// Build sources array
sources := make([]TaskSource, len(spec.Sources))
for i, sourceSpec := range spec.Sources {
var storageImpact StorageSlotChange
var estimatedSize int64
if sourceSpec.StorageImpact != nil {
// Use manually specified impact
storageImpact = *sourceSpec.StorageImpact
} else {
// Auto-calculate based on task type and cleanup type
storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
}
if sourceSpec.EstimatedSize != nil {
estimatedSize = *sourceSpec.EstimatedSize
} else {
estimatedSize = spec.VolumeSize // Default to volume size
}
sources[i] = TaskSource{
SourceServer: sourceSpec.ServerID,
SourceDisk: sourceSpec.DiskID,
StorageChange: storageImpact,
EstimatedSize: estimatedSize,
}
}
// Build destinations array
destinations := make([]TaskDestination, len(spec.Destinations))
for i, destSpec := range spec.Destinations {
var storageImpact StorageSlotChange
var estimatedSize int64
if destSpec.StorageImpact != nil {
// Use manually specified impact
storageImpact = *destSpec.StorageImpact
} else {
// Auto-calculate based on task type
_, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
}
if destSpec.EstimatedSize != nil {
estimatedSize = *destSpec.EstimatedSize
} else {
estimatedSize = spec.VolumeSize // Default to volume size
}
destinations[i] = TaskDestination{
TargetServer: destSpec.ServerID,
TargetDisk: destSpec.DiskID,
StorageChange: storageImpact,
EstimatedSize: estimatedSize,
}
}
// Create the task
task := &taskState{
VolumeID: spec.VolumeID,
TaskType: spec.TaskType,
Status: TaskStatusPending,
StartedAt: time.Now(),
EstimatedSize: spec.VolumeSize,
Sources: sources,
Destinations: destinations,
}
at.pendingTasks[spec.TaskID] = task
at.assignTaskToDisk(task)
glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))
return nil
}
// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
switch taskType {
case TaskTypeErasureCoding:
switch cleanupType {
case CleanupVolumeReplica:
impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
return impact
case CleanupECShards:
return CalculateECShardCleanupImpact(volumeSize)
default:
impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
return impact
}
default:
impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
return impact
}
}
// SourceCleanupType indicates what type of data needs to be cleaned up from a source
type SourceCleanupType int
const (
CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
CleanupECShards // Clean up existing EC shards (frees shard slots)
)
// TaskSourceSpec represents a source specification for task creation
type TaskSourceSpec struct {
ServerID string
DiskID uint32
CleanupType SourceCleanupType // For EC: volume replica vs existing shards
StorageImpact *StorageSlotChange // Optional: manual override
EstimatedSize *int64 // Optional: manual override
}
// TaskDestinationSpec represents a destination specification for task creation
type TaskDestinationSpec struct {
ServerID string
DiskID uint32
StorageImpact *StorageSlotChange // Optional: manual override
EstimatedSize *int64 // Optional: manual override
}
// TaskSpec represents a complete task specification
type TaskSpec struct {
TaskID string
TaskType TaskType
VolumeID uint32
VolumeSize int64 // Used for auto-calculation when manual impacts not provided
Sources []TaskSourceSpec // Can be single or multiple
Destinations []TaskDestinationSpec // Can be single or multiple
}
// TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec)
type TaskSourceLocation struct {
ServerID string
DiskID uint32
CleanupType SourceCleanupType // What type of cleanup is needed
}


@@ -0,0 +1,253 @@
package topology
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// UpdateTopology updates the topology information from master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
at.mutex.Lock()
defer at.mutex.Unlock()
at.topologyInfo = topologyInfo
at.lastUpdated = time.Now()
// Rebuild structured topology
at.nodes = make(map[string]*activeNode)
at.disks = make(map[string]*activeDisk)
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, nodeInfo := range rack.DataNodeInfos {
node := &activeNode{
nodeID: nodeInfo.Id,
dataCenter: dc.Id,
rack: rack.Id,
nodeInfo: nodeInfo,
disks: make(map[uint32]*activeDisk),
}
// Add disks for this node
for diskType, diskInfo := range nodeInfo.DiskInfos {
disk := &activeDisk{
DiskInfo: &DiskInfo{
NodeID: nodeInfo.Id,
DiskID: diskInfo.DiskId,
DiskType: diskType,
DataCenter: dc.Id,
Rack: rack.Id,
DiskInfo: diskInfo,
},
}
diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
node.disks[diskInfo.DiskId] = disk
at.disks[diskKey] = disk
}
at.nodes[nodeInfo.Id] = node
}
}
}
// Rebuild performance indexes for O(1) lookups
at.rebuildIndexes()
// Reassign task states to updated topology
at.reassignTaskStates()
glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))
return nil
}
// GetAvailableDisks returns disks that can accept new tasks of the given type
// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
var available []*DiskInfo
for _, disk := range at.disks {
if disk.NodeID == excludeNodeID {
continue // Skip excluded node
}
if at.isDiskAvailable(disk, taskType) {
// Create a copy with current load count and effective capacity
diskCopy := *disk.DiskInfo
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
available = append(available, &diskCopy)
}
}
return available
}
// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
at.mutex.RLock()
defer at.mutex.RUnlock()
for _, task := range at.recentTasks {
if task.VolumeID == volumeID && task.TaskType == taskType {
return true
}
}
return false
}
// GetAllNodes returns information about all nodes (public interface)
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
result := make(map[string]*master_pb.DataNodeInfo)
for nodeID, node := range at.nodes {
result[nodeID] = node.nodeInfo
}
return result
}
// GetTopologyInfo returns the current topology information (read-only access)
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
return at.topologyInfo
}
// GetNodeDisks returns all disks for a specific node
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
at.mutex.RLock()
defer at.mutex.RUnlock()
node, exists := at.nodes[nodeID]
if !exists {
return nil
}
var disks []*DiskInfo
for _, disk := range node.disks {
diskCopy := *disk.DiskInfo
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
disks = append(disks, &diskCopy)
}
return disks
}
// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
func (at *ActiveTopology) rebuildIndexes() {
// Clear existing indexes
at.volumeIndex = make(map[uint32][]string)
at.ecShardIndex = make(map[uint32][]string)
// Rebuild indexes from current topology
for _, dc := range at.topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, nodeInfo := range rack.DataNodeInfos {
for _, diskInfo := range nodeInfo.DiskInfos {
diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
// Index volumes
for _, volumeInfo := range diskInfo.VolumeInfos {
volumeID := volumeInfo.Id
at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
}
// Index EC shards
for _, ecShardInfo := range diskInfo.EcShardInfos {
volumeID := ecShardInfo.Id
at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
}
}
}
}
}
}
// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKeys, exists := at.volumeIndex[volumeID]
if !exists {
return []VolumeReplica{}
}
var replicas []VolumeReplica
for _, diskKey := range diskKeys {
if disk, diskExists := at.disks[diskKey]; diskExists {
// Verify collection matches (since index doesn't include collection)
if at.volumeMatchesCollection(disk, volumeID, collection) {
replicas = append(replicas, VolumeReplica{
ServerID: disk.NodeID,
DiskID: disk.DiskID,
})
}
}
}
return replicas
}
// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
at.mutex.RLock()
defer at.mutex.RUnlock()
diskKeys, exists := at.ecShardIndex[volumeID]
if !exists {
return []VolumeReplica{}
}
var ecShards []VolumeReplica
for _, diskKey := range diskKeys {
if disk, diskExists := at.disks[diskKey]; diskExists {
// Verify collection matches (since index doesn't include collection)
if at.ecShardMatchesCollection(disk, volumeID, collection) {
ecShards = append(ecShards, VolumeReplica{
ServerID: disk.NodeID,
DiskID: disk.DiskID,
})
}
}
}
return ecShards
}
// volumeMatchesCollection checks if a volume on a disk matches the given collection
func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return false
}
for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
return true
}
}
return false
}
// ecShardMatchesCollection checks if EC shards on a disk match the given collection
func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
return false
}
for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
return true
}
}
return false
}
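A minimal usage sketch (not part of the change) of the indexed lookups above: collapsing GetVolumeLocations results into a unique server list, the same dedup the erasure-coding detection further below performs inline. The package and helper names are illustrative.

package example

import "github.com/seaweedfs/seaweedfs/weed/admin/topology"

// uniqueReplicaServers resolves a volume's replica locations and keeps one entry per server.
func uniqueReplicaServers(at *topology.ActiveTopology, volumeID uint32, collection string) []string {
	seen := make(map[string]struct{})
	var servers []string
	for _, loc := range at.GetVolumeLocations(volumeID, collection) {
		if _, ok := seen[loc.ServerID]; !ok {
			seen[loc.ServerID] = struct{}{}
			servers = append(servers, loc.ServerID) // loc.DiskID is also available when a disk-level view is needed
		}
	}
	return servers
}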

View File

@@ -0,0 +1,97 @@
package topology
import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
// TaskType represents different types of maintenance operations
type TaskType string
// TaskStatus represents the current status of a task
type TaskStatus string
// Common task type constants
const (
TaskTypeVacuum TaskType = "vacuum"
TaskTypeBalance TaskType = "balance"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeReplication TaskType = "replication"
)
// Common task status constants
const (
TaskStatusPending TaskStatus = "pending"
TaskStatusInProgress TaskStatus = "in_progress"
TaskStatusCompleted TaskStatus = "completed"
)
// Task and capacity management configuration constants
const (
// MaxConcurrentTasksPerDisk defines the maximum number of concurrent tasks per disk
// This prevents overloading a single disk with too many simultaneous operations
MaxConcurrentTasksPerDisk = 2
// MaxTotalTaskLoadPerDisk defines the maximum total task load (pending + active) per disk
// This allows more tasks to be queued but limits the total pipeline depth
MaxTotalTaskLoadPerDisk = 3
// MaxTaskLoadForECPlacement defines the maximum task load to consider a disk for EC placement
// This threshold ensures disks aren't overloaded when planning EC operations
MaxTaskLoadForECPlacement = 10
)
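As an aside, a minimal sketch (not part of this file) of how these limits are meant to be applied, assuming LoadCount counts pending plus assigned tasks as elsewhere in this change; the helper name and exact comparisons are illustrative, since isDiskAvailable itself is not shown here.

package example

import "github.com/seaweedfs/seaweedfs/weed/admin/topology"

// withinTaskLimits gates a disk snapshot by the documented per-disk load limits.
func withinTaskLimits(disk *topology.DiskInfo, planningECPlacement bool) bool {
	if planningECPlacement {
		// EC placement tolerates a deeper queue before a disk is skipped.
		return disk.LoadCount <= topology.MaxTaskLoadForECPlacement
	}
	// General admission keeps the total pending+active pipeline per disk bounded.
	return disk.LoadCount < topology.MaxTotalTaskLoadPerDisk
}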
// StorageSlotChange represents storage impact at both volume and shard levels
type StorageSlotChange struct {
VolumeSlots int32 `json:"volume_slots"` // Volume-level slot changes (full volumes)
ShardSlots int32 `json:"shard_slots"` // Shard-level slot changes (EC shards, fractional capacity)
}
// Add returns a new StorageSlotChange with the sum of this and other
func (s StorageSlotChange) Add(other StorageSlotChange) StorageSlotChange {
return StorageSlotChange{
VolumeSlots: s.VolumeSlots + other.VolumeSlots,
ShardSlots: s.ShardSlots + other.ShardSlots,
}
}
// Subtract returns a new StorageSlotChange with other subtracted from this
func (s StorageSlotChange) Subtract(other StorageSlotChange) StorageSlotChange {
return StorageSlotChange{
VolumeSlots: s.VolumeSlots - other.VolumeSlots,
ShardSlots: s.ShardSlots - other.ShardSlots,
}
}
// AddInPlace adds other to this StorageSlotChange in-place
func (s *StorageSlotChange) AddInPlace(other StorageSlotChange) {
s.VolumeSlots += other.VolumeSlots
s.ShardSlots += other.ShardSlots
}
// SubtractInPlace subtracts other from this StorageSlotChange in-place
func (s *StorageSlotChange) SubtractInPlace(other StorageSlotChange) {
s.VolumeSlots -= other.VolumeSlots
s.ShardSlots -= other.ShardSlots
}
// IsZero returns true if both VolumeSlots and ShardSlots are zero
func (s StorageSlotChange) IsZero() bool {
return s.VolumeSlots == 0 && s.ShardSlots == 0
}
// ShardsPerVolumeSlot defines how many EC shards are equivalent to one volume slot
const ShardsPerVolumeSlot = erasure_coding.DataShardsCount
// ToVolumeSlots converts the entire StorageSlotChange to equivalent volume slots
func (s StorageSlotChange) ToVolumeSlots() int64 {
return int64(s.VolumeSlots) + int64(s.ShardSlots)/ShardsPerVolumeSlot
}
// ToShardSlots converts the entire StorageSlotChange to equivalent shard slots
func (s StorageSlotChange) ToShardSlots() int32 {
return s.ShardSlots + s.VolumeSlots*ShardsPerVolumeSlot
}
// CanAccommodate checks if this StorageSlotChange can accommodate the required StorageSlotChange
// Both are converted to shard slots for a more precise comparison
func (s StorageSlotChange) CanAccommodate(required StorageSlotChange) bool {
return s.ToShardSlots() >= required.ToShardSlots()
}
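A small worked example (not part of the change) of the slot arithmetic above, assuming the erasure_coding package's usual 10 data shards per volume slot; the package and function names are illustrative.

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// demoSlotMath: with ShardsPerVolumeSlot == 10, 1 volume slot + 5 shard slots equals 15 shard slots,
// which can accommodate a request for 14 shard slots.
func demoSlotMath() {
	capacity := topology.StorageSlotChange{VolumeSlots: 1, ShardSlots: 5}
	required := topology.StorageSlotChange{VolumeSlots: 0, ShardSlots: 14}
	fmt.Println(capacity.ToShardSlots())           // 15
	fmt.Println(capacity.CanAccommodate(required)) // true
	fmt.Println(capacity.ToVolumeSlots())          // 1 (5 shard slots round down in integer division)
}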

View File

@@ -4,13 +4,14 @@
import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/util"
-"strings"
-"sync"
)
type SqlGenerator interface {

View File

@@ -94,6 +94,7 @@ message TaskAssignment {
// TaskParams contains task-specific parameters with typed variants
message TaskParams {
string task_id = 12; // ActiveTopology task ID for lifecycle management
uint32 volume_id = 1;
string server = 2;
string collection = 3;

View File

@@ -804,6 +804,7 @@ func (x *TaskAssignment) GetMetadata() map[string]string {
// TaskParams contains task-specific parameters with typed variants
type TaskParams struct {
state protoimpl.MessageState `protogen:"open.v1"`
TaskId string `protobuf:"bytes,12,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // ActiveTopology task ID for lifecycle management
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
Server string `protobuf:"bytes,2,opt,name=server,proto3" json:"server,omitempty"`
Collection string `protobuf:"bytes,3,opt,name=collection,proto3" json:"collection,omitempty"`
@@ -854,6 +855,13 @@ func (*TaskParams) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{8}
}
func (x *TaskParams) GetTaskId() string {
if x != nil {
return x.TaskId
}
return ""
}
func (x *TaskParams) GetVolumeId() uint32 {
if x != nil {
return x.VolumeId
@@ -2869,9 +2877,10 @@ const file_worker_proto_rawDesc = "" +
"\bmetadata\x18\x06 \x03(\v2'.worker_pb.TaskAssignment.MetadataEntryR\bmetadata\x1a;\n" + "\bmetadata\x18\x06 \x03(\v2'.worker_pb.TaskAssignment.MetadataEntryR\bmetadata\x1a;\n" +
"\rMetadataEntry\x12\x10\n" + "\rMetadataEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x9a\x04\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xb3\x04\n" +
"\n" + "\n" +
"TaskParams\x12\x1b\n" + "TaskParams\x12\x17\n" +
"\atask_id\x18\f \x01(\tR\x06taskId\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x16\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x16\n" +
"\x06server\x18\x02 \x01(\tR\x06server\x12\x1e\n" + "\x06server\x18\x02 \x01(\tR\x06server\x12\x1e\n" +
"\n" + "\n" +

View File

@@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
task := &types.TaskDetectionResult{
TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
Server: selectedVolume.Server,
@@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Create typed parameters with destination information
task.TypedParams = &worker_pb.TaskParams{
TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: selectedVolume.VolumeID,
Server: selectedVolume.Server,
Collection: selectedVolume.Collection,
@@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
// Add pending balance task to ActiveTopology for capacity management
// Find the actual disk containing the volume on the source server
sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
if !found {
return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
}
targetDisk := destinationPlan.TargetDisk
err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
TaskID: taskID,
TaskType: topology.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
VolumeSize: int64(selectedVolume.Size),
Sources: []topology.TaskSourceSpec{
{ServerID: selectedVolume.Server, DiskID: sourceDisk},
},
Destinations: []topology.TaskDestinationSpec{
{ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
},
})
if err != nil {
return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
}
glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
} else {
glog.Warningf("No ActiveTopology available for destination planning in balance detection")
return nil, nil

View File

@@ -0,0 +1,36 @@
package base
import (
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)
// FindVolumeDisk finds the disk ID where a specific volume is located on a given server.
// Returns the disk ID and a boolean indicating whether the volume was found.
// Uses O(1) indexed lookup for optimal performance on large clusters.
//
// This is a shared utility function used by multiple task detection algorithms
// (balance, vacuum, etc.) to locate volumes efficiently.
//
// Example usage:
//
// // In balance task: find source disk for a volume that needs to be moved
// sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer)
//
// // In vacuum task: find disk containing volume that needs cleanup
// diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID)
func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) {
if activeTopology == nil {
return 0, false
}
// Use the new O(1) indexed lookup for better performance
locations := activeTopology.GetVolumeLocations(volumeID, collection)
for _, loc := range locations {
if loc.ServerID == serverID {
return loc.DiskID, true
}
}
// Volume not found on this server
return 0, false
}

View File

@@ -61,7 +61,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
result := &types.TaskDetectionResult{
TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
@@ -81,12 +85,117 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
continue // Skip this volume if destination planning fails
}
-// Find all volume replicas from topology
-replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
// Calculate expected shard size for EC operation
// Each data shard will be approximately volumeSize / dataShards
expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
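// Worked example (illustrative, not part of this change): a 30 GB volume with the default
// 10 data shards gives an expectedShardSize of roughly 3 GB per shard.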
// Add pending EC shard task to ActiveTopology for capacity management
// Extract shard destinations from multiPlan
var shardDestinations []string
var shardDiskIDs []uint32
for _, plan := range multiPlan.Plans {
shardDestinations = append(shardDestinations, plan.TargetNode)
shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
}
// Find all volume replica locations (server + disk) from topology
replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
if len(replicaLocations) == 0 {
glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
continue
}
// Find existing EC shards from previous failed attempts
existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
// Combine volume replicas and existing EC shards for cleanup
var allSourceLocations []topology.TaskSourceLocation
// Add volume replicas (will free volume slots)
for _, replica := range replicaLocations {
allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
ServerID: replica.ServerID,
DiskID: replica.DiskID,
CleanupType: topology.CleanupVolumeReplica,
})
}
// Add existing EC shards (will free shard slots)
duplicateCheck := make(map[string]bool)
for _, replica := range replicaLocations {
key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
duplicateCheck[key] = true
}
for _, shard := range existingECShards {
key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
ServerID: shard.ServerID,
DiskID: shard.DiskID,
CleanupType: topology.CleanupECShards,
})
duplicateCheck[key] = true
}
}
glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
// Convert TaskSourceLocation to TaskSourceSpec
sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
for i, srcLoc := range allSourceLocations {
sources[i] = topology.TaskSourceSpec{
ServerID: srcLoc.ServerID,
DiskID: srcLoc.DiskID,
CleanupType: srcLoc.CleanupType,
}
}
// Convert shard destinations to TaskDestinationSpec
destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
shardSize := int64(expectedShardSize)
for i, dest := range shardDestinations {
destinations[i] = topology.TaskDestinationSpec{
ServerID: dest,
DiskID: shardDiskIDs[i],
StorageImpact: &shardImpact,
EstimatedSize: &shardSize,
}
}
err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
TaskID: taskID,
TaskType: topology.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
VolumeSize: int64(metric.Size),
Sources: sources,
Destinations: destinations,
})
if err != nil {
glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
continue // Skip this volume if topology task addition fails
}
glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
// Find all volume replicas from topology (for legacy worker compatibility)
var replicas []string
serverSet := make(map[string]struct{})
for _, loc := range replicaLocations {
if _, found := serverSet[loc.ServerID]; !found {
replicas = append(replicas, loc.ServerID)
serverSet[loc.ServerID] = struct{}{}
}
}
glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas) glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
// Create typed parameters with EC destination information and replicas // Create typed parameters with EC destination information and replicas
result.TypedParams = &worker_pb.TaskParams{ result.TypedParams = &worker_pb.TaskParams{
TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: metric.VolumeID, VolumeId: metric.VolumeID,
Server: metric.Server, Server: metric.Server,
Collection: metric.Collection, Collection: metric.Collection,
@@ -143,6 +252,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// planECDestinations plans the destinations for erasure coding operation
// This function implements EC destination planning logic directly in the detection phase
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
// Calculate expected shard size for EC operation
expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
// Get source node information from topology
var sourceRack, sourceDC string
@@ -168,10 +280,12 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
}
}
-// Get available disks for EC placement (include source node for EC)
-availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
// Get available disks for EC placement with effective capacity consideration (includes pending tasks)
// For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1
// For EC, we need at least 1 available volume slot on a disk to consider it for placement.
availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
if len(availableDisks) < erasure_coding.MinTotalDisks {
-return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", erasure_coding.MinTotalDisks, len(availableDisks))
return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
}
// Select best disks for EC placement with rack/DC diversity
@@ -190,7 +304,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
TargetDisk: disk.DiskID,
TargetRack: disk.Rack,
TargetDC: disk.DataCenter,
-ExpectedSize: 0, // EC shards don't have predetermined size
ExpectedSize: expectedShardSize, // Set calculated EC shard size
PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
}
@@ -202,6 +316,22 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
dcCount[disk.DataCenter]++
}
// Log capacity utilization information using ActiveTopology's encapsulated logic
totalEffectiveCapacity := int64(0)
for _, plan := range plans {
effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
totalEffectiveCapacity += effectiveCapacity
}
glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)
// Log storage impact for EC task (source only - EC has multiple targets handled individually)
sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")
return &topology.MultiDestinationPlan{
Plans: plans,
TotalShards: len(plans),
@@ -354,13 +484,8 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
return false
}
-// Check if disk has capacity
-if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
-return false
-}
-// Check if disk is not overloaded
-if disk.LoadCount > 10 { // Arbitrary threshold
// Check if disk is not overloaded with tasks
if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
return false
}
@@ -380,6 +505,24 @@ func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC str
return conflicts
}
// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
if activeTopology == nil {
return nil
}
return activeTopology.GetVolumeLocations(volumeID, collection)
}
// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts)
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
if activeTopology == nil {
return nil
}
return activeTopology.GetECShardLocations(volumeID, collection)
}
// findVolumeReplicas finds all servers that have replicas of the specified volume
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
if activeTopology == nil {

View File

@@ -1,6 +1,7 @@
package vacuum
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -31,7 +32,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
priority = types.TaskPriorityHigh
}
// Generate task ID for future ActiveTopology integration
taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix())
result := &types.TaskDetectionResult{
TaskID: taskID, // For future ActiveTopology integration
TaskType: types.TaskTypeVacuum,
VolumeID: metric.VolumeID,
Server: metric.Server,
@@ -96,6 +101,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
// Create typed protobuf parameters
return &worker_pb.TaskParams{
TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated)
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,

View File

@@ -73,6 +73,7 @@ type TaskParams struct {
// TaskDetectionResult represents the result of scanning for maintenance needs
type TaskDetectionResult struct {
TaskID string `json:"task_id"` // ActiveTopology task ID for lifecycle management
TaskType TaskType `json:"task_type"`
VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"`