Self-Contained Design

To prove the system is truly self-contained:

To add a new task:
1. Create a task package (e.g., worker/tasks/compression/)
2. Blank-import it for its registration side effect: _ "github.com/.../worker/tasks/compression"

That's it! No other changes needed (see the sketch below).

To remove a task:
1. Delete the task package directory
2. Remove the import line

That's it! No other changes needed.
chrislu
2025-08-10 00:15:26 -07:00
parent 96d6d27607
commit 05a0cc156b
14 changed files with 197 additions and 88 deletions
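The one-line import works because Go runs a package's init() the first time it is imported, even on a blank import. Below is a minimal sketch of such a self-registering package for a hypothetical compression task; the helper names are assumptions, but the erasure_coding package in this diff follows the same pattern.

```go
// Hypothetical worker/tasks/compression/register.go; a sketch only.
package compression

import (
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// The "compression" string lives only here; no shared constant is added
// anywhere else in the tree.
var taskType = types.TaskType("compression")

// init runs as a side effect of the blank import
// _ "github.com/.../worker/tasks/compression", so the import line is the
// entire integration step.
func init() {
	registerCompressionTask()
	glog.V(1).Infof("registered task type %s", taskType)
}

// registerCompressionTask would build the task definition (detection,
// scheduling, execution, config) and hand it to the global registries,
// as RegisterErasureCodingTask does elsewhere in this commit.
func registerCompressionTask() {
	// registry calls elided; see the erasure_coding diff below
}
```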

View File

@@ -455,6 +455,44 @@ func (cp *ConfigPersistence) IsConfigured() bool {
return cp.dataDir != ""
}
// SaveTaskPolicyGeneric saves a task policy for any task type dynamically
func (cp *ConfigPersistence) SaveTaskPolicyGeneric(taskType string, policy *worker_pb.TaskPolicy) error {
filename := fmt.Sprintf("task_%s.pb", taskType)
return cp.saveTaskConfig(filename, policy)
}
// LoadTaskPolicyGeneric loads a task policy for any task type dynamically
func (cp *ConfigPersistence) LoadTaskPolicyGeneric(taskType string) (*worker_pb.TaskPolicy, error) {
filename := fmt.Sprintf("task_%s.pb", taskType)
if cp.dataDir == "" {
return nil, fmt.Errorf("no data directory configured")
}
confDir := filepath.Join(cp.dataDir, ConfigSubdir)
configPath := filepath.Join(confDir, filename)
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return nil, fmt.Errorf("no configuration found for task type: %s", taskType)
}
// Read file
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read task config file: %w", err)
}
// Unmarshal as TaskPolicy
var policy worker_pb.TaskPolicy
if err := proto.Unmarshal(configData, &policy); err != nil {
return nil, fmt.Errorf("failed to unmarshal task configuration: %w", err)
}
glog.V(1).Infof("Loaded task policy for %s from %s", taskType, configPath)
return &policy, nil
}
// GetConfigInfo returns information about the configuration storage
func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
info := map[string]interface{}{
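These two methods replace the per-task Save/Load pairs: the task-type string becomes the file key (task_<type>.pb), so any newly registered task gets persistence with no further changes. A usage sketch follows; the package name, the demo helper, and the Enabled field on worker_pb.TaskPolicy are assumptions.

```go
package config // package name assumed; sketch only

import (
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)

// demoGenericPersistence is a hypothetical caller of the two generic
// methods added above.
func demoGenericPersistence(cp *ConfigPersistence) {
	policy := &worker_pb.TaskPolicy{Enabled: true} // field assumed

	// Each task type keys its own task_<type>.pb file under the config dir.
	if err := cp.SaveTaskPolicyGeneric("compression", policy); err != nil {
		glog.Errorf("save failed: %v", err)
		return
	}

	loaded, err := cp.LoadTaskPolicyGeneric("compression")
	if err != nil {
		// A missing file surfaces as "no configuration found for task type: ...".
		glog.Warningf("load failed: %v", err)
		return
	}
	glog.V(1).Infof("loaded policy, enabled=%v", loaded.Enabled)
}
```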

View File

@@ -17,7 +17,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/view/layout"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -230,27 +230,15 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
return
}
// Create a new config instance based on task type and apply schema defaults
var config TaskConfig
switch taskType {
case types.TaskTypeErasureCoding:
config = &erasure_coding.Config{}
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return
}
// Apply schema defaults first using type-safe method
if err := schema.ApplyDefaultsToConfig(config); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to apply defaults: " + err.Error()})
return
}
// First, get the current configuration to preserve existing values
// Get the config instance from the UI provider - this is a dynamic approach
// that doesn't require hardcoding task types
currentUIRegistry := tasks.GetGlobalUIRegistry()
currentTypesRegistry := tasks.GetGlobalTypesRegistry()
var config types.TaskConfig
var currentProvider types.TaskUIProvider
// Find the UI provider for this task type
for workerTaskType := range currentTypesRegistry.GetAllDetectors() {
if string(workerTaskType) == string(taskType) {
currentProvider = currentUIRegistry.GetProvider(workerTaskType)
@@ -258,16 +246,26 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
}
}
if currentProvider != nil {
// Copy current config values to the new config
currentConfig := currentProvider.GetCurrentConfig()
if currentConfigProtobuf, ok := currentConfig.(TaskConfig); ok {
// Apply current values using protobuf directly - no map conversion needed!
currentPolicy := currentConfigProtobuf.ToTaskPolicy()
if err := config.FromTaskPolicy(currentPolicy); err != nil {
glog.Warningf("Failed to load current config for %s: %v", taskTypeName, err)
}
}
if currentProvider == nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return
}
// Get a config instance from the UI provider
config = currentProvider.GetCurrentConfig()
if config == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get config for task type: " + taskTypeName})
return
}
// Apply schema defaults - config instances should already have defaults applied during creation
glog.V(2).Infof("Using config defaults for task type: %s", taskTypeName)
// Copy current config values (currentProvider is already set above)
// Apply current values using protobuf directly - no map conversion needed!
currentPolicy := config.ToTaskPolicy()
if err := config.FromTaskPolicy(currentPolicy); err != nil {
glog.Warningf("Failed to load current config for %s: %v", taskTypeName, err)
}
// Parse form data using schema-based approach (this will override with new values)
@@ -277,14 +275,8 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
return
}
// Debug logging - show parsed config values
switch taskType {
case types.TaskTypeErasureCoding:
if ecConfig, ok := config.(*erasure_coding.Config); ok {
glog.V(1).Infof("Parsed EC config - FullnessRatio: %f, QuietForSeconds: %d, MinSizeMB: %d, CollectionFilter: '%s'",
ecConfig.FullnessRatio, ecConfig.QuietForSeconds, ecConfig.MinSizeMB, ecConfig.CollectionFilter)
}
}
// Debug logging - config parsed for task type
glog.V(1).Infof("Parsed configuration for task type: %s", taskTypeName)
// Validate the configuration
if validationErrors := schema.ValidateConfig(config); len(validationErrors) > 0 {
@@ -553,7 +545,7 @@ func (h *MaintenanceHandlers) updateMaintenanceConfig(config *maintenance.Mainte
}
// saveTaskConfigToProtobuf saves task configuration to protobuf file
func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType, config TaskConfig) error {
func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType, config types.TaskConfig) error {
configPersistence := h.adminServer.GetConfigPersistence()
if configPersistence == nil {
return fmt.Errorf("config persistence not available")
@@ -562,11 +554,6 @@ func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType,
// Use the new ToTaskPolicy method - much simpler and more maintainable!
taskPolicy := config.ToTaskPolicy()
// Save using task-specific methods
switch taskType {
case types.TaskTypeErasureCoding:
return configPersistence.SaveErasureCodingTaskPolicy(taskPolicy)
default:
return fmt.Errorf("unsupported task type for protobuf persistence: %s", taskType)
}
// Save using generic method - no more hardcoded task types!
return configPersistence.SaveTaskPolicyGeneric(string(taskType), taskPolicy)
}

View File

@@ -84,11 +84,10 @@ func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) b
// areTaskTypesConflicting checks if two task types conflict
func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
// Examples of conflicting task types
// Define conflicting task types dynamically
// For now, assume no task types conflict (can be made configurable later)
conflictMap := map[TaskType][]TaskType{
TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
// No conflicts defined currently - this can be made configurable per task
}
if conflicts, exists := conflictMap[existing]; exists {

View File

@@ -7,30 +7,21 @@ import (
// CalculateTaskStorageImpact calculates storage impact for different task types
func CalculateTaskStorageImpact(taskType TaskType, volumeSize int64) (sourceChange, targetChange StorageSlotChange) {
switch taskType {
case TaskTypeErasureCoding:
switch string(taskType) {
case "erasure_coding":
// EC task: distributes shards to MULTIPLE targets, source reserves with zero impact
// Source reserves capacity but with zero StorageSlotChange (no actual capacity consumption during planning)
// WARNING: EC has multiple targets! Use AddPendingTask with multiple destinations for proper multi-target handling
// WARNING: EC has multiple targets! Use AddPendingTask with multiple destinations for proper multi-destination calculation
// This simplified function returns zero impact; real EC requires specialized multi-destination calculation
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
case TaskTypeBalance:
// Balance task: moves volume from source to target
// Source loses 1 volume, target gains 1 volume
return StorageSlotChange{VolumeSlots: -1, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
case TaskTypeVacuum:
// Vacuum task: frees space by removing deleted entries, no slot change
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
case TaskTypeReplication:
case "replication":
// Replication task: creates new replica on target
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
default:
// Unknown task type, assume minimal impact
glog.Warningf("unhandled task type %s in CalculateTaskStorageImpact, assuming default impact", taskType)
glog.V(2).Infof("Task type %s not specifically handled in CalculateTaskStorageImpact, using default impact", taskType)
return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
}
}
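Switching on string(taskType) removes this function's dependency on the deleted topology constants, and the warning for unhandled types drops to a V(2) log, since unregistered types are now the expected case. A standalone sketch of the resulting behavior, assuming the topology import path and a hypothetical "compression" type:

```go
package main // sketch only

import (
	"github.com/seaweedfs/seaweedfs/weed/admin/topology" // path assumed
	"github.com/seaweedfs/seaweedfs/weed/glog"
)

func main() {
	// Replication keeps a string-keyed case: the target gains a volume slot.
	src, dst := topology.CalculateTaskStorageImpact(topology.TaskType("replication"), 1<<30)
	glog.V(1).Infof("replication: src=%+v dst=%+v", src, dst) // src={0 0}, dst={1 0}

	// A type with no case of its own now logs at V(2) and falls through
	// to the same default impact, instead of warning on every call.
	src, dst = topology.CalculateTaskStorageImpact(topology.TaskType("compression"), 1<<30)
	glog.V(1).Infof("compression: src=%+v dst=%+v", src, dst) // src={0 0}, dst={1 0}
}
```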

View File

@@ -203,16 +203,16 @@ func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
switch taskType {
case TaskTypeErasureCoding:
switch string(taskType) {
case "erasure_coding":
switch cleanupType {
case CleanupVolumeReplica:
impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
return impact
case CleanupECShards:
return CalculateECShardCleanupImpact(volumeSize)
default:
impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
return impact
}
default:

View File

@@ -3,19 +3,12 @@ package topology
import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
// TaskType represents different types of maintenance operations
// Task types are now dynamically registered - use the worker/types package for task type operations
type TaskType string
// TaskStatus represents the current status of a task
type TaskStatus string
// Common task type constants
const (
TaskTypeVacuum TaskType = "vacuum"
TaskTypeBalance TaskType = "balance"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeReplication TaskType = "replication"
)
// Common task status constants
const (
TaskStatusPending TaskStatus = "pending"

View File

@@ -25,6 +25,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
var (
@@ -229,6 +230,9 @@ func startAdminServer(ctx context.Context, options AdminOptions) error {
fmt.Printf("Data directory created/verified: %s\n", dataDir)
}
// Initialize dynamic task type functions now that all tasks are registered
tasks.InitializeDynamicTaskTypes()
// Create admin server
adminServer := dash.NewAdminServer(*options.masters, nil, dataDir)

View File

@@ -107,6 +107,9 @@ func runWorker(cmd *Command, args []string) bool {
// Create gRPC dial option using TLS configuration
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")
// Initialize dynamic task type functions now that all tasks are registered
tasks.InitializeDynamicTaskTypes()
// Create worker configuration
config := &types.WorkerConfig{
AdminServer: *workerAdminServer,

View File

@@ -68,7 +68,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
result := &types.TaskDetectionResult{
TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeErasureCoding,
TaskType: types.TaskType("erasure_coding"),
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
@@ -168,7 +168,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
TaskID: taskID,
TaskType: topology.TaskTypeErasureCoding,
TaskType: topology.TaskType("erasure_coding"),
VolumeID: metric.VolumeID,
VolumeSize: int64(metric.Size),
Sources: sources,
@@ -279,7 +279,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
// For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1
// For EC, we need at least 1 available volume slot on a disk to consider it for placement.
// Note: We don't exclude the source server since the original volume will be deleted after EC conversion
availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1)
availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskType("erasure_coding"), "", 1)
if len(availableDisks) < erasure_coding.MinTotalDisks {
return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
}
@@ -322,7 +322,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)
// Log storage impact for EC task (source only - EC has multiple targets handled individually)
sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskType("erasure_coding"), int64(metric.Size))
glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")

View File

@@ -42,7 +42,7 @@ type ErasureCodingTask struct {
// NewErasureCodingTask creates a new unified EC task instance
func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask {
return &ErasureCodingTask{
BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding),
BaseTask: base.NewBaseTask(id, types.TaskType("erasure_coding")),
server: server,
volumeID: volumeID,
collection: collection,

View File

@@ -19,7 +19,7 @@ func init() {
RegisterErasureCodingTask()
// Register config updater
tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence)
tasks.AutoRegisterConfigUpdater(types.TaskType("erasure_coding"), UpdateConfigFromPersistence)
}
// RegisterErasureCodingTask registers the erasure coding task with the new architecture
@@ -29,7 +29,7 @@ func RegisterErasureCodingTask() {
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeErasureCoding,
Type: types.TaskType("erasure_coding"),
Name: "erasure_coding",
DisplayName: "Erasure Coding",
Description: "Applies erasure coding to volumes for data protection",

View File

@@ -17,7 +17,7 @@ func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availabl
// Count running EC tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeErasureCoding {
if runningTask.Type == types.TaskType("erasure_coding") {
runningCount++
}
}
@@ -30,7 +30,7 @@ func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availabl
// Check if any worker can handle EC tasks
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeErasureCoding {
if capability == types.TaskType("erasure_coding") {
return true
}
}

View File

@@ -146,3 +146,62 @@ func (r *TaskRegistry) GetAll() map[types.TaskType]types.TaskFactory {
}
return result
}
// InitializeDynamicTaskTypes sets up the dynamic task type functions
// This should be called after all tasks have been registered
func InitializeDynamicTaskTypes() {
// Set up the function variables in the types package
types.GetAvailableTaskTypes = func() []types.TaskType {
typesRegistry := GetGlobalTypesRegistry()
var taskTypes []types.TaskType
for taskType := range typesRegistry.GetAllDetectors() {
taskTypes = append(taskTypes, taskType)
}
return taskTypes
}
types.IsTaskTypeAvailable = func(taskType types.TaskType) bool {
typesRegistry := GetGlobalTypesRegistry()
detectors := typesRegistry.GetAllDetectors()
_, exists := detectors[taskType]
return exists
}
types.GetTaskType = func(name string) (types.TaskType, bool) {
taskType := types.TaskType(name)
if types.IsTaskTypeAvailable(taskType) {
return taskType, true
}
return "", false
}
glog.V(1).Infof("Initialized dynamic task type functions")
}
// GetAllRegisteredTaskTypes returns all currently registered task types
func GetAllRegisteredTaskTypes() []types.TaskType {
if types.GetAvailableTaskTypes != nil {
return types.GetAvailableTaskTypes()
}
// Fallback: get directly from registry
typesRegistry := GetGlobalTypesRegistry()
var taskTypes []types.TaskType
for taskType := range typesRegistry.GetAllDetectors() {
taskTypes = append(taskTypes, taskType)
}
return taskTypes
}
// IsTaskTypeRegistered checks if a task type is currently registered
func IsTaskTypeRegistered(taskType types.TaskType) bool {
if types.IsTaskTypeAvailable != nil {
return types.IsTaskTypeAvailable(taskType)
}
// Fallback: check directly in registry
typesRegistry := GetGlobalTypesRegistry()
detectors := typesRegistry.GetAllDetectors()
_, exists := detectors[taskType]
return exists
}
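With the hooks installed, callers can enumerate and test task types without any hardcoded constants. A sketch of the intended call pattern, assuming the task packages have already been blank-imported:

```go
package main // sketch only

import (
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	// Mirrors what the admin and worker commands now do at startup.
	tasks.InitializeDynamicTaskTypes()

	// Discovery goes through the registry rather than constants.
	for _, tt := range tasks.GetAllRegisteredTaskTypes() {
		glog.V(1).Infof("available task type: %s", tt)
	}

	if tasks.IsTaskTypeRegistered(types.TaskType("erasure_coding")) {
		glog.V(1).Infof("erasure coding tasks can be scheduled")
	}
}
```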

View File

@@ -8,13 +8,10 @@ import (
)
// TaskType represents the type of maintenance task
// Task types are now dynamically registered by individual task packages
// No hardcoded constants - use registry functions to discover available tasks
type TaskType string
const (
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeReplication TaskType = "replication"
)
// TaskStatus represents the status of a maintenance task
type TaskStatus string
@@ -93,3 +90,41 @@ type ClusterReplicationTask struct {
CreatedAt time.Time `json:"created_at"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// TaskTypeRegistry provides dynamic access to registered task types
// This avoids hardcoded constants and allows tasks to be self-contained
type TaskTypeRegistry interface {
GetAllTaskTypes() []TaskType
IsTaskTypeRegistered(taskType TaskType) bool
GetTaskTypeByName(name string) (TaskType, bool)
}
// GetAvailableTaskTypes returns all dynamically registered task types
// This function will be implemented by importing a registry package that
// collects task types from all registered task packages
var GetAvailableTaskTypes func() []TaskType
// IsTaskTypeAvailable checks if a task type is registered and available
var IsTaskTypeAvailable func(TaskType) bool
// GetTaskType converts a string to TaskType if it's registered
var GetTaskType func(string) (TaskType, bool)
// Common task type accessor functions that will be set by the registry
// These allow other packages to get task types without hardcoded constants
// GetErasureCodingTaskType returns the erasure coding task type if registered
func GetErasureCodingTaskType() (TaskType, bool) {
if GetTaskType != nil {
return GetTaskType("erasure_coding")
}
return "", false
}
// GetReplicationTaskType returns the replication task type if registered
func GetReplicationTaskType() (TaskType, bool) {
if GetTaskType != nil {
return GetTaskType("replication")
}
return "", false
}
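The function-variable hooks let the types package avoid importing the registry back (presumably to prevent an import cycle, since every task package imports types); tasks.InitializeDynamicTaskTypes assigns them at startup. Consumers therefore nil-check before calling, as the accessors above do. A consumer-side sketch:

```go
package main // sketch of consumer-side usage

import (
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	// The hooks stay nil until tasks.InitializeDynamicTaskTypes assigns
	// them, so the accessors' nil checks shield callers from init order.
	if ecType, ok := types.GetErasureCodingTaskType(); ok {
		glog.V(1).Infof("erasure coding registered as %q", ecType)
	} else {
		glog.Warningf("erasure coding not registered (or registry not initialized yet)")
	}
}
```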