This commit is contained in:
chrislu 2025-07-27 00:28:15 -07:00
parent 49338df3e2
commit 165cf79b35
26 changed files with 1468 additions and 1860 deletions

View File

@ -192,11 +192,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
var config interface{}
switch taskType {
case types.TaskTypeVacuum:
config = &vacuum.VacuumConfig{}
config = &vacuum.VacuumConfigV2{}
case types.TaskTypeBalance:
config = &balance.BalanceConfig{}
config = &balance.BalanceConfigV2{}
case types.TaskTypeErasureCoding:
config = &erasure_coding.ErasureCodingConfig{}
config = &erasure_coding.ErasureCodingConfigV2{}
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return

View File

@ -1,171 +0,0 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BalanceDetector implements TaskDetector for balance tasks.
// It watches the volume distribution across servers and proposes a
// balance task when the spread exceeds a configurable threshold.
type BalanceDetector struct {
	enabled          bool
	threshold        float64 // Imbalance threshold (0.1 = 10%)
	minCheckInterval time.Duration
	minVolumeCount   int       // clusters smaller than this are never balanced
	lastCheck        time.Time // last scan time, used to throttle scans
	// NOTE(review): fields are mutated by Set* methods without any lock;
	// presumably all access happens on one goroutine — confirm with callers.
}

// Compile-time interface assertions
var (
	_ types.TaskDetector = (*BalanceDetector)(nil)
)

// NewBalanceDetector creates a new balance detector with conservative
// defaults: enabled, 10% imbalance threshold, hourly checks, and a
// 10-volume minimum so tiny clusters are left alone.
func NewBalanceDetector() *BalanceDetector {
	return &BalanceDetector{
		enabled:          true,
		threshold:        0.1, // 10% imbalance threshold
		minCheckInterval: 1 * time.Hour,
		minVolumeCount:   10, // Don't balance small clusters
		lastCheck:        time.Time{},
	}
}

// GetTaskType returns the task type
func (d *BalanceDetector) GetTaskType() types.TaskType {
	return types.TaskTypeBalance
}
// ScanForTasks inspects the per-server volume distribution and, when the
// spread between the busiest and least busy server exceeds the configured
// threshold, returns a single balance task describing the imbalance.
func (d *BalanceDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
	if !d.enabled {
		return nil, nil
	}
	glog.V(2).Infof("Scanning for balance tasks...")

	// Throttle: honor the minimum interval between scans.
	if time.Since(d.lastCheck) < d.minCheckInterval {
		return nil, nil
	}
	d.lastCheck = time.Now()

	// Tiny clusters are not worth rebalancing.
	if len(volumeMetrics) < d.minVolumeCount {
		glog.V(2).Infof("Cluster too small for balance (%d volumes < %d minimum)", len(volumeMetrics), d.minVolumeCount)
		return nil, nil
	}

	// Tally how many volumes each server hosts.
	perServer := make(map[string]int)
	for _, m := range volumeMetrics {
		perServer[m.Server]++
	}
	if len(perServer) < 2 {
		glog.V(2).Infof("Not enough servers for balance (%d servers)", len(perServer))
		return nil, nil
	}

	// Locate the most and least loaded servers.
	total := len(volumeMetrics)
	avg := float64(total) / float64(len(perServer))
	busiestN, idlestN := 0, total
	busiest, idlest := "", ""
	for srv, n := range perServer {
		if n > busiestN {
			busiestN, busiest = n, srv
		}
		if n < idlestN {
			idlestN, idlest = n, srv
		}
	}

	// Spread below threshold means the cluster is considered balanced.
	ratio := float64(busiestN-idlestN) / avg
	if ratio <= d.threshold {
		glog.V(2).Infof("Cluster is balanced (imbalance ratio: %.2f <= %.2f)", ratio, d.threshold)
		return nil, nil
	}

	// Emit one task describing the detected imbalance.
	reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
		ratio*100, busiestN, busiest, idlestN, idlest, avg)
	result := &types.TaskDetectionResult{
		TaskType:   types.TaskTypeBalance,
		Priority:   types.TaskPriorityNormal,
		Reason:     reason,
		ScheduleAt: time.Now(),
		Parameters: map[string]interface{}{
			"imbalance_ratio":        ratio,
			"threshold":              d.threshold,
			"max_volumes":            busiestN,
			"min_volumes":            idlestN,
			"avg_volumes_per_server": avg,
			"max_server":             busiest,
			"min_server":             idlest,
			"total_servers":          len(perServer),
		},
	}
	glog.V(1).Infof("🔄 Found balance task: %s", reason)
	return []*types.TaskDetectionResult{result}, nil
}
// ScanInterval returns how often to scan; it reuses the minimum check
// interval so scheduling and throttling agree.
func (d *BalanceDetector) ScanInterval() time.Duration {
	return d.minCheckInterval
}

// IsEnabled returns whether the detector is enabled
func (d *BalanceDetector) IsEnabled() bool {
	return d.enabled
}

// SetEnabled sets whether the detector is enabled
func (d *BalanceDetector) SetEnabled(enabled bool) {
	d.enabled = enabled
	glog.V(1).Infof("🔄 Balance detector enabled: %v", enabled)
}

// SetThreshold sets the imbalance threshold (0.1 = 10%)
func (d *BalanceDetector) SetThreshold(threshold float64) {
	d.threshold = threshold
	glog.V(1).Infof("🔄 Balance threshold set to: %.1f%%", threshold*100)
}

// SetMinCheckInterval sets the minimum time between balance checks
func (d *BalanceDetector) SetMinCheckInterval(interval time.Duration) {
	d.minCheckInterval = interval
	glog.V(1).Infof("🔄 Balance check interval set to: %v", interval)
}

// SetMinVolumeCount sets the minimum volume count for balance operations
func (d *BalanceDetector) SetMinVolumeCount(count int) {
	d.minVolumeCount = count
	glog.V(1).Infof("🔄 Balance minimum volume count set to: %d", count)
}

// GetThreshold returns the current imbalance threshold
func (d *BalanceDetector) GetThreshold() float64 {
	return d.threshold
}

// GetMinCheckInterval returns the minimum check interval
func (d *BalanceDetector) GetMinCheckInterval() time.Duration {
	return d.minCheckInterval
}

// GetMinVolumeCount returns the minimum volume count
func (d *BalanceDetector) GetMinVolumeCount() int {
	return d.minVolumeCount
}

View File

@ -1,81 +1,7 @@
package balance
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Factory creates balance task instances. It embeds BaseTaskFactory for
// the shared registration metadata (type, capabilities, description).
type Factory struct {
	*tasks.BaseTaskFactory
}

// NewFactory creates a new balance task factory
func NewFactory() *Factory {
	return &Factory{
		BaseTaskFactory: tasks.NewBaseTaskFactory(
			types.TaskTypeBalance,
			[]string{"balance", "storage", "optimization"},
			"Balance data across volume servers for optimal performance",
		),
	}
}
// Create builds a balance task for the given parameters. A non-zero
// volume ID and a non-empty server address are both mandatory.
func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
	// Reject incomplete parameter sets up front.
	switch {
	case params.VolumeID == 0:
		return nil, fmt.Errorf("volume_id is required")
	case params.Server == "":
		return nil, fmt.Errorf("server is required")
	}

	t := NewTask(params.Server, params.VolumeID, params.Collection)
	t.SetEstimatedDuration(t.EstimateTime(params))
	return t, nil
}
// Shared detector and scheduler instances
var (
sharedDetector *BalanceDetector
sharedScheduler *BalanceScheduler
)
// getSharedInstances returns the shared detector and scheduler instances
func getSharedInstances() (*BalanceDetector, *BalanceScheduler) {
if sharedDetector == nil {
sharedDetector = NewBalanceDetector()
}
if sharedScheduler == nil {
sharedScheduler = NewBalanceScheduler()
}
return sharedDetector, sharedScheduler
}
// GetSharedInstances returns the shared detector and scheduler instances (public access)
func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) {
return getSharedInstances()
}
// Auto-register this task when the package is imported.
//
// NOTE(review): ordering matters here — the detector/scheduler pair
// obtained from getSharedInstances is deliberately reused for both the
// types registry and the UI registry so they operate on the same state.
func init() {
	factory := NewFactory()
	tasks.AutoRegister(types.TaskTypeBalance, factory)

	// Get shared instances for all registrations
	detector, scheduler := getSharedInstances()

	// Register with types registry
	tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
		registry.RegisterTask(detector, scheduler)
	})

	// Register with UI registry using the same instances
	tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
		RegisterUI(uiRegistry, detector, scheduler)
	})

	// Use new architecture instead of old registration
	initBalanceV2()
}

View File

@ -1,197 +0,0 @@
package balance
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BalanceScheduler implements TaskScheduler for balance tasks.
// It gates balance execution on concurrency, a minimum repeat interval,
// and worker availability.
type BalanceScheduler struct {
	enabled        bool
	maxConcurrent  int
	minInterval    time.Duration
	lastScheduled  map[string]time.Time // track when we last scheduled a balance for each task type
	minServerCount int
	// Off-hours fields are stored but not consulted in CanScheduleNow;
	// presumably enforced elsewhere or reserved for future use — verify.
	moveDuringOffHours bool
	offHoursStart      string
	offHoursEnd        string
}

// Compile-time interface assertions
var (
	_ types.TaskScheduler = (*BalanceScheduler)(nil)
)

// NewBalanceScheduler creates a new balance scheduler with conservative
// defaults: one task at a time, six hours between runs, and a minimum
// of three servers.
func NewBalanceScheduler() *BalanceScheduler {
	return &BalanceScheduler{
		enabled:            true,
		maxConcurrent:      1, // Only run one balance at a time
		minInterval:        6 * time.Hour,
		lastScheduled:      make(map[string]time.Time),
		minServerCount:     3,
		moveDuringOffHours: true,
		offHoursStart:      "23:00",
		offHoursEnd:        "06:00",
	}
}

// GetTaskType returns the task type
func (s *BalanceScheduler) GetTaskType() types.TaskType {
	return types.TaskTypeBalance
}
// CanScheduleNow determines if a balance task can be scheduled.
//
// Checks, in order: scheduler enabled, running-task concurrency limit,
// minimum interval since the last scheduled balance, and at least one
// worker advertising the balance capability.
//
// NOTE(review): despite being a query, this method has a side effect —
// it records time.Now() into lastScheduled["balance"] when it returns
// true, so a caller that asks but then does not schedule still starts
// the minInterval cooldown. Confirm callers expect this.
func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if !s.enabled {
		return false
	}

	// Count running balance tasks
	runningBalanceCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeBalance {
			runningBalanceCount++
		}
	}

	// Check concurrency limit
	if runningBalanceCount >= s.maxConcurrent {
		glog.V(3).Infof("⏸️ Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent)
		return false
	}

	// Check minimum interval between balance operations
	if lastTime, exists := s.lastScheduled["balance"]; exists {
		if time.Since(lastTime) < s.minInterval {
			timeLeft := s.minInterval - time.Since(lastTime)
			glog.V(3).Infof("⏸️ Balance task blocked: too soon (wait %v)", timeLeft)
			return false
		}
	}

	// Check if we have available workers
	availableWorkerCount := 0
	for _, worker := range availableWorkers {
		for _, capability := range worker.Capabilities {
			if capability == types.TaskTypeBalance {
				availableWorkerCount++
				break
			}
		}
	}
	if availableWorkerCount == 0 {
		glog.V(3).Infof("⏸️ Balance task blocked: no available workers")
		return false
	}

	// All checks passed - can schedule (and start the cooldown clock)
	s.lastScheduled["balance"] = time.Now()
	glog.V(2).Infof("✅ Balance task can be scheduled (running: %d/%d, workers: %d)",
		runningBalanceCount, s.maxConcurrent, availableWorkerCount)
	return true
}
// GetPriority returns the priority for balance tasks.
func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority {
	// Balance is typically normal priority - not urgent but important for optimization
	return types.TaskPriorityNormal
}

// GetMaxConcurrent returns the maximum concurrent balance tasks
func (s *BalanceScheduler) GetMaxConcurrent() int {
	return s.maxConcurrent
}

// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks
func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration {
	return s.minInterval
}

// IsEnabled returns whether the scheduler is enabled
func (s *BalanceScheduler) IsEnabled() bool {
	return s.enabled
}

// SetEnabled sets whether the scheduler is enabled
func (s *BalanceScheduler) SetEnabled(enabled bool) {
	s.enabled = enabled
	glog.V(1).Infof("🔄 Balance scheduler enabled: %v", enabled)
}

// SetMaxConcurrent sets the maximum concurrent balance tasks
func (s *BalanceScheduler) SetMaxConcurrent(max int) {
	s.maxConcurrent = max
	glog.V(1).Infof("🔄 Balance max concurrent set to: %d", max)
}

// SetMinInterval sets the minimum interval between balance operations
func (s *BalanceScheduler) SetMinInterval(interval time.Duration) {
	s.minInterval = interval
	glog.V(1).Infof("🔄 Balance minimum interval set to: %v", interval)
}

// GetLastScheduled returns when we last scheduled this task type;
// the zero time is returned when no entry exists.
func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time {
	if lastTime, exists := s.lastScheduled[taskKey]; exists {
		return lastTime
	}
	return time.Time{}
}

// SetLastScheduled updates when we last scheduled this task type
func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) {
	s.lastScheduled[taskKey] = when
}

// GetMinServerCount returns the minimum server count
func (s *BalanceScheduler) GetMinServerCount() int {
	return s.minServerCount
}

// SetMinServerCount sets the minimum server count
func (s *BalanceScheduler) SetMinServerCount(count int) {
	s.minServerCount = count
	glog.V(1).Infof("🔄 Balance minimum server count set to: %d", count)
}

// GetMoveDuringOffHours returns whether to move only during off-hours
func (s *BalanceScheduler) GetMoveDuringOffHours() bool {
	return s.moveDuringOffHours
}

// SetMoveDuringOffHours sets whether to move only during off-hours
func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) {
	s.moveDuringOffHours = enabled
	glog.V(1).Infof("🔄 Balance move during off-hours: %v", enabled)
}

// GetOffHoursStart returns the off-hours start time (e.g. "23:00")
func (s *BalanceScheduler) GetOffHoursStart() string {
	return s.offHoursStart
}

// SetOffHoursStart sets the off-hours start time
func (s *BalanceScheduler) SetOffHoursStart(start string) {
	s.offHoursStart = start
	glog.V(1).Infof("🔄 Balance off-hours start time set to: %s", start)
}

// GetOffHoursEnd returns the off-hours end time (e.g. "06:00")
func (s *BalanceScheduler) GetOffHoursEnd() string {
	return s.offHoursEnd
}

// SetOffHoursEnd sets the off-hours end time
func (s *BalanceScheduler) SetOffHoursEnd(end string) {
	s.offHoursEnd = end
	glog.V(1).Infof("🔄 Balance off-hours end time set to: %s", end)
}

// GetMinInterval returns the minimum interval
func (s *BalanceScheduler) GetMinInterval() time.Duration {
	return s.minInterval
}

View File

@ -0,0 +1,274 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BalanceConfigV2 extends BaseConfig with balance-specific settings.
type BalanceConfigV2 struct {
	base.BaseConfig
	ImbalanceThreshold float64 `json:"imbalance_threshold"` // ratio that triggers balancing (0.1 = 10%)
	MinServerCount     int     `json:"min_server_count"`    // minimum servers before balancing is considered
}

// ToMap converts config to map (extend base functionality).
func (c *BalanceConfigV2) ToMap() map[string]interface{} {
	result := c.BaseConfig.ToMap()
	result["imbalance_threshold"] = c.ImbalanceThreshold
	result["min_server_count"] = c.MinServerCount
	return result
}

// FromMap loads config from map (extend base functionality).
//
// Numbers that travelled through encoding/json arrive as float64, so
// min_server_count accepts both int and float64; the previous int-only
// type assertion silently dropped values from JSON-decoded maps.
func (c *BalanceConfigV2) FromMap(data map[string]interface{}) error {
	// Load base config first
	if err := c.BaseConfig.FromMap(data); err != nil {
		return err
	}
	// Load balance-specific config
	if threshold, ok := data["imbalance_threshold"].(float64); ok {
		c.ImbalanceThreshold = threshold
	}
	switch v := data["min_server_count"].(type) {
	case int:
		c.MinServerCount = v
	case float64:
		c.MinServerCount = int(v)
	}
	return nil
}
// balanceDetection implements the detection logic for balance tasks.
// It returns at most one task describing the largest server-to-server
// volume-count imbalance, or (nil, nil) when the cluster is balanced,
// too small, or detection is disabled.
func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}
	// Comma-ok assertion: the previous bare assertion panicked if a
	// different TaskConfig implementation was ever registered.
	balanceConfig, ok := config.(*BalanceConfigV2)
	if !ok {
		return nil, fmt.Errorf("balance detection: unexpected config type %T", config)
	}

	// Skip if cluster is too small
	minVolumeCount := 10
	if len(metrics) < minVolumeCount {
		return nil, nil
	}

	// Analyze volume distribution across servers
	serverVolumeCounts := make(map[string]int)
	for _, metric := range metrics {
		serverVolumeCounts[metric.Server]++
	}
	if len(serverVolumeCounts) < balanceConfig.MinServerCount {
		return nil, nil
	}

	// Calculate balance metrics
	totalVolumes := len(metrics)
	avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
	maxVolumes := 0
	minVolumes := totalVolumes
	maxServer := ""
	minServer := ""
	for server, count := range serverVolumeCounts {
		if count > maxVolumes {
			maxVolumes = count
			maxServer = server
		}
		if count < minVolumes {
			minVolumes = count
			minServer = server
		}
	}

	// Check if imbalance exceeds threshold
	imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
	if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
		return nil, nil
	}

	// Create balance task
	reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
		imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
	task := &types.TaskDetectionResult{
		TaskType:   types.TaskTypeBalance,
		Priority:   types.TaskPriorityNormal,
		Reason:     reason,
		ScheduleAt: time.Now(),
		Parameters: map[string]interface{}{
			"imbalance_ratio":        imbalanceRatio,
			"threshold":              balanceConfig.ImbalanceThreshold,
			"max_volumes":            maxVolumes,
			"min_volumes":            minVolumes,
			"avg_volumes_per_server": avgVolumesPerServer,
			"max_server":             maxServer,
			"min_server":             minServer,
			"total_servers":          len(serverVolumeCounts),
		},
	}
	return []*types.TaskDetectionResult{task}, nil
}
// balanceScheduling implements the scheduling logic for balance tasks.
// It permits scheduling when the running-task count is below the
// configured concurrency limit and at least one worker advertises the
// balance capability.
func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
	// Comma-ok assertion: the previous bare assertion panicked if a
	// different TaskConfig implementation was ever passed in. A wrong
	// type means we cannot validate limits, so refuse to schedule.
	balanceConfig, ok := config.(*BalanceConfigV2)
	if !ok {
		return false
	}

	// Count running balance tasks
	runningBalanceCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeBalance {
			runningBalanceCount++
		}
	}

	// Check concurrency limit
	if runningBalanceCount >= balanceConfig.MaxConcurrent {
		return false
	}

	// Check if we have available workers with the balance capability
	availableWorkerCount := 0
	for _, worker := range availableWorkers {
		for _, capability := range worker.Capabilities {
			if capability == types.TaskTypeBalance {
				availableWorkerCount++
				break
			}
		}
	}
	return availableWorkerCount > 0
}
// createBalanceTask builds a balance task instance for the given
// parameters; a non-zero volume ID and a non-empty server are required.
func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) {
	// Reject incomplete parameter sets before constructing anything.
	switch {
	case params.VolumeID == 0:
		return nil, fmt.Errorf("volume_id is required")
	case params.Server == "":
		return nil, fmt.Errorf("server is required")
	}

	t := NewTask(params.Server, params.VolumeID, params.Collection)
	t.SetEstimatedDuration(t.EstimateTime(params))
	return t, nil
}
// getBalanceConfigSpec returns the configuration schema for balance
// tasks: the field list drives both form rendering (InputType,
// CSSClasses) and validation (MinValue/MaxValue, Required).
func getBalanceConfigSpec() base.ConfigSpec {
	return base.ConfigSpec{
		Fields: []*config.Field{
			{
				Name:         "enabled",
				JSONName:     "enabled",
				Type:         config.FieldTypeBool,
				DefaultValue: true,
				Required:     false,
				DisplayName:  "Enable Balance Tasks",
				Description:  "Whether balance tasks should be automatically created",
				InputType:    "checkbox",
				CSSClasses:   "form-check-input",
			},
			{
				Name:         "imbalance_threshold",
				JSONName:     "imbalance_threshold",
				Type:         config.FieldTypeFloat,
				DefaultValue: 0.1, // 10%
				MinValue:     0.01,
				MaxValue:     0.5,
				Required:     true,
				DisplayName:  "Imbalance Threshold",
				Description:  "Trigger balance when storage imbalance exceeds this ratio",
				Placeholder:  "0.10 (10%)",
				Unit:         config.UnitNone,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				// Interval fields are stored in seconds but rendered in hours.
				Name:         "scan_interval_seconds",
				JSONName:     "scan_interval_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 6 * 60 * 60,  // 6 hours
				MinValue:     1 * 60 * 60,  // 1 hour
				MaxValue:     24 * 60 * 60, // 24 hours
				Required:     true,
				DisplayName:  "Scan Interval",
				Description:  "How often to scan for imbalanced volumes",
				Unit:         config.UnitHours,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			{
				Name:         "max_concurrent",
				JSONName:     "max_concurrent",
				Type:         config.FieldTypeInt,
				DefaultValue: 2,
				MinValue:     1,
				MaxValue:     5,
				Required:     true,
				DisplayName:  "Max Concurrent Tasks",
				Description:  "Maximum number of balance tasks that can run simultaneously",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				Name:         "min_server_count",
				JSONName:     "min_server_count",
				Type:         config.FieldTypeInt,
				DefaultValue: 3,
				MinValue:     2,
				MaxValue:     20,
				Required:     true,
				DisplayName:  "Minimum Server Count",
				Description:  "Only balance when at least this many servers are available",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
		},
	}
}
// initBalanceV2 registers the refactored balance task using the
// declarative base.TaskDefinition mechanism (factory, detector,
// scheduler, schema, and UI are all wired by base.RegisterTask).
func initBalanceV2() {
	// Create configuration instance.
	// NOTE(review): Enabled defaults to false here, while the UI schema
	// default in getBalanceConfigSpec is true — confirm this divergence
	// is intentional ("conservative default" until explicitly enabled).
	config := &BalanceConfigV2{
		BaseConfig: base.BaseConfig{
			Enabled:             false,       // Conservative default
			ScanIntervalSeconds: 6 * 60 * 60, // 6 hours
			MaxConcurrent:       2,
		},
		ImbalanceThreshold: 0.1, // 10%
		MinServerCount:     3,
	}

	// Create complete task definition
	taskDef := &base.TaskDefinition{
		Type:           types.TaskTypeBalance,
		Name:           "balance",
		DisplayName:    "Volume Balance",
		Description:    "Redistributes volumes across volume servers to optimize storage utilization",
		Icon:           "fas fa-balance-scale text-secondary",
		Capabilities:   []string{"balance", "storage", "optimization"},
		Config:         config,
		ConfigSpec:     getBalanceConfigSpec(),
		CreateTask:     createBalanceTask,
		DetectionFunc:  balanceDetection,
		ScanInterval:   6 * time.Hour,
		SchedulingFunc: balanceScheduling,
		MaxConcurrent:  2,
		RepeatInterval: 12 * time.Hour,
	}

	// Register everything with a single function call!
	base.RegisterTask(taskDef)
}

View File

@ -1,88 +0,0 @@
package balance
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
// GetConfigSchema returns the schema for balance task configuration.
// Field defaults here must stay in sync with BalanceUILogic and
// DefaultBalanceConfig, which mirror the same values.
func GetConfigSchema() *tasks.TaskConfigSchema {
	return &tasks.TaskConfigSchema{
		TaskName:    "balance",
		DisplayName: "Volume Balance",
		Description: "Redistributes volumes across volume servers to optimize storage utilization",
		Icon:        "fas fa-balance-scale text-secondary",
		Schema: config.Schema{
			Fields: []*config.Field{
				{
					Name:         "enabled",
					JSONName:     "enabled",
					Type:         config.FieldTypeBool,
					DefaultValue: true,
					Required:     false,
					DisplayName:  "Enable Balance Tasks",
					Description:  "Whether balance tasks should be automatically created",
					InputType:    "checkbox",
					CSSClasses:   "form-check-input",
				},
				{
					Name:         "imbalance_threshold",
					JSONName:     "imbalance_threshold",
					Type:         config.FieldTypeFloat,
					DefaultValue: 0.1, // 10%
					MinValue:     0.01,
					MaxValue:     0.5,
					Required:     true,
					DisplayName:  "Imbalance Threshold",
					Description:  "Trigger balance when storage imbalance exceeds this ratio",
					Placeholder:  "0.10 (10%)",
					Unit:         config.UnitNone,
					InputType:    "number",
					CSSClasses:   "form-control",
				},
				{
					// Stored in seconds, displayed in hours.
					Name:         "scan_interval_seconds",
					JSONName:     "scan_interval_seconds",
					Type:         config.FieldTypeInterval,
					DefaultValue: 6 * 60 * 60,  // 6 hours
					MinValue:     1 * 60 * 60,  // 1 hour
					MaxValue:     24 * 60 * 60, // 24 hours
					Required:     true,
					DisplayName:  "Scan Interval",
					Description:  "How often to scan for imbalanced volumes",
					Unit:         config.UnitHours,
					InputType:    "interval",
					CSSClasses:   "form-control",
				},
				{
					Name:         "max_concurrent",
					JSONName:     "max_concurrent",
					Type:         config.FieldTypeInt,
					DefaultValue: 2,
					MinValue:     1,
					MaxValue:     5,
					Required:     true,
					DisplayName:  "Max Concurrent Tasks",
					Description:  "Maximum number of balance tasks that can run simultaneously",
					Unit:         config.UnitCount,
					InputType:    "number",
					CSSClasses:   "form-control",
				},
				{
					Name:         "min_server_count",
					JSONName:     "min_server_count",
					Type:         config.FieldTypeInt,
					DefaultValue: 3,
					MinValue:     2,
					MaxValue:     20,
					Required:     true,
					DisplayName:  "Minimum Server Count",
					Description:  "Only balance when at least this many servers are available",
					Unit:         config.UnitCount,
					InputType:    "number",
					CSSClasses:   "form-control",
				},
			},
		},
	}
}

View File

@ -1,18 +0,0 @@
package balance
import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
// SchemaProvider implements the TaskConfigSchemaProvider interface for
// balance tasks; it simply delegates to the package-level GetConfigSchema.
type SchemaProvider struct{}

// GetConfigSchema returns the schema for balance task configuration
func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
	return GetConfigSchema()
}

// init registers the balance schema provider at package import time.
func init() {
	tasks.RegisterTaskConfigSchema("balance", &SchemaProvider{})
}

View File

@ -1,115 +0,0 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BalanceConfig represents the balance configuration matching the schema
// declared in task_config_schema.go; JSON names mirror the field names there.
type BalanceConfig struct {
	Enabled             bool    `json:"enabled"`
	ImbalanceThreshold  float64 `json:"imbalance_threshold"`   // ratio, 0.1 = 10%
	ScanIntervalSeconds int     `json:"scan_interval_seconds"` // seconds between scans
	MaxConcurrent       int     `json:"max_concurrent"`
	MinServerCount      int     `json:"min_server_count"`
}

// BalanceUILogic contains the business logic for balance UI operations.
// It reads from and writes to the shared detector/scheduler pair.
type BalanceUILogic struct {
	detector  *BalanceDetector
	scheduler *BalanceScheduler
}

// NewBalanceUILogic creates new balance UI logic bound to the given
// detector and scheduler (either may be nil; methods tolerate that).
func NewBalanceUILogic(detector *BalanceDetector, scheduler *BalanceScheduler) *BalanceUILogic {
	return &BalanceUILogic{
		detector:  detector,
		scheduler: scheduler,
	}
}
// GetCurrentConfig returns the current balance configuration: schema
// defaults overlaid with the live detector and scheduler values.
func (logic *BalanceUILogic) GetCurrentConfig() interface{} {
	// Seed with the schema defaults (matching task_config_schema.go).
	cfg := &BalanceConfig{
		Enabled:             true,
		ImbalanceThreshold:  0.1,         // 10%
		ScanIntervalSeconds: 6 * 60 * 60, // 6 hours
		MaxConcurrent:       2,
		MinServerCount:      3,
	}

	// Overlay live detector state when available.
	if d := logic.detector; d != nil {
		cfg.Enabled = d.IsEnabled()
		cfg.ImbalanceThreshold = d.GetThreshold()
		cfg.ScanIntervalSeconds = int(d.ScanInterval().Seconds())
	}

	// Overlay live scheduler state when available.
	if s := logic.scheduler; s != nil {
		cfg.MaxConcurrent = s.GetMaxConcurrent()
		cfg.MinServerCount = s.GetMinServerCount()
	}
	return cfg
}
// ApplyConfig pushes a *BalanceConfig into the detector and scheduler;
// it rejects any other configuration type.
func (logic *BalanceUILogic) ApplyConfig(config interface{}) error {
	cfg, ok := config.(*BalanceConfig)
	if !ok {
		return fmt.Errorf("invalid configuration type for balance")
	}

	// Push detector-side settings.
	if d := logic.detector; d != nil {
		d.SetEnabled(cfg.Enabled)
		d.SetThreshold(cfg.ImbalanceThreshold)
		d.SetMinCheckInterval(time.Duration(cfg.ScanIntervalSeconds) * time.Second)
	}

	// Push scheduler-side settings.
	if s := logic.scheduler; s != nil {
		s.SetEnabled(cfg.Enabled)
		s.SetMaxConcurrent(cfg.MaxConcurrent)
		s.SetMinServerCount(cfg.MinServerCount)
	}

	glog.V(1).Infof("Applied balance configuration: enabled=%v, threshold=%.1f%%, max_concurrent=%d, min_servers=%d",
		cfg.Enabled, cfg.ImbalanceThreshold*100, cfg.MaxConcurrent,
		cfg.MinServerCount)
	return nil
}
// RegisterUI registers the balance UI provider with the UI registry,
// wiring the shared schema, current-config, and apply-config callbacks.
func RegisterUI(uiRegistry *types.UIRegistry, detector *BalanceDetector, scheduler *BalanceScheduler) {
	logic := NewBalanceUILogic(detector, scheduler)
	tasks.CommonRegisterUI(
		types.TaskTypeBalance,
		"Volume Balance",
		uiRegistry,
		detector,
		scheduler,
		GetConfigSchema,
		logic.GetCurrentConfig,
		logic.ApplyConfig,
	)
}

// DefaultBalanceConfig returns default balance configuration.
// NOTE(review): Enabled is false here but true in the UI schema default;
// confirm the divergence is intentional.
func DefaultBalanceConfig() *BalanceConfig {
	return &BalanceConfig{
		Enabled:             false,
		ImbalanceThreshold:  0.1,         // 10%
		ScanIntervalSeconds: 6 * 60 * 60, // 6 hours
		MaxConcurrent:       2,
		MinServerCount:      3,
	}
}

View File

@ -0,0 +1,129 @@
package base
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// GenericDetector implements TaskDetector using function-based logic
// supplied by a TaskDefinition; it is a thin adapter with no state of
// its own beyond the definition it wraps.
type GenericDetector struct {
	taskDef *TaskDefinition
}

// NewGenericDetector creates a detector from a task definition
func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
	return &GenericDetector{taskDef: taskDef}
}

// GetTaskType returns the task type
func (d *GenericDetector) GetTaskType() types.TaskType {
	return d.taskDef.Type
}

// ScanForTasks scans using the task definition's detection function;
// a definition without one yields no tasks (not an error).
func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
	if d.taskDef.DetectionFunc == nil {
		return nil, nil
	}
	return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
}

// ScanInterval returns the scan interval from the task definition,
// falling back to 30 minutes when none is set.
func (d *GenericDetector) ScanInterval() time.Duration {
	if d.taskDef.ScanInterval > 0 {
		return d.taskDef.ScanInterval
	}
	return 30 * time.Minute // Default
}

// IsEnabled returns whether this detector is enabled
func (d *GenericDetector) IsEnabled() bool {
	return d.taskDef.Config.IsEnabled()
}
// GenericScheduler implements TaskScheduler using function-based logic
// supplied by a TaskDefinition.
type GenericScheduler struct {
	taskDef *TaskDefinition
}

// NewGenericScheduler creates a scheduler from a task definition
func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
	return &GenericScheduler{taskDef: taskDef}
}

// GetTaskType returns the task type
func (s *GenericScheduler) GetTaskType() types.TaskType {
	return s.taskDef.Type
}

// CanScheduleNow determines if a task can be scheduled using the task
// definition's function, falling back to defaultCanSchedule when the
// definition supplies none.
func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if s.taskDef.SchedulingFunc == nil {
		return s.defaultCanSchedule(task, runningTasks, availableWorkers)
	}
	return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
}
// defaultCanSchedule is the fallback scheduling policy: enabled config,
// concurrency below the limit, and at least one capable worker with
// spare capacity.
func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if !s.taskDef.Config.IsEnabled() {
		return false
	}

	// How many tasks of this type are already in flight?
	inFlight := 0
	for _, rt := range runningTasks {
		if rt.Type == s.taskDef.Type {
			inFlight++
		}
	}

	// Enforce the concurrency limit (1 when the definition leaves it unset).
	limit := s.taskDef.MaxConcurrent
	if limit <= 0 {
		limit = 1
	}
	if inFlight >= limit {
		return false
	}

	// Accept as soon as one worker with spare capacity advertises this type.
	for _, w := range availableWorkers {
		if w.CurrentLoad >= w.MaxConcurrent {
			continue
		}
		for _, c := range w.Capabilities {
			if c == s.taskDef.Type {
				return true
			}
		}
	}
	return false
}
// GetPriority returns the priority carried by the task itself; the
// generic scheduler does not override it.
func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
	return task.Priority
}

// GetMaxConcurrent returns max concurrent tasks, defaulting to 1 when
// the definition leaves it unset.
func (s *GenericScheduler) GetMaxConcurrent() int {
	if s.taskDef.MaxConcurrent > 0 {
		return s.taskDef.MaxConcurrent
	}
	return 1 // Default
}

// GetDefaultRepeatInterval returns the default repeat interval,
// defaulting to 24 hours when the definition leaves it unset.
func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
	if s.taskDef.RepeatInterval > 0 {
		return s.taskDef.RepeatInterval
	}
	return 24 * time.Hour // Default
}

// IsEnabled returns whether this scheduler is enabled
func (s *GenericScheduler) IsEnabled() bool {
	return s.taskDef.Config.IsEnabled()
}

View File

@ -0,0 +1,131 @@
package base
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// GenericFactory creates task instances using a TaskDefinition's
// CreateTask function; registration metadata comes from the embedded
// BaseTaskFactory.
type GenericFactory struct {
	*tasks.BaseTaskFactory
	taskDef *TaskDefinition
}

// NewGenericFactory creates a generic task factory
func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory {
	return &GenericFactory{
		BaseTaskFactory: tasks.NewBaseTaskFactory(
			taskDef.Type,
			taskDef.Capabilities,
			taskDef.Description,
		),
		taskDef: taskDef,
	}
}

// Create creates a task instance using the task definition; a
// definition without a CreateTask function is an error, not a panic.
func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) {
	if f.taskDef.CreateTask == nil {
		return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type)
	}
	return f.taskDef.CreateTask(params)
}
// GenericSchemaProvider provides config schema from a TaskDefinition,
// adapting its ConfigSpec into the tasks.TaskConfigSchema shape.
type GenericSchemaProvider struct {
	taskDef *TaskDefinition
}

// GetConfigSchema returns the schema from the task definition
func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
	return &tasks.TaskConfigSchema{
		TaskName:    string(p.taskDef.Type),
		DisplayName: p.taskDef.DisplayName,
		Description: p.taskDef.Description,
		Icon:        p.taskDef.Icon,
		Schema: config.Schema{
			Fields: p.taskDef.ConfigSpec.Fields,
		},
	}
}

// GenericUIProvider provides UI functionality from a TaskDefinition.
type GenericUIProvider struct {
	taskDef *TaskDefinition
}

// GetCurrentConfig returns the live config object as interface{}
// (no copy — callers must not mutate it directly).
func (ui *GenericUIProvider) GetCurrentConfig() interface{} {
	return ui.taskDef.Config
}

// ApplyConfig applies configuration submitted as a map (the form layer
// presumably produces map[string]interface{} — other shapes are rejected).
func (ui *GenericUIProvider) ApplyConfig(config interface{}) error {
	if configMap, ok := config.(map[string]interface{}); ok {
		return ui.taskDef.Config.FromMap(configMap)
	}
	return fmt.Errorf("invalid config format for %s", ui.taskDef.Type)
}
// RegisterTask registers a complete task definition with all registries:
// task factory, detector/scheduler types registry, config schema registry,
// and the UI registry. An invalid definition is logged and skipped rather
// than panicking, so one bad task type cannot break package initialization.
func RegisterTask(taskDef *TaskDefinition) {
// Validate task definition up front; registration is all-or-nothing.
if err := validateTaskDefinition(taskDef); err != nil {
glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err)
return
}
// Create and register factory
factory := NewGenericFactory(taskDef)
tasks.AutoRegister(taskDef.Type, factory)
// Create and register detector/scheduler
detector := NewGenericDetector(taskDef)
scheduler := NewGenericScheduler(taskDef)
tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
registry.RegisterTask(detector, scheduler)
})
// Create and register schema provider
schemaProvider := &GenericSchemaProvider{taskDef: taskDef}
tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider)
// Create and register UI provider; the UI shares the same schema and
// config accessors so the admin page always reflects the live config.
uiProvider := &GenericUIProvider{taskDef: taskDef}
tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
baseUIProvider := tasks.NewBaseUIProvider(
taskDef.Type,
taskDef.DisplayName,
taskDef.Description,
taskDef.Icon,
schemaProvider.GetConfigSchema,
uiProvider.GetCurrentConfig,
uiProvider.ApplyConfig,
)
uiRegistry.RegisterUI(baseUIProvider)
})
glog.V(1).Infof("✅ Registered complete task definition: %s", taskDef.Type)
}
// validateTaskDefinition ensures the task definition carries the minimum
// required pieces: a type, a name, a config object, and a creation function.
func validateTaskDefinition(taskDef *TaskDefinition) error {
	switch {
	case taskDef.Type == "":
		return fmt.Errorf("task type is required")
	case taskDef.Name == "":
		return fmt.Errorf("task name is required")
	case taskDef.Config == nil:
		return fmt.Errorf("task config is required")
	case taskDef.CreateTask == nil:
		return fmt.Errorf("task creation function is required")
	default:
		return nil
	}
}

View File

@ -0,0 +1,95 @@
package base
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TaskDefinition encapsulates everything needed to define a complete task type:
// identity/display metadata, configuration, creation, detection, and scheduling.
type TaskDefinition struct {
// Basic task information
Type types.TaskType
Name string
DisplayName string
Description string
Icon string
Capabilities []string
// Task configuration object and the schema describing its fields
Config TaskConfig
ConfigSpec ConfigSpec
// Task creation hook invoked by the factory for each detected task
CreateTask func(params types.TaskParams) (types.TaskInterface, error)
// Detection logic: scans volume metrics and proposes tasks
DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error)
ScanInterval time.Duration
// Scheduling logic: decides whether a pending task may run now
SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool
MaxConcurrent int
RepeatInterval time.Duration
}
// TaskConfig provides a simple configuration interface shared by all task
// configs; map conversion is used for UI round trips and persistence.
type TaskConfig interface {
IsEnabled() bool
SetEnabled(bool)
Validate() error
ToMap() map[string]interface{}
FromMap(map[string]interface{}) error
}
// ConfigSpec defines the configuration schema (field list) for a task type.
type ConfigSpec struct {
Fields []*config.Field
}
// BaseConfig provides the configuration fields common to every task type:
// an on/off switch, a scan interval, and a concurrency cap.
type BaseConfig struct {
	Enabled             bool `json:"enabled"`
	ScanIntervalSeconds int  `json:"scan_interval_seconds"`
	MaxConcurrent       int  `json:"max_concurrent"`
}

// IsEnabled returns whether the task is enabled.
func (c *BaseConfig) IsEnabled() bool {
	return c.Enabled
}

// SetEnabled sets whether the task is enabled.
func (c *BaseConfig) SetEnabled(enabled bool) {
	c.Enabled = enabled
}

// Validate validates the base configuration. There are currently no
// invariants to enforce, so it always returns nil.
func (c *BaseConfig) Validate() error {
	return nil
}

// ToMap converts the config to a map keyed by the JSON field names.
func (c *BaseConfig) ToMap() map[string]interface{} {
	return map[string]interface{}{
		"enabled":               c.Enabled,
		"scan_interval_seconds": c.ScanIntervalSeconds,
		"max_concurrent":        c.MaxConcurrent,
	}
}

// FromMap loads config from a map. Missing keys leave fields unchanged;
// present keys with unusable types are skipped rather than treated as errors.
//
// Numeric fields accept int, int64, and float64: maps that went through
// encoding/json carry all numbers as float64, so the previous int-only type
// assertions silently dropped scan_interval_seconds and max_concurrent after
// a JSON round trip.
func (c *BaseConfig) FromMap(data map[string]interface{}) error {
	if enabled, ok := data["enabled"].(bool); ok {
		c.Enabled = enabled
	}
	if v, ok := data["scan_interval_seconds"]; ok {
		if n, ok := coerceInt(v); ok {
			c.ScanIntervalSeconds = n
		}
	}
	if v, ok := data["max_concurrent"]; ok {
		if n, ok := coerceInt(v); ok {
			c.MaxConcurrent = n
		}
	}
	return nil
}

// coerceInt converts the numeric types that commonly appear in generic
// config maps (native int, int64, and JSON's float64) to int.
func coerceInt(v interface{}) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case float64:
		return int(n), true
	}
	return 0, false
}

View File

@ -1,100 +0,0 @@
package erasure_coding
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
// GetConfigSchema returns the schema for erasure coding task configuration.
// Field names/JSON names match the ErasureCodingConfig JSON tags so UI form
// values map directly onto the config struct.
func GetConfigSchema() *tasks.TaskConfigSchema {
return &tasks.TaskConfigSchema{
TaskName: "erasure_coding",
DisplayName: "Erasure Coding",
Description: "Converts volumes to erasure coded format for improved data durability",
Icon: "fas fa-shield-alt text-info",
Schema: config.Schema{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
// NOTE(review): schema default is true while NewEcDetector defaults
// enabled to false — confirm which default is intended.
DefaultValue: true,
Required: false,
DisplayName: "Enable Erasure Coding Tasks",
Description: "Whether erasure coding tasks should be automatically created",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "quiet_for_seconds",
JSONName: "quiet_for_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 7 * 24 * 60 * 60, // 7 days
MinValue: 1 * 24 * 60 * 60, // 1 day
MaxValue: 30 * 24 * 60 * 60, // 30 days
Required: true,
DisplayName: "Quiet For Duration",
Description: "Only apply erasure coding to volumes that have not been modified for this duration",
Unit: config.UnitDays,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 12 * 60 * 60, // 12 hours
MinValue: 2 * 60 * 60, // 2 hours
MaxValue: 24 * 60 * 60, // 24 hours
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing erasure coding",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 3,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of erasure coding tasks that can run simultaneously",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "fullness_ratio",
JSONName: "fullness_ratio",
Type: config.FieldTypeFloat,
DefaultValue: 0.9, // 90%
MinValue: 0.5,
MaxValue: 1.0,
Required: true,
DisplayName: "Fullness Ratio",
Description: "Only apply erasure coding to volumes with fullness ratio above this threshold",
Placeholder: "0.90 (90%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "collection_filter",
JSONName: "collection_filter",
Type: config.FieldTypeString,
DefaultValue: "",
Required: false,
DisplayName: "Collection Filter",
Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)",
Placeholder: "collection1,collection2",
InputType: "text",
CSSClasses: "form-control",
},
},
},
}
}

View File

@ -1,226 +0,0 @@
package erasure_coding
import (
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// EcDetector implements erasure coding task detection.
// It scans volume health metrics and proposes EC conversion for volumes that
// are large, full, and quiet (unmodified) for long enough.
type EcDetector struct {
enabled bool
quietForSeconds int // minimum quiet (unmodified) period before EC
fullnessRatio float64 // minimum fullness (0.0-1.0) before EC
minSizeMB int // Minimum volume size in MB before considering EC
scanInterval time.Duration
collectionFilter string // comma-separated collection allow-list; empty = all
}
// Compile-time interface assertions
var (
_ types.TaskDetector = (*EcDetector)(nil)
_ types.PolicyConfigurableDetector = (*EcDetector)(nil)
)
// NewEcDetector creates a new erasure coding detector with production defaults.
func NewEcDetector() *EcDetector {
return &EcDetector{
enabled: false, // Conservative default - enable via configuration
quietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period
fullnessRatio: 0.90, // 90% full threshold
minSizeMB: 100, // Minimum 100MB volume size
scanInterval: 12 * time.Hour, // Scan every 12 hours
collectionFilter: "", // No collection filter by default
}
}
// GetTaskType returns the task type
func (d *EcDetector) GetTaskType() types.TaskType {
return types.TaskTypeErasureCoding
}
// ScanForTasks scans for volumes that should be converted to erasure coding.
// A volume qualifies when it is not already EC, meets the minimum size,
// passes the collection filter, and satisfies both quiet-time and fullness
// thresholds. Returns nil when the detector is disabled.
func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
if !d.enabled {
glog.V(2).Infof("EC detector is disabled")
return nil, nil
}
var results []*types.TaskDetectionResult
now := time.Now()
quietThreshold := time.Duration(d.quietForSeconds) * time.Second
minSizeBytes := uint64(d.minSizeMB) * 1024 * 1024
glog.V(2).Infof("EC detector scanning %d volumes with thresholds: quietFor=%ds, fullness=%.2f, minSize=%dMB",
len(volumeMetrics), d.quietForSeconds, d.fullnessRatio, d.minSizeMB)
for _, metric := range volumeMetrics {
// Skip if already EC volume
if metric.IsECVolume {
continue
}
// Check minimum size requirement
if metric.Size < minSizeBytes {
continue
}
// Check collection filter if specified
// NOTE(review): the filter map is rebuilt for every volume; it could be
// parsed once before the loop.
if d.collectionFilter != "" {
// Parse comma-separated collections
allowedCollections := make(map[string]bool)
for _, collection := range strings.Split(d.collectionFilter, ",") {
allowedCollections[strings.TrimSpace(collection)] = true
}
// Skip if volume's collection is not in the allowed list
if !allowedCollections[metric.Collection] {
continue
}
}
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= d.fullnessRatio {
// Note: Removed read-only requirement for testing
// In production, you might want to enable this:
// if !metric.IsReadOnly {
// continue
// }
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: types.TaskPriorityLow, // EC is not urgent
Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
metric.Age.Seconds(), d.quietForSeconds, metric.FullnessRatio*100, d.fullnessRatio*100,
float64(metric.Size)/(1024*1024), d.minSizeMB),
Parameters: map[string]interface{}{
"age_seconds": int(metric.Age.Seconds()),
"fullness_ratio": metric.FullnessRatio,
"size_mb": int(metric.Size / (1024 * 1024)),
},
ScheduleAt: now,
}
results = append(results, result)
glog.V(1).Infof("EC task detected for volume %d on %s: %s", metric.VolumeID, metric.Server, result.Reason)
}
}
glog.V(1).Infof("EC detector found %d tasks to schedule", len(results))
return results, nil
}
// ScanInterval returns how often this task type should be scanned
func (d *EcDetector) ScanInterval() time.Duration {
return d.scanInterval
}
// IsEnabled returns whether this task type is enabled
func (d *EcDetector) IsEnabled() bool {
return d.enabled
}
// Configuration methods for runtime configuration
// Configure sets detector configuration from a generic policy map.
// Unknown keys are ignored; keys with unexpected types are skipped.
// NOTE(review): numeric keys are only accepted as float64 (JSON-decoded
// form); maps built with native ints will be ignored here — confirm callers.
func (d *EcDetector) Configure(config map[string]interface{}) error {
if enabled, ok := config["enabled"].(bool); ok {
d.enabled = enabled
}
if ageSeconds, ok := config["quiet_for_seconds"].(float64); ok {
d.quietForSeconds = int(ageSeconds)
}
if fullnessRatio, ok := config["fullness_ratio"].(float64); ok {
d.fullnessRatio = fullnessRatio
}
if minSizeMB, ok := config["min_size_mb"].(float64); ok {
d.minSizeMB = int(minSizeMB)
}
if collectionFilter, ok := config["collection_filter"].(string); ok {
d.collectionFilter = collectionFilter
}
glog.V(1).Infof("EC detector configured: enabled=%v, quietFor=%ds, fullness=%.2f, minSize=%dMB, collection_filter='%s'",
d.enabled, d.quietForSeconds, d.fullnessRatio, d.minSizeMB, d.collectionFilter)
return nil
}
// SetEnabled sets whether the detector is enabled
func (d *EcDetector) SetEnabled(enabled bool) {
d.enabled = enabled
}
// SetQuietForSeconds sets the quiet duration threshold in seconds
func (d *EcDetector) SetQuietForSeconds(seconds int) {
d.quietForSeconds = seconds
}
// SetFullnessRatio sets the fullness ratio threshold
func (d *EcDetector) SetFullnessRatio(ratio float64) {
d.fullnessRatio = ratio
}
// SetCollectionFilter sets the collection filter
func (d *EcDetector) SetCollectionFilter(filter string) {
d.collectionFilter = filter
}
// SetScanInterval sets the scan interval
func (d *EcDetector) SetScanInterval(interval time.Duration) {
d.scanInterval = interval
}
// GetQuietForSeconds returns the current quiet duration threshold in seconds
func (d *EcDetector) GetQuietForSeconds() int {
return d.quietForSeconds
}
// GetFullnessRatio returns the current fullness ratio threshold
func (d *EcDetector) GetFullnessRatio() float64 {
return d.fullnessRatio
}
// GetCollectionFilter returns the current collection filter
func (d *EcDetector) GetCollectionFilter() string {
return d.collectionFilter
}
// GetScanInterval returns the scan interval
func (d *EcDetector) GetScanInterval() time.Duration {
return d.scanInterval
}
// ConfigureFromPolicy configures the detector from maintenance policy.
// Accepts *types.MaintenancePolicy; any other type is logged and ignored.
func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
// Cast policy to maintenance policy type
if maintenancePolicy, ok := policy.(*types.MaintenancePolicy); ok {
// Get EC-specific configuration from policy
ecConfig := maintenancePolicy.GetTaskConfig(types.TaskTypeErasureCoding)
if ecConfig != nil {
// Convert to map for easier access
if configMap, ok := ecConfig.(map[string]interface{}); ok {
d.Configure(configMap)
} else {
glog.Warningf("EC detector policy configuration is not a map: %T", ecConfig)
}
} else {
// No specific configuration found, use defaults with policy-based enabled status
enabled := maintenancePolicy.GlobalSettings != nil && maintenancePolicy.GlobalSettings.MaintenanceEnabled
glog.V(2).Infof("No EC-specific config found, using default with enabled=%v", enabled)
d.enabled = enabled
}
} else {
glog.Warningf("ConfigureFromPolicy received unknown policy type: %T", policy)
}
}

View File

@ -1,90 +1,7 @@
package erasure_coding
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Factory creates erasure coding task instances.
// It embeds BaseTaskFactory for common type/capability metadata.
type Factory struct {
*tasks.BaseTaskFactory
}
// NewFactory creates a new erasure coding task factory registered under
// the erasure_coding task type with its capability tags and description.
func NewFactory() *Factory {
return &Factory{
BaseTaskFactory: tasks.NewBaseTaskFactory(
types.TaskTypeErasureCoding,
[]string{"erasure_coding", "storage", "durability"},
"Convert volumes to erasure coded format for improved durability",
),
}
}
// Create creates a new erasure coding task instance.
//
// The legacy factory previously duplicated the parameter validation and
// task construction found in createErasureCodingTask (same package); it now
// delegates so both the legacy and v2 registration paths share one
// construction code path. Behavior is unchanged: volume_id and server are
// required, master_client/work_dir parameters override the defaults.
func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
	return createErasureCodingTask(params)
}
// getSharedInstances returns shared detector and scheduler instances
func getSharedInstances() (*EcDetector, *Scheduler) {
// Create shared instances (singleton pattern)
detector := NewEcDetector()
scheduler := NewScheduler()
return detector, scheduler
}
// GetSharedInstances returns the shared detector and scheduler instances (public API)
func GetSharedInstances() (*EcDetector, *Scheduler) {
return getSharedInstances()
}
// Auto-register this task when the package is imported.
// Registers the legacy factory/detector/scheduler/UI wiring, then also runs
// the v2 TaskDefinition-based registration.
// NOTE(review): both the legacy path and initErasureCodingV2 register under
// types.TaskTypeErasureCoding — confirm the registries tolerate (or intend)
// this double registration.
func init() {
factory := NewFactory()
tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
// Get shared instances for all registrations
detector, scheduler := getSharedInstances()
// Register with types registry
tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
registry.RegisterTask(detector, scheduler)
})
// Register with UI registry using the same instances
tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
RegisterUI(uiRegistry, detector, scheduler)
})
// Use new architecture instead of old registration
initErasureCodingV2()
}

View File

@ -1,95 +0,0 @@
package erasure_coding
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Scheduler implements erasure coding task scheduling: it gates pending EC
// tasks on an enabled flag, a concurrency limit, and worker capability.
type Scheduler struct {
maxConcurrent int // maximum EC tasks allowed to run at once
enabled bool
}
// NewScheduler creates a new erasure coding scheduler.
func NewScheduler() *Scheduler {
return &Scheduler{
maxConcurrent: 1, // Conservative default
enabled: false, // Conservative default
}
}
// GetTaskType returns the task type
func (s *Scheduler) GetTaskType() types.TaskType {
return types.TaskTypeErasureCoding
}
// CanScheduleNow determines if an erasure coding task can be scheduled now.
// It requires: scheduler enabled, at least one available worker, running EC
// task count below maxConcurrent, and at least one worker advertising the
// erasure coding capability.
func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
if !s.enabled {
return false
}
// Check if we have available workers
if len(availableWorkers) == 0 {
return false
}
// Count running EC tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeErasureCoding {
runningCount++
}
}
// Check concurrency limit
if runningCount >= s.maxConcurrent {
glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
return false
}
// Check if any worker can handle EC tasks
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeErasureCoding {
glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
return true
}
}
}
return false
}
// GetMaxConcurrent returns the maximum number of concurrent tasks
func (s *Scheduler) GetMaxConcurrent() int {
return s.maxConcurrent
}
// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
return 24 * time.Hour // Don't repeat EC for 24 hours
}
// GetPriority returns the priority for this task
func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
return types.TaskPriorityLow // EC is not urgent
}
// IsEnabled returns whether this task type is enabled
func (s *Scheduler) IsEnabled() bool {
return s.enabled
}
// Configuration setters
// SetEnabled sets whether the scheduler may schedule EC tasks.
func (s *Scheduler) SetEnabled(enabled bool) {
s.enabled = enabled
}
// SetMaxConcurrent sets the maximum number of concurrently running EC tasks.
func (s *Scheduler) SetMaxConcurrent(max int) {
s.maxConcurrent = max
}

View File

@ -0,0 +1,301 @@
package erasure_coding
import (
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// ErasureCodingConfigV2 extends BaseConfig with erasure coding specific settings.
type ErasureCodingConfigV2 struct {
	base.BaseConfig

	// QuietForSeconds: only volumes unmodified for this long are eligible.
	QuietForSeconds int `json:"quiet_for_seconds"`
	// FullnessRatio: only volumes at least this full (0.0-1.0) are eligible.
	FullnessRatio float64 `json:"fullness_ratio"`
	// CollectionFilter: comma-separated collection names; empty means all.
	CollectionFilter string `json:"collection_filter"`
}

// ToMap converts the config to a map, merging the base fields with the
// EC-specific ones.
func (c *ErasureCodingConfigV2) ToMap() map[string]interface{} {
	result := c.BaseConfig.ToMap()
	result["quiet_for_seconds"] = c.QuietForSeconds
	result["fullness_ratio"] = c.FullnessRatio
	result["collection_filter"] = c.CollectionFilter
	return result
}

// FromMap loads the config from a map, base fields first.
//
// quiet_for_seconds accepts int, int64, and float64: maps that went through
// encoding/json carry all numbers as float64, so the previous int-only type
// assertion silently dropped the value after a JSON round trip.
func (c *ErasureCodingConfigV2) FromMap(data map[string]interface{}) error {
	// Load base config first (enabled, scan interval, concurrency).
	if err := c.BaseConfig.FromMap(data); err != nil {
		return err
	}
	// Load erasure coding specific config.
	if v, ok := data["quiet_for_seconds"]; ok {
		switch n := v.(type) {
		case int:
			c.QuietForSeconds = n
		case int64:
			c.QuietForSeconds = int(n)
		case float64:
			c.QuietForSeconds = int(n)
		}
	}
	if fullness, ok := data["fullness_ratio"].(float64); ok {
		c.FullnessRatio = fullness
	}
	if filter, ok := data["collection_filter"].(string); ok {
		c.CollectionFilter = filter
	}
	return nil
}
// ecDetection implements the detection logic for erasure coding tasks.
// A volume qualifies when it is not already EC, is at least 100MB, passes
// the collection filter, and satisfies both the quiet-time and fullness
// thresholds. Returns nil results when the config is disabled.
func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}
	// Comma-ok assertion: the original unchecked cast panicked if a
	// different TaskConfig implementation was ever registered.
	ecConfig, ok := config.(*ErasureCodingConfigV2)
	if !ok {
		return nil, fmt.Errorf("unexpected config type for erasure coding detection: %T", config)
	}

	var results []*types.TaskDetectionResult
	now := time.Now()
	quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum

	// Parse the collection filter once, outside the volume loop (the
	// original rebuilt this map for every volume). A nil map means "no
	// filter" — all collections are allowed.
	var allowedCollections map[string]bool
	if ecConfig.CollectionFilter != "" {
		allowedCollections = make(map[string]bool)
		for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
			allowedCollections[strings.TrimSpace(collection)] = true
		}
	}

	for _, metric := range metrics {
		// Skip if already EC volume
		if metric.IsECVolume {
			continue
		}
		// Check minimum size requirement
		if metric.Size < minSizeBytes {
			continue
		}
		// Skip if a filter is set and the volume's collection is not allowed
		if allowedCollections != nil && !allowedCollections[metric.Collection] {
			continue
		}
		// Check quiet duration and fullness criteria
		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
			result := &types.TaskDetectionResult{
				TaskType:   types.TaskTypeErasureCoding,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   types.TaskPriorityLow, // EC is not urgent
				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
					float64(metric.Size)/(1024*1024)),
				Parameters: map[string]interface{}{
					"age_seconds":    int(metric.Age.Seconds()),
					"fullness_ratio": metric.FullnessRatio,
					"size_mb":        int(metric.Size / (1024 * 1024)),
				},
				ScheduleAt: now,
			}
			results = append(results, result)
		}
	}
	return results, nil
}
// ecScheduling implements the scheduling logic for erasure coding tasks.
// It allows a task to run when workers are available, the number of running
// EC tasks is below MaxConcurrent, and at least one worker advertises the
// erasure coding capability.
func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
	// Comma-ok assertion: the original unchecked cast panicked if a
	// different TaskConfig implementation was ever passed in. Refusing to
	// schedule is the safe fallback.
	ecConfig, ok := config.(*ErasureCodingConfigV2)
	if !ok {
		return false
	}
	// Check if we have available workers
	if len(availableWorkers) == 0 {
		return false
	}
	// Count running EC tasks
	runningCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeErasureCoding {
			runningCount++
		}
	}
	// Check concurrency limit
	if runningCount >= ecConfig.MaxConcurrent {
		return false
	}
	// Check if any worker can handle EC tasks
	for _, worker := range availableWorkers {
		for _, capability := range worker.Capabilities {
			if capability == types.TaskTypeErasureCoding {
				return true
			}
		}
	}
	return false
}
// createErasureCodingTask creates an erasure coding task instance.
// volume_id and server are required; master_client and work_dir may be
// supplied via params.Parameters to override the defaults below.
// NOTE(review): the "localhost:9333" master default only works for
// single-host setups — confirm callers always pass master_client.
func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) {
// Validate parameters
if params.VolumeID == 0 {
return nil, fmt.Errorf("volume_id is required")
}
if params.Server == "" {
return nil, fmt.Errorf("server is required")
}
// Extract additional parameters for comprehensive EC
masterClient := "localhost:9333" // Default master client
workDir := "/tmp/seaweedfs_ec_work" // Default work directory
if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" {
masterClient = mc
}
if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" {
workDir = wd
}
// Create EC task with comprehensive capabilities
task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir)
// Set gRPC dial option if provided
if params.GrpcDialOption != nil {
task.SetDialOption(params.GrpcDialOption)
}
task.SetEstimatedDuration(task.EstimateTime(params))
return task, nil
}
// getErasureCodingConfigSpec returns the configuration schema for erasure
// coding tasks. Field names/JSON names match ErasureCodingConfigV2's JSON
// tags so UI form values map directly onto the config struct.
func getErasureCodingConfigSpec() base.ConfigSpec {
	return base.ConfigSpec{
		Fields: []*config.Field{
			{
				Name:     "enabled",
				JSONName: "enabled",
				Type:     config.FieldTypeBool,
				// Default false to match the conservative runtime default in
				// initErasureCodingV2 (BaseConfig.Enabled: false); the previous
				// schema default of true contradicted the actual config value.
				DefaultValue: false,
				Required:     false,
				DisplayName:  "Enable Erasure Coding Tasks",
				Description:  "Whether erasure coding tasks should be automatically created",
				InputType:    "checkbox",
				CSSClasses:   "form-check-input",
			},
			{
				Name:         "quiet_for_seconds",
				JSONName:     "quiet_for_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 7 * 24 * 60 * 60, // 7 days
				MinValue:     1 * 24 * 60 * 60, // 1 day
				MaxValue:     30 * 24 * 60 * 60, // 30 days
				Required:     true,
				DisplayName:  "Quiet For Duration",
				Description:  "Only apply erasure coding to volumes that have not been modified for this duration",
				Unit:         config.UnitDays,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			{
				Name:         "scan_interval_seconds",
				JSONName:     "scan_interval_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 12 * 60 * 60, // 12 hours
				MinValue:     2 * 60 * 60,  // 2 hours
				MaxValue:     24 * 60 * 60, // 24 hours
				Required:     true,
				DisplayName:  "Scan Interval",
				Description:  "How often to scan for volumes needing erasure coding",
				Unit:         config.UnitHours,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			{
				Name:         "max_concurrent",
				JSONName:     "max_concurrent",
				Type:         config.FieldTypeInt,
				DefaultValue: 1,
				MinValue:     1,
				MaxValue:     3,
				Required:     true,
				DisplayName:  "Max Concurrent Tasks",
				Description:  "Maximum number of erasure coding tasks that can run simultaneously",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				Name:         "fullness_ratio",
				JSONName:     "fullness_ratio",
				Type:         config.FieldTypeFloat,
				DefaultValue: 0.9, // 90%
				MinValue:     0.5,
				MaxValue:     1.0,
				Required:     true,
				DisplayName:  "Fullness Ratio",
				Description:  "Only apply erasure coding to volumes with fullness ratio above this threshold",
				Placeholder:  "0.90 (90%)",
				Unit:         config.UnitNone,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				Name:         "collection_filter",
				JSONName:     "collection_filter",
				Type:         config.FieldTypeString,
				DefaultValue: "",
				Required:     false,
				DisplayName:  "Collection Filter",
				Description:  "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)",
				Placeholder:  "collection1,collection2",
				InputType:    "text",
				CSSClasses:   "form-control",
			},
		},
	}
}
// initErasureCodingV2 registers the refactored erasure coding task: it builds
// the default config, assembles the full TaskDefinition (detection,
// scheduling, creation, schema), and hands it to base.RegisterTask.
func initErasureCodingV2() {
// Create configuration instance
config := &ErasureCodingConfigV2{
BaseConfig: base.BaseConfig{
Enabled: false, // Conservative default - enable via configuration
ScanIntervalSeconds: 12 * 60 * 60, // 12 hours
MaxConcurrent: 1, // Conservative default
},
QuietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period
FullnessRatio: 0.90, // 90% full threshold
CollectionFilter: "", // No collection filter by default
}
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeErasureCoding,
Name: "erasure_coding",
DisplayName: "Erasure Coding",
Description: "Converts volumes to erasure coded format for improved data durability",
Icon: "fas fa-shield-alt text-info",
Capabilities: []string{"erasure_coding", "storage", "durability"},
Config: config,
ConfigSpec: getErasureCodingConfigSpec(),
CreateTask: createErasureCodingTask,
DetectionFunc: ecDetection,
ScanInterval: 12 * time.Hour,
SchedulingFunc: ecScheduling,
MaxConcurrent: 1,
RepeatInterval: 24 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}

View File

@ -1,18 +0,0 @@
package erasure_coding
import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
// SchemaProvider implements the TaskConfigSchemaProvider interface for
// erasure coding tasks by delegating to the package-level GetConfigSchema.
type SchemaProvider struct{}
// GetConfigSchema returns the schema for erasure coding task configuration.
func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
return GetConfigSchema()
}
// init registers the erasure coding schema provider under the task name used
// by the admin UI ("erasure_coding").
func init() {
tasks.RegisterTaskConfigSchema("erasure_coding", &SchemaProvider{})
}

View File

@ -1,107 +0,0 @@
package erasure_coding
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// ErasureCodingConfig represents the erasure coding configuration matching
// the schema; JSON tags mirror the schema field JSON names.
type ErasureCodingConfig struct {
Enabled bool `json:"enabled"`
QuietForSeconds int `json:"quiet_for_seconds"`
ScanIntervalSeconds int `json:"scan_interval_seconds"`
MaxConcurrent int `json:"max_concurrent"`
FullnessRatio float64 `json:"fullness_ratio"`
CollectionFilter string `json:"collection_filter"`
}
// ErasureCodingUILogic contains the business logic for erasure coding UI
// operations, reading from and writing to the shared detector and scheduler.
type ErasureCodingUILogic struct {
detector *EcDetector
scheduler *Scheduler
}
// NewErasureCodingUILogic creates new erasure coding UI logic bound to the
// given detector and scheduler (either may be nil; nil components are skipped).
func NewErasureCodingUILogic(detector *EcDetector, scheduler *Scheduler) *ErasureCodingUILogic {
return &ErasureCodingUILogic{
detector: detector,
scheduler: scheduler,
}
}
// GetCurrentConfig returns the current erasure coding configuration,
// starting from schema defaults and overlaying live detector/scheduler state.
func (logic *ErasureCodingUILogic) GetCurrentConfig() interface{} {
config := ErasureCodingConfig{
// Default values from schema (matching task_config_schema.go)
Enabled: true,
QuietForSeconds: 7 * 24 * 60 * 60, // 7 days
ScanIntervalSeconds: 12 * 60 * 60, // 12 hours
MaxConcurrent: 1,
FullnessRatio: 0.9, // 90%
CollectionFilter: "",
}
// Get current values from detector
if logic.detector != nil {
config.Enabled = logic.detector.IsEnabled()
config.QuietForSeconds = logic.detector.GetQuietForSeconds()
config.FullnessRatio = logic.detector.GetFullnessRatio()
config.CollectionFilter = logic.detector.GetCollectionFilter()
config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds())
}
// Get current values from scheduler
if logic.scheduler != nil {
config.MaxConcurrent = logic.scheduler.GetMaxConcurrent()
}
return config
}
// ApplyConfig applies the erasure coding configuration to the detector and
// scheduler. Only the ErasureCodingConfig value type is accepted.
func (logic *ErasureCodingUILogic) ApplyConfig(config interface{}) error {
ecConfig, ok := config.(ErasureCodingConfig)
if !ok {
return fmt.Errorf("invalid configuration type for erasure coding")
}
// Apply to detector
if logic.detector != nil {
logic.detector.SetEnabled(ecConfig.Enabled)
logic.detector.SetQuietForSeconds(ecConfig.QuietForSeconds)
logic.detector.SetFullnessRatio(ecConfig.FullnessRatio)
logic.detector.SetCollectionFilter(ecConfig.CollectionFilter)
logic.detector.SetScanInterval(time.Duration(ecConfig.ScanIntervalSeconds) * time.Second)
}
// Apply to scheduler (shares the Enabled flag with the detector)
if logic.scheduler != nil {
logic.scheduler.SetEnabled(ecConfig.Enabled)
logic.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
}
glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, quiet_for=%v seconds, max_concurrent=%d, fullness_ratio=%f, collection_filter=%s",
ecConfig.Enabled, ecConfig.QuietForSeconds, ecConfig.MaxConcurrent, ecConfig.FullnessRatio, ecConfig.CollectionFilter)
return nil
}
// RegisterUI registers the erasure coding UI provider with the UI registry,
// wiring the schema and the logic's config accessors into the common helper.
func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) {
logic := NewErasureCodingUILogic(detector, scheduler)
tasks.CommonRegisterUI(
types.TaskTypeErasureCoding,
"Erasure Coding",
uiRegistry,
detector,
scheduler,
GetConfigSchema,
logic.GetCurrentConfig,
logic.ApplyConfig,
)
}

View File

@ -1,112 +0,0 @@
package vacuum
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
// GetConfigSchema returns the schema for vacuum task configuration.
//
// The schema drives the admin UI: each field below declares one knob's
// type, default, bounds, and display metadata for the generated form.
func GetConfigSchema() *tasks.TaskConfigSchema {
	return &tasks.TaskConfigSchema{
		TaskName:    "vacuum",
		DisplayName: "Volume Vacuum",
		Description: "Reclaims disk space by removing deleted files from volumes",
		Icon:        "fas fa-broom text-primary",
		Schema: config.Schema{
			Fields: []*config.Field{
				// Master on/off switch for automatic vacuum task generation.
				{
					Name:         "enabled",
					JSONName:     "enabled",
					Type:         config.FieldTypeBool,
					DefaultValue: true,
					Required:     false,
					DisplayName:  "Enable Vacuum Tasks",
					Description:  "Whether vacuum tasks should be automatically created",
					HelpText:     "Toggle this to enable or disable automatic vacuum task generation",
					InputType:    "checkbox",
					CSSClasses:   "form-check-input",
				},
				// Garbage ratio (0..1) above which a volume is vacuumed.
				{
					Name:         "garbage_threshold",
					JSONName:     "garbage_threshold",
					Type:         config.FieldTypeFloat,
					DefaultValue: 0.3, // 30%
					MinValue:     0.0,
					MaxValue:     1.0,
					Required:     true,
					DisplayName:  "Garbage Percentage Threshold",
					Description:  "Trigger vacuum when garbage ratio exceeds this percentage",
					HelpText:     "Volumes with more deleted content than this threshold will be vacuumed",
					Placeholder:  "0.30 (30%)",
					Unit:         config.UnitNone,
					InputType:    "number",
					CSSClasses:   "form-control",
				},
				// How often the detector scans for vacuum candidates.
				{
					Name:         "scan_interval_seconds",
					JSONName:     "scan_interval_seconds",
					Type:         config.FieldTypeInterval,
					DefaultValue: 2 * 60 * 60,  // 2 hours
					MinValue:     10 * 60,      // 10 minutes
					MaxValue:     24 * 60 * 60, // 24 hours
					Required:     true,
					DisplayName:  "Scan Interval",
					Description:  "How often to scan for volumes needing vacuum",
					HelpText:     "The system will check for volumes that need vacuuming at this interval",
					Placeholder:  "2",
					Unit:         config.UnitHours,
					InputType:    "interval",
					CSSClasses:   "form-control",
				},
				// Cap on simultaneously running vacuum tasks.
				{
					Name:         "max_concurrent",
					JSONName:     "max_concurrent",
					Type:         config.FieldTypeInt,
					DefaultValue: 2,
					MinValue:     1,
					MaxValue:     10,
					Required:     true,
					DisplayName:  "Max Concurrent Tasks",
					Description:  "Maximum number of vacuum tasks that can run simultaneously",
					HelpText:     "Limits the number of vacuum operations running at the same time to control system load",
					Placeholder:  "2 (default)",
					Unit:         config.UnitCount,
					InputType:    "number",
					CSSClasses:   "form-control",
				},
				// Volumes younger than this are never vacuumed.
				{
					Name:         "min_volume_age_seconds",
					JSONName:     "min_volume_age_seconds",
					Type:         config.FieldTypeInterval,
					DefaultValue: 24 * 60 * 60,     // 24 hours
					MinValue:     1 * 60 * 60,      // 1 hour
					MaxValue:     7 * 24 * 60 * 60, // 7 days
					Required:     true,
					DisplayName:  "Minimum Volume Age",
					Description:  "Only vacuum volumes older than this duration",
					HelpText:     "Prevents vacuuming of recently created volumes that may still be actively written to",
					Placeholder:  "24",
					Unit:         config.UnitHours,
					InputType:    "interval",
					CSSClasses:   "form-control",
				},
				// Minimum pause between two vacuums of the same volume.
				{
					Name:         "min_interval_seconds",
					JSONName:     "min_interval_seconds",
					Type:         config.FieldTypeInterval,
					DefaultValue: 7 * 24 * 60 * 60,  // 7 days
					MinValue:     1 * 24 * 60 * 60,  // 1 day
					MaxValue:     30 * 24 * 60 * 60, // 30 days
					Required:     true,
					DisplayName:  "Minimum Interval",
					Description:  "Minimum time between vacuum operations on the same volume",
					HelpText:     "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
					Placeholder:  "7",
					Unit:         config.UnitDays,
					InputType:    "interval",
					CSSClasses:   "form-control",
				},
			},
		},
	}
}

View File

@ -1,18 +0,0 @@
package vacuum
import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
// SchemaProvider implements the TaskConfigSchemaProvider interface for vacuum tasks.
type SchemaProvider struct{}

// GetConfigSchema returns the schema for vacuum task configuration.
// It simply delegates to the package-level GetConfigSchema function.
func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
	return GetConfigSchema()
}

// init registers the vacuum schema provider so the schema is available
// as soon as this package is imported.
func init() {
	tasks.RegisterTaskConfigSchema("vacuum", &SchemaProvider{})
}

View File

@ -1,112 +0,0 @@
package vacuum
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// VacuumConfig represents the vacuum configuration matching the schema.
type VacuumConfig struct {
	Enabled             bool    `json:"enabled"`                // whether vacuum tasks are generated at all
	GarbageThreshold    float64 `json:"garbage_threshold"`      // garbage ratio (0..1) that triggers a vacuum
	ScanIntervalSeconds int     `json:"scan_interval_seconds"`  // how often to scan for candidate volumes
	MaxConcurrent       int     `json:"max_concurrent"`         // cap on simultaneously running vacuum tasks
	MinVolumeAgeSeconds int     `json:"min_volume_age_seconds"` // skip volumes younger than this
	MinIntervalSeconds  int     `json:"min_interval_seconds"`   // minimum gap between vacuums of one volume
}

// VacuumUILogic contains the business logic for vacuum UI operations.
// Either field may be nil; the methods guard against that before use.
type VacuumUILogic struct {
	detector  *VacuumDetector
	scheduler *VacuumScheduler
}

// NewVacuumUILogic creates new vacuum UI logic wrapping the given
// detector and scheduler.
func NewVacuumUILogic(detector *VacuumDetector, scheduler *VacuumScheduler) *VacuumUILogic {
	return &VacuumUILogic{
		detector:  detector,
		scheduler: scheduler,
	}
}
// GetCurrentConfig returns the current vacuum configuration.
//
// It starts from the schema defaults and then overlays the live values
// read from the detector and scheduler when those components exist.
func (logic *VacuumUILogic) GetCurrentConfig() interface{} {
	// Schema defaults (matching task_config_schema.go).
	cfg := &VacuumConfig{
		Enabled:             true,
		GarbageThreshold:    0.3,              // 30%
		ScanIntervalSeconds: 2 * 60 * 60,      // 2 hours
		MaxConcurrent:       2,
		MinVolumeAgeSeconds: 24 * 60 * 60,     // 24 hours
		MinIntervalSeconds:  7 * 24 * 60 * 60, // 7 days
	}
	// Overlay live detector state.
	if d := logic.detector; d != nil {
		cfg.Enabled = d.IsEnabled()
		cfg.GarbageThreshold = d.GetGarbageThreshold()
		cfg.ScanIntervalSeconds = int(d.ScanInterval().Seconds())
		cfg.MinVolumeAgeSeconds = int(d.GetMinVolumeAge().Seconds())
	}
	// Overlay live scheduler state.
	if s := logic.scheduler; s != nil {
		cfg.MaxConcurrent = s.GetMaxConcurrent()
		cfg.MinIntervalSeconds = int(s.GetMinInterval().Seconds())
	}
	return cfg
}
// ApplyConfig applies the vacuum configuration.
//
// The config argument may be a *VacuumConfig (as returned by
// GetCurrentConfig) or a plain VacuumConfig value; other types are
// rejected with an error.
func (logic *VacuumUILogic) ApplyConfig(config interface{}) error {
	var vacuumConfig *VacuumConfig
	switch c := config.(type) {
	case *VacuumConfig:
		vacuumConfig = c
	case VacuumConfig:
		vacuumConfig = &c
	default:
		return fmt.Errorf("invalid configuration type for vacuum")
	}
	// Apply to detector (when present): enablement, threshold, and cadence.
	if logic.detector != nil {
		logic.detector.SetEnabled(vacuumConfig.Enabled)
		logic.detector.SetGarbageThreshold(vacuumConfig.GarbageThreshold)
		logic.detector.SetScanInterval(time.Duration(vacuumConfig.ScanIntervalSeconds) * time.Second)
		logic.detector.SetMinVolumeAge(time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second)
	}
	// Apply to scheduler (when present): enablement, concurrency, pacing.
	if logic.scheduler != nil {
		logic.scheduler.SetEnabled(vacuumConfig.Enabled)
		logic.scheduler.SetMaxConcurrent(vacuumConfig.MaxConcurrent)
		logic.scheduler.SetMinInterval(time.Duration(vacuumConfig.MinIntervalSeconds) * time.Second)
	}
	glog.V(1).Infof("Applied vacuum configuration: enabled=%v, threshold=%.1f%%, scan_interval=%ds, max_concurrent=%d",
		vacuumConfig.Enabled, vacuumConfig.GarbageThreshold*100, vacuumConfig.ScanIntervalSeconds, vacuumConfig.MaxConcurrent)
	return nil
}
// RegisterUI registers the vacuum UI provider with the UI registry,
// wiring the given detector/scheduler into the common UI plumbing.
func RegisterUI(uiRegistry *types.UIRegistry, detector *VacuumDetector, scheduler *VacuumScheduler) {
	logic := NewVacuumUILogic(detector, scheduler)
	tasks.CommonRegisterUI(
		types.TaskTypeVacuum,
		"Volume Vacuum",
		uiRegistry,
		detector,
		scheduler,
		GetConfigSchema,
		logic.GetCurrentConfig,
		logic.ApplyConfig,
	)
}

// GetUIProvider returns the UI provider for external use.
// It looks the provider up in the registry by the vacuum task type.
func GetUIProvider(uiRegistry *types.UIRegistry) types.TaskUIProvider {
	return uiRegistry.GetProvider(types.TaskTypeVacuum)
}

View File

@ -1,132 +0,0 @@
package vacuum
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// VacuumDetector implements vacuum task detection using code instead of schemas
type VacuumDetector struct {
	enabled          bool          // master switch for detection
	garbageThreshold float64       // garbage ratio (0..1) that triggers a vacuum
	minVolumeAge     time.Duration // ignore volumes younger than this
	scanInterval     time.Duration // how often ScanForTasks should run
}

// Compile-time interface assertions
var (
	_ types.TaskDetector               = (*VacuumDetector)(nil)
	_ types.PolicyConfigurableDetector = (*VacuumDetector)(nil)
)

// NewVacuumDetector creates a new simple vacuum detector with defaults:
// enabled, 30% garbage threshold, 24h minimum volume age, 30m scan interval.
func NewVacuumDetector() *VacuumDetector {
	return &VacuumDetector{
		enabled:          true,
		garbageThreshold: 0.3,
		minVolumeAge:     24 * time.Hour,
		scanInterval:     30 * time.Minute,
	}
}

// GetTaskType returns the task type
func (d *VacuumDetector) GetTaskType() types.TaskType {
	return types.TaskTypeVacuum
}
// ScanForTasks scans for volumes that need vacuum operations.
// A volume qualifies when its garbage ratio meets the configured threshold
// and it is at least minVolumeAge old; returns nil when detection is off.
func (d *VacuumDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
	if !d.enabled {
		return nil, nil
	}
	var found []*types.TaskDetectionResult
	for _, m := range volumeMetrics {
		// Skip volumes that are either too clean or too young.
		if m.GarbageRatio < d.garbageThreshold || m.Age < d.minVolumeAge {
			continue
		}
		// Volumes that are mostly garbage jump the queue.
		prio := types.TaskPriorityNormal
		if m.GarbageRatio > 0.6 {
			prio = types.TaskPriorityHigh
		}
		found = append(found, &types.TaskDetectionResult{
			TaskType:   types.TaskTypeVacuum,
			VolumeID:   m.VolumeID,
			Server:     m.Server,
			Collection: m.Collection,
			Priority:   prio,
			Reason:     "Volume has excessive garbage requiring vacuum",
			Parameters: map[string]interface{}{
				"garbage_ratio": m.GarbageRatio,
				"volume_age":    m.Age.String(),
			},
			ScheduleAt: time.Now(),
		})
	}
	glog.V(2).Infof("Vacuum detector found %d volumes needing vacuum", len(found))
	return found, nil
}
// ScanInterval returns how often this detector should scan
func (d *VacuumDetector) ScanInterval() time.Duration {
	return d.scanInterval
}

// IsEnabled returns whether this detector is enabled
func (d *VacuumDetector) IsEnabled() bool {
	return d.enabled
}

// Configuration setters

// SetEnabled toggles detection on or off.
func (d *VacuumDetector) SetEnabled(enabled bool) {
	d.enabled = enabled
}

// SetGarbageThreshold sets the garbage ratio (0..1) that triggers a vacuum.
func (d *VacuumDetector) SetGarbageThreshold(threshold float64) {
	d.garbageThreshold = threshold
}

// SetScanInterval sets how often the detector scans for candidates.
func (d *VacuumDetector) SetScanInterval(interval time.Duration) {
	d.scanInterval = interval
}

// SetMinVolumeAge sets the minimum age a volume must reach before vacuum.
func (d *VacuumDetector) SetMinVolumeAge(age time.Duration) {
	d.minVolumeAge = age
}

// GetGarbageThreshold returns the current garbage threshold
func (d *VacuumDetector) GetGarbageThreshold() float64 {
	return d.garbageThreshold
}

// GetMinVolumeAge returns the minimum volume age
func (d *VacuumDetector) GetMinVolumeAge() time.Duration {
	return d.minVolumeAge
}

// GetScanInterval returns the scan interval
func (d *VacuumDetector) GetScanInterval() time.Duration {
	return d.scanInterval
}

// ConfigureFromPolicy configures the detector based on the maintenance policy.
// The policy arrives as interface{} and is matched against an anonymous
// interface; only GetVacuumEnabled and GetVacuumGarbageRatio are read.
func (d *VacuumDetector) ConfigureFromPolicy(policy interface{}) {
	// Type assert to the maintenance policy type we expect
	if maintenancePolicy, ok := policy.(interface {
		GetVacuumEnabled() bool
		GetVacuumGarbageRatio() float64
	}); ok {
		d.SetEnabled(maintenancePolicy.GetVacuumEnabled())
		d.SetGarbageThreshold(maintenancePolicy.GetVacuumGarbageRatio())
	} else {
		// Best-effort: unknown policy types are logged, not treated as errors.
		glog.V(1).Infof("Could not configure vacuum detector from policy: unsupported policy type")
	}
}

View File

@ -1,81 +1,7 @@
package vacuum
import (
	"fmt"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Factory creates vacuum task instances
type Factory struct {
	*tasks.BaseTaskFactory
}

// NewFactory creates a new vacuum task factory, registered for the vacuum
// task type with the "vacuum" and "storage" capabilities.
func NewFactory() *Factory {
	return &Factory{
		BaseTaskFactory: tasks.NewBaseTaskFactory(
			types.TaskTypeVacuum,
			[]string{"vacuum", "storage"},
			"Vacuum operation to reclaim disk space by removing deleted files",
		),
	}
}
// Create creates a new vacuum task instance for the volume named in
// params. Both a non-zero volume ID and a server address are mandatory.
func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
	if params.VolumeID == 0 {
		return nil, fmt.Errorf("volume_id is required")
	}
	if params.Server == "" {
		return nil, fmt.Errorf("server is required")
	}
	t := NewTask(params.Server, params.VolumeID)
	t.SetEstimatedDuration(t.EstimateTime(params))
	return t, nil
}
// Shared detector and scheduler instances, created lazily exactly once.
var (
	sharedDetector  *VacuumDetector
	sharedScheduler *VacuumScheduler
	sharedInitOnce  sync.Once
)

// getSharedInstances returns the shared detector and scheduler instances.
// sync.Once makes the lazy initialization safe under concurrent callers;
// the original check-then-assign pattern raced when called from multiple
// goroutines.
func getSharedInstances() (*VacuumDetector, *VacuumScheduler) {
	sharedInitOnce.Do(func() {
		sharedDetector = NewVacuumDetector()
		sharedScheduler = NewVacuumScheduler()
	})
	return sharedDetector, sharedScheduler
}

// GetSharedInstances returns the shared detector and scheduler instances (public access)
func GetSharedInstances() (*VacuumDetector, *VacuumScheduler) {
	return getSharedInstances()
}
// Auto-register this task when the package is imported.
// All registries below intentionally receive the same shared
// detector/scheduler pair so UI changes and detection stay in sync.
func init() {
	factory := NewFactory()
	tasks.AutoRegister(types.TaskTypeVacuum, factory)
	// Get shared instances for all registrations
	detector, scheduler := getSharedInstances()
	// Register with types registry
	tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
		registry.RegisterTask(detector, scheduler)
	})
	// Register with UI registry using the same instances
	tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
		RegisterUI(uiRegistry, detector, scheduler)
	})
	// Use new architecture instead of old registration
	initVacuumV2()
}

View File

@ -1,111 +0,0 @@
package vacuum
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// VacuumScheduler implements vacuum task scheduling using code instead of schemas
type VacuumScheduler struct {
	enabled       bool          // master switch for scheduling
	maxConcurrent int           // cap on simultaneously running vacuum tasks
	minInterval   time.Duration // default wait before repeating a vacuum
}

// Compile-time interface assertions
var (
	_ types.TaskScheduler = (*VacuumScheduler)(nil)
)

// NewVacuumScheduler creates a new simple vacuum scheduler with defaults:
// enabled, at most 2 concurrent tasks, 6h minimum repeat interval.
func NewVacuumScheduler() *VacuumScheduler {
	return &VacuumScheduler{
		enabled:       true,
		maxConcurrent: 2,
		minInterval:   6 * time.Hour,
	}
}

// GetTaskType returns the task type
func (s *VacuumScheduler) GetTaskType() types.TaskType {
	return types.TaskTypeVacuum
}
// CanScheduleNow determines if a vacuum task can be scheduled right now.
// It requires the scheduler to be enabled, the vacuum concurrency cap to
// have headroom, and at least one available worker with spare load that
// advertises the vacuum capability.
func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if !s.enabled {
		return false
	}
	// Enforce the concurrency cap on vacuum tasks.
	active := 0
	for _, rt := range runningTasks {
		if rt.Type == types.TaskTypeVacuum {
			active++
		}
	}
	if active >= s.maxConcurrent {
		return false
	}
	// Find a worker with spare capacity and the vacuum capability.
	for _, w := range availableWorkers {
		if w.CurrentLoad >= w.MaxConcurrent {
			continue
		}
		for _, c := range w.Capabilities {
			if c == types.TaskTypeVacuum {
				return true
			}
		}
	}
	return false
}
// GetPriority returns the priority for this task.
// Tasks whose recorded garbage ratio exceeds 80% are promoted to high
// priority; otherwise the task's own priority is returned unchanged.
func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority {
	// Could adjust priority based on task parameters
	if params, ok := task.Parameters["garbage_ratio"].(float64); ok {
		if params > 0.8 {
			return types.TaskPriorityHigh
		}
	}
	return task.Priority
}

// GetMaxConcurrent returns max concurrent tasks of this type
func (s *VacuumScheduler) GetMaxConcurrent() int {
	return s.maxConcurrent
}

// GetDefaultRepeatInterval returns the default interval to wait before repeating vacuum tasks
func (s *VacuumScheduler) GetDefaultRepeatInterval() time.Duration {
	return s.minInterval
}

// IsEnabled returns whether this scheduler is enabled
func (s *VacuumScheduler) IsEnabled() bool {
	return s.enabled
}

// Configuration setters

// SetEnabled toggles scheduling on or off.
func (s *VacuumScheduler) SetEnabled(enabled bool) {
	s.enabled = enabled
}

// SetMaxConcurrent sets the cap on simultaneously running vacuum tasks.
func (s *VacuumScheduler) SetMaxConcurrent(max int) {
	s.maxConcurrent = max
}

// SetMinInterval sets the minimum repeat interval between vacuums.
func (s *VacuumScheduler) SetMinInterval(interval time.Duration) {
	s.minInterval = interval
}

// GetMinInterval returns the minimum interval
func (s *VacuumScheduler) GetMinInterval() time.Duration {
	return s.minInterval
}

View File

@ -0,0 +1,269 @@
package vacuum
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// VacuumConfigV2 extends BaseConfig with vacuum-specific settings
type VacuumConfigV2 struct {
	base.BaseConfig
	GarbageThreshold    float64 `json:"garbage_threshold"`      // garbage ratio (0..1) that triggers a vacuum
	MinVolumeAgeSeconds int     `json:"min_volume_age_seconds"` // skip volumes younger than this
	MinIntervalSeconds  int     `json:"min_interval_seconds"`   // minimum gap between vacuums of one volume
}

// ToMap converts config to map (extend base functionality).
// Base fields are emitted first, then the vacuum-specific ones.
func (c *VacuumConfigV2) ToMap() map[string]interface{} {
	result := c.BaseConfig.ToMap()
	result["garbage_threshold"] = c.GarbageThreshold
	result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds
	result["min_interval_seconds"] = c.MinIntervalSeconds
	return result
}
// FromMap loads config from map (extend base functionality).
//
// The integer fields accept both int (values set from Go code) and
// float64 (the type encoding/json produces for every JSON number); the
// original int-only assertions silently dropped values that had been
// round-tripped through JSON.
func (c *VacuumConfigV2) FromMap(data map[string]interface{}) error {
	// Load base config first
	if err := c.BaseConfig.FromMap(data); err != nil {
		return err
	}
	// Load vacuum-specific config
	if threshold, ok := data["garbage_threshold"].(float64); ok {
		c.GarbageThreshold = threshold
	}
	switch v := data["min_volume_age_seconds"].(type) {
	case int:
		c.MinVolumeAgeSeconds = v
	case float64:
		c.MinVolumeAgeSeconds = int(v)
	}
	switch v := data["min_interval_seconds"].(type) {
	case int:
		c.MinIntervalSeconds = v
	case float64:
		c.MinIntervalSeconds = int(v)
	}
	return nil
}
// vacuumDetection implements the detection logic for vacuum tasks.
// It returns one result per volume whose garbage ratio and age satisfy
// the configured thresholds, or nil when detection is disabled.
func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}
	// Guard the downcast: the unchecked assertion previously panicked on
	// a mismatched config type.
	vacuumConfig, ok := config.(*VacuumConfigV2)
	if !ok {
		return nil, fmt.Errorf("unexpected config type %T for vacuum detection", config)
	}
	var results []*types.TaskDetectionResult
	minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
	for _, metric := range metrics {
		// Check if volume needs vacuum
		if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
			// Mostly-garbage volumes are vacuumed with higher priority.
			priority := types.TaskPriorityNormal
			if metric.GarbageRatio > 0.6 {
				priority = types.TaskPriorityHigh
			}
			result := &types.TaskDetectionResult{
				TaskType:   types.TaskTypeVacuum,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   priority,
				Reason:     "Volume has excessive garbage requiring vacuum",
				Parameters: map[string]interface{}{
					"garbage_ratio": metric.GarbageRatio,
					"volume_age":    metric.Age.String(),
				},
				ScheduleAt: time.Now(),
			}
			results = append(results, result)
		}
	}
	return results, nil
}
// vacuumScheduling implements the scheduling logic for vacuum tasks.
// A task may start only when the vacuum concurrency cap has headroom and
// at least one available worker has spare load and the vacuum capability.
func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
	// Guard the downcast: the unchecked assertion previously panicked on
	// a mismatched config type; refusing to schedule is the safe answer.
	vacuumConfig, ok := config.(*VacuumConfigV2)
	if !ok {
		return false
	}
	// Count running vacuum tasks
	runningVacuumCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeVacuum {
			runningVacuumCount++
		}
	}
	// Check concurrency limit
	if runningVacuumCount >= vacuumConfig.MaxConcurrent {
		return false
	}
	// Check for available workers with vacuum capability
	for _, worker := range availableWorkers {
		if worker.CurrentLoad < worker.MaxConcurrent {
			for _, capability := range worker.Capabilities {
				if capability == types.TaskTypeVacuum {
					return true
				}
			}
		}
	}
	return false
}
// createVacuumTask creates a vacuum task instance for the volume named
// in params. Both a non-zero volume ID and a server address are mandatory.
func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) {
	// Validate parameters
	if params.VolumeID == 0 {
		return nil, fmt.Errorf("volume_id is required")
	}
	if params.Server == "" {
		return nil, fmt.Errorf("server is required")
	}
	// Use existing vacuum task implementation
	task := NewTask(params.Server, params.VolumeID)
	task.SetEstimatedDuration(task.EstimateTime(params))
	return task, nil
}
// getVacuumConfigSpec returns the configuration schema for vacuum tasks.
//
// The spec drives the admin UI: each field below declares one knob's
// type, default, bounds, and display metadata for the generated form.
func getVacuumConfigSpec() base.ConfigSpec {
	return base.ConfigSpec{
		Fields: []*config.Field{
			// Master on/off switch for automatic vacuum task generation.
			{
				Name:         "enabled",
				JSONName:     "enabled",
				Type:         config.FieldTypeBool,
				DefaultValue: true,
				Required:     false,
				DisplayName:  "Enable Vacuum Tasks",
				Description:  "Whether vacuum tasks should be automatically created",
				HelpText:     "Toggle this to enable or disable automatic vacuum task generation",
				InputType:    "checkbox",
				CSSClasses:   "form-check-input",
			},
			// Garbage ratio (0..1) above which a volume is vacuumed.
			{
				Name:         "garbage_threshold",
				JSONName:     "garbage_threshold",
				Type:         config.FieldTypeFloat,
				DefaultValue: 0.3,
				MinValue:     0.0,
				MaxValue:     1.0,
				Required:     true,
				DisplayName:  "Garbage Percentage Threshold",
				Description:  "Trigger vacuum when garbage ratio exceeds this percentage",
				HelpText:     "Volumes with more deleted content than this threshold will be vacuumed",
				Placeholder:  "0.30 (30%)",
				Unit:         config.UnitNone,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			// How often the detector scans for vacuum candidates.
			{
				Name:         "scan_interval_seconds",
				JSONName:     "scan_interval_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 2 * 60 * 60,
				MinValue:     10 * 60,
				MaxValue:     24 * 60 * 60,
				Required:     true,
				DisplayName:  "Scan Interval",
				Description:  "How often to scan for volumes needing vacuum",
				HelpText:     "The system will check for volumes that need vacuuming at this interval",
				Placeholder:  "2",
				Unit:         config.UnitHours,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			// Cap on simultaneously running vacuum tasks.
			{
				Name:         "max_concurrent",
				JSONName:     "max_concurrent",
				Type:         config.FieldTypeInt,
				DefaultValue: 2,
				MinValue:     1,
				MaxValue:     10,
				Required:     true,
				DisplayName:  "Max Concurrent Tasks",
				Description:  "Maximum number of vacuum tasks that can run simultaneously",
				HelpText:     "Limits the number of vacuum operations running at the same time to control system load",
				Placeholder:  "2 (default)",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			// Volumes younger than this are never vacuumed.
			{
				Name:         "min_volume_age_seconds",
				JSONName:     "min_volume_age_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 24 * 60 * 60,
				MinValue:     1 * 60 * 60,
				MaxValue:     7 * 24 * 60 * 60,
				Required:     true,
				DisplayName:  "Minimum Volume Age",
				Description:  "Only vacuum volumes older than this duration",
				HelpText:     "Prevents vacuuming of recently created volumes that may still be actively written to",
				Placeholder:  "24",
				Unit:         config.UnitHours,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			// Minimum pause between two vacuums of the same volume.
			{
				Name:         "min_interval_seconds",
				JSONName:     "min_interval_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 7 * 24 * 60 * 60,
				MinValue:     1 * 24 * 60 * 60,
				MaxValue:     30 * 24 * 60 * 60,
				Required:     true,
				DisplayName:  "Minimum Interval",
				Description:  "Minimum time between vacuum operations on the same volume",
				HelpText:     "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
				Placeholder:  "7",
				Unit:         config.UnitDays,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
		},
	}
}
// initVacuumV2 registers the refactored vacuum task (replaces the old registration)
func initVacuumV2() {
	// Default configuration. Named cfg rather than config so it does not
	// shadow the imported admin/config package.
	cfg := &VacuumConfigV2{
		BaseConfig: base.BaseConfig{
			Enabled:             true,
			ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
			MaxConcurrent:       2,
		},
		GarbageThreshold:    0.3,              // 30%
		MinVolumeAgeSeconds: 24 * 60 * 60,     // 24 hours
		MinIntervalSeconds:  7 * 24 * 60 * 60, // 7 days
	}
	// Create complete task definition
	taskDef := &base.TaskDefinition{
		Type:          types.TaskTypeVacuum,
		Name:          "vacuum",
		DisplayName:   "Volume Vacuum",
		Description:   "Reclaims disk space by removing deleted files from volumes",
		Icon:          "fas fa-broom text-primary",
		Capabilities:  []string{"vacuum", "storage"},
		Config:        cfg,
		ConfigSpec:    getVacuumConfigSpec(),
		CreateTask:    createVacuumTask,
		DetectionFunc: vacuumDetection,
		// Derive scheduling knobs from cfg so the two sets of defaults
		// can never drift apart (they were previously duplicated inline).
		ScanInterval:   time.Duration(cfg.ScanIntervalSeconds) * time.Second,
		SchedulingFunc: vacuumScheduling,
		MaxConcurrent:  cfg.MaxConcurrent,
		RepeatInterval: time.Duration(cfg.MinIntervalSeconds) * time.Second,
	}
	// Register everything with a single function call!
	base.RegisterTask(taskDef)
}

View File

@ -0,0 +1,260 @@
package vacuum_v2
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// VacuumConfig extends BaseConfig with vacuum-specific settings
type VacuumConfig struct {
	base.BaseConfig
	GarbageThreshold    float64 `json:"garbage_threshold"`      // garbage ratio (0..1) that triggers a vacuum
	MinVolumeAgeSeconds int     `json:"min_volume_age_seconds"` // skip volumes younger than this
	MinIntervalSeconds  int     `json:"min_interval_seconds"`   // minimum gap between vacuums of one volume
}

// ToMap converts config to map (extend base functionality).
// Base fields are emitted first, then the vacuum-specific ones.
func (c *VacuumConfig) ToMap() map[string]interface{} {
	result := c.BaseConfig.ToMap()
	result["garbage_threshold"] = c.GarbageThreshold
	result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds
	result["min_interval_seconds"] = c.MinIntervalSeconds
	return result
}
// FromMap loads config from map (extend base functionality).
//
// The integer fields accept both int (values set from Go code) and
// float64 (the type encoding/json produces for every JSON number); the
// original int-only assertions silently dropped values that had been
// round-tripped through JSON.
func (c *VacuumConfig) FromMap(data map[string]interface{}) error {
	// Load base config first
	if err := c.BaseConfig.FromMap(data); err != nil {
		return err
	}
	// Load vacuum-specific config
	if threshold, ok := data["garbage_threshold"].(float64); ok {
		c.GarbageThreshold = threshold
	}
	switch v := data["min_volume_age_seconds"].(type) {
	case int:
		c.MinVolumeAgeSeconds = v
	case float64:
		c.MinVolumeAgeSeconds = int(v)
	}
	switch v := data["min_interval_seconds"].(type) {
	case int:
		c.MinIntervalSeconds = v
	case float64:
		c.MinIntervalSeconds = int(v)
	}
	return nil
}
// vacuumDetection implements the detection logic for vacuum tasks.
// It returns one result per volume whose garbage ratio and age satisfy
// the configured thresholds, or nil when detection is disabled.
func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}
	// Guard the downcast: the unchecked assertion previously panicked on
	// a mismatched config type.
	vacuumConfig, ok := config.(*VacuumConfig)
	if !ok {
		return nil, fmt.Errorf("unexpected config type %T for vacuum detection", config)
	}
	var results []*types.TaskDetectionResult
	minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
	for _, metric := range metrics {
		// Check if volume needs vacuum
		if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
			// Mostly-garbage volumes are vacuumed with higher priority.
			priority := types.TaskPriorityNormal
			if metric.GarbageRatio > 0.6 {
				priority = types.TaskPriorityHigh
			}
			result := &types.TaskDetectionResult{
				TaskType:   types.TaskTypeVacuum,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   priority,
				Reason:     "Volume has excessive garbage requiring vacuum",
				Parameters: map[string]interface{}{
					"garbage_ratio": metric.GarbageRatio,
					"volume_age":    metric.Age.String(),
				},
				ScheduleAt: time.Now(),
			}
			results = append(results, result)
		}
	}
	return results, nil
}
// vacuumScheduling implements the scheduling logic for vacuum tasks.
// A task may start only when the vacuum concurrency cap has headroom and
// at least one available worker has spare load and the vacuum capability.
func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
	// Guard the downcast: the unchecked assertion previously panicked on
	// a mismatched config type; refusing to schedule is the safe answer.
	vacuumConfig, ok := config.(*VacuumConfig)
	if !ok {
		return false
	}
	// Count running vacuum tasks
	runningVacuumCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeVacuum {
			runningVacuumCount++
		}
	}
	// Check concurrency limit
	if runningVacuumCount >= vacuumConfig.MaxConcurrent {
		return false
	}
	// Check for available workers with vacuum capability
	for _, worker := range availableWorkers {
		if worker.CurrentLoad < worker.MaxConcurrent {
			for _, capability := range worker.Capabilities {
				if capability == types.TaskTypeVacuum {
					return true
				}
			}
		}
	}
	return false
}
// createVacuumTask creates a vacuum task instance for the volume named
// in params. Both a non-zero volume ID and a server address are mandatory.
func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) {
	// Validate parameters
	if params.VolumeID == 0 {
		return nil, fmt.Errorf("volume_id is required")
	}
	if params.Server == "" {
		return nil, fmt.Errorf("server is required")
	}
	// Reuse existing vacuum task implementation
	task := vacuum.NewTask(params.Server, params.VolumeID)
	task.SetEstimatedDuration(task.EstimateTime(params))
	return task, nil
}
// getConfigSpec returns the configuration schema for vacuum tasks.
//
// The spec drives the admin UI: each field below declares one knob's
// type, default, bounds, and display metadata for the generated form.
func getConfigSpec() base.ConfigSpec {
	return base.ConfigSpec{
		Fields: []*config.Field{
			// Master on/off switch for automatic vacuum task generation.
			{
				Name:         "enabled",
				JSONName:     "enabled",
				Type:         config.FieldTypeBool,
				DefaultValue: true,
				Required:     false,
				DisplayName:  "Enable Vacuum Tasks",
				Description:  "Whether vacuum tasks should be automatically created",
				InputType:    "checkbox",
				CSSClasses:   "form-check-input",
			},
			// Garbage ratio (0..1) above which a volume is vacuumed.
			{
				Name:         "garbage_threshold",
				JSONName:     "garbage_threshold",
				Type:         config.FieldTypeFloat,
				DefaultValue: 0.3,
				MinValue:     0.0,
				MaxValue:     1.0,
				Required:     true,
				DisplayName:  "Garbage Percentage Threshold",
				Description:  "Trigger vacuum when garbage ratio exceeds this percentage",
				Placeholder:  "0.30 (30%)",
				Unit:         config.UnitNone,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			// How often the detector scans for vacuum candidates.
			{
				Name:         "scan_interval_seconds",
				JSONName:     "scan_interval_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 2 * 60 * 60,
				MinValue:     10 * 60,
				MaxValue:     24 * 60 * 60,
				Required:     true,
				DisplayName:  "Scan Interval",
				Description:  "How often to scan for volumes needing vacuum",
				Unit:         config.UnitHours,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			// Cap on simultaneously running vacuum tasks.
			{
				Name:         "max_concurrent",
				JSONName:     "max_concurrent",
				Type:         config.FieldTypeInt,
				DefaultValue: 2,
				MinValue:     1,
				MaxValue:     10,
				Required:     true,
				DisplayName:  "Max Concurrent Tasks",
				Description:  "Maximum number of vacuum tasks that can run simultaneously",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			// Volumes younger than this are never vacuumed.
			{
				Name:         "min_volume_age_seconds",
				JSONName:     "min_volume_age_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 24 * 60 * 60,
				MinValue:     1 * 60 * 60,
				MaxValue:     7 * 24 * 60 * 60,
				Required:     true,
				DisplayName:  "Minimum Volume Age",
				Description:  "Only vacuum volumes older than this duration",
				Unit:         config.UnitHours,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
			// Minimum pause between two vacuums of the same volume.
			{
				Name:         "min_interval_seconds",
				JSONName:     "min_interval_seconds",
				Type:         config.FieldTypeInterval,
				DefaultValue: 7 * 24 * 60 * 60,
				MinValue:     1 * 24 * 60 * 60,
				MaxValue:     30 * 24 * 60 * 60,
				Required:     true,
				DisplayName:  "Minimum Interval",
				Description:  "Minimum time between vacuum operations on the same volume",
				Unit:         config.UnitDays,
				InputType:    "interval",
				CSSClasses:   "form-control",
			},
		},
	}
}
// init registers the refactored vacuum task (V2) when this package is
// imported. (The comment previously named a non-existent RegisterVacuumV2.)
func init() {
	// Default configuration. Named cfg rather than config so it does not
	// shadow the imported admin/config package.
	cfg := &VacuumConfig{
		BaseConfig: base.BaseConfig{
			Enabled:             true,
			ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
			MaxConcurrent:       2,
		},
		GarbageThreshold:    0.3,              // 30%
		MinVolumeAgeSeconds: 24 * 60 * 60,     // 24 hours
		MinIntervalSeconds:  7 * 24 * 60 * 60, // 7 days
	}
	// NOTE(review): this registers under types.TaskTypeVacuum, the same
	// type the vacuum package registers — confirm the registry tolerates
	// both, or remove one of the two registrations.
	taskDef := &base.TaskDefinition{
		Type:          types.TaskTypeVacuum,
		Name:          "vacuum_v2",
		DisplayName:   "Volume Vacuum (V2)",
		Description:   "Reclaims disk space by removing deleted files from volumes",
		Icon:          "fas fa-broom text-primary",
		Capabilities:  []string{"vacuum", "storage"},
		Config:        cfg,
		ConfigSpec:    getConfigSpec(),
		CreateTask:    createVacuumTask,
		DetectionFunc: vacuumDetection,
		// Derive scheduling knobs from cfg so the two sets of defaults
		// can never drift apart (they were previously duplicated inline).
		ScanInterval:   time.Duration(cfg.ScanIntervalSeconds) * time.Second,
		SchedulingFunc: vacuumScheduling,
		MaxConcurrent:  cfg.MaxConcurrent,
		RepeatInterval: time.Duration(cfg.MinIntervalSeconds) * time.Second,
	}
	// Register everything with a single function call!
	base.RegisterTask(taskDef)
}