add generic logging and implement it for balancing task

This commit is contained in:
chrislu
2025-07-27 11:04:51 -07:00
parent 9025b72241
commit 1b64a0f034
4 changed files with 135 additions and 1 deletions

View File

@@ -30,6 +30,7 @@ func NewTask(server string, volumeID uint32, collection string) *Task {
// Execute executes the balance task
func (t *Task) Execute(params types.TaskParams) error {
t.LogInfo("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
// Simulate balance operation with progress updates
@@ -45,18 +46,23 @@ func (t *Task) Execute(params types.TaskParams) error {
{"Verifying balance", 1 * time.Second, 100},
}
for _, step := range steps {
for i, step := range steps {
if t.IsCancelled() {
t.LogWarning("Balance task cancelled at step %d: %s", i+1, step.name)
return fmt.Errorf("balance task cancelled")
}
t.LogInfo("Starting step %d/%d: %s", i+1, len(steps), step.name)
glog.V(1).Infof("Balance task step: %s", step.name)
t.SetProgress(step.progress)
// Simulate work
time.Sleep(step.duration)
t.LogDebug("Completed step %d/%d: %s (progress: %.1f%%)", i+1, len(steps), step.name, step.progress)
}
t.LogInfo("Balance task completed successfully for volume %d on server %s", t.volumeID, t.server)
glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server)
return nil
}

View File

@@ -2,6 +2,10 @@ package tasks
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
@@ -16,6 +20,11 @@ type BaseTask struct {
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
// Logging functionality
logFile *os.File
logger *log.Logger
logFilePath string
}
// NewBaseTask creates a new base task
@@ -95,6 +104,16 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
// InitializeTaskLogging sets up task-specific logging - can be called by worker or tasks
func (t *BaseTask) InitializeTaskLogging(workingDir, taskID string) error {
return t.initializeLogging(workingDir, taskID)
}
// CloseTaskLogging properly closes task logging - can be called by worker or tasks
func (t *BaseTask) CloseTaskLogging() {
t.closeLogging()
}
// ExecuteTask is a wrapper that handles common task execution logic
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
t.SetStartTime(time.Now())
@@ -132,6 +151,92 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
return nil
}
// initializeLogging sets up task-specific logging to a file in the working directory
func (t *BaseTask) initializeLogging(workingDir, taskID string) error {
if workingDir == "" {
// If no working directory specified, skip file logging
return nil
}
// Ensure working directory exists
if err := os.MkdirAll(workingDir, 0755); err != nil {
return fmt.Errorf("failed to create working directory %s: %v", workingDir, err)
}
// Create task-specific log file
timestamp := time.Now().Format("20060102_150405")
logFileName := fmt.Sprintf("%s_%s_%s.log", t.taskType, taskID, timestamp)
t.logFilePath = filepath.Join(workingDir, logFileName)
logFile, err := os.OpenFile(t.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("failed to create log file %s: %v", t.logFilePath, err)
}
t.logFile = logFile
t.logger = log.New(logFile, "", log.LstdFlags|log.Lmicroseconds)
// Log task initialization
t.LogInfo("Task %s initialized for %s", taskID, t.taskType)
return nil
}
// closeLogging properly closes the log file
func (t *BaseTask) closeLogging() {
if t.logFile != nil {
t.LogInfo("Task completed, closing log file")
t.logFile.Close()
t.logFile = nil
t.logger = nil
}
}
// LogInfo writes an info-level log message to both glog and task log file
func (t *BaseTask) LogInfo(format string, args ...interface{}) {
message := fmt.Sprintf(format, args...)
// Always log to task file if available
if t.logger != nil {
t.logger.Printf("[INFO] %s", message)
}
}
// LogError writes an error-level log message to both glog and task log file
func (t *BaseTask) LogError(format string, args ...interface{}) {
message := fmt.Sprintf(format, args...)
// Always log to task file if available
if t.logger != nil {
t.logger.Printf("[ERROR] %s", message)
}
}
// LogDebug writes a debug-level log message to task log file
func (t *BaseTask) LogDebug(format string, args ...interface{}) {
message := fmt.Sprintf(format, args...)
// Always log to task file if available
if t.logger != nil {
t.logger.Printf("[DEBUG] %s", message)
}
}
// LogWarning writes a warning-level log message to both glog and task log file
func (t *BaseTask) LogWarning(format string, args ...interface{}) {
message := fmt.Sprintf(format, args...)
// Always log to task file if available
if t.logger != nil {
t.logger.Printf("[WARNING] %s", message)
}
}
// GetLogFilePath returns the path to the task's log file
func (t *BaseTask) GetLogFilePath() string {
return t.logFilePath
}
// TaskRegistry manages task factories
type TaskRegistry struct {
factories map[types.TaskType]types.TaskFactory

View File

@@ -108,4 +108,13 @@ type TaskInterface interface {
EstimateTime(params TaskParams) time.Duration
GetProgress() float64
Cancel() error
// Logging methods for task-specific log files
InitializeTaskLogging(workingDir, taskID string) error
CloseTaskLogging()
LogInfo(format string, args ...interface{})
LogError(format string, args ...interface{})
LogDebug(format string, args ...interface{})
LogWarning(format string, args ...interface{})
GetLogFilePath() string
}

View File

@@ -366,15 +366,29 @@ func (w *Worker) executeTask(task *types.Task) {
return
}
// Initialize task logging
if err := taskInstance.InitializeTaskLogging(taskWorkingDir, task.ID); err != nil {
glog.Warningf("Failed to initialize task logging for %s: %v", task.ID, err)
} else {
// Ensure logging is closed when task completes
defer taskInstance.CloseTaskLogging()
taskInstance.LogInfo("Worker %s starting execution of task %s (type: %s)", w.id, task.ID, task.Type)
if task.VolumeID != 0 {
taskInstance.LogInfo("Task parameters: VolumeID=%d, Server=%s, Collection=%s", task.VolumeID, task.Server, task.Collection)
}
}
// Execute task
err = taskInstance.Execute(taskParams)
// Report completion
if err != nil {
taskInstance.LogError("Task execution failed: %v", err)
w.completeTask(task.ID, false, err.Error())
w.tasksFailed++
glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
} else {
taskInstance.LogInfo("Task completed successfully")
w.completeTask(task.ID, true, "")
w.tasksCompleted++
glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)