mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 16:53:14 +08:00
fix hanging task detail page
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||||
@@ -1198,47 +1199,75 @@ func (as *AdminServer) GetMaintenanceTaskDetail(taskID string) (*maintenance.Tas
|
|||||||
// Get execution logs from worker if task is active/completed and worker is connected
|
// Get execution logs from worker if task is active/completed and worker is connected
|
||||||
if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted {
|
if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted {
|
||||||
if as.workerGrpcServer != nil && task.WorkerID != "" {
|
if as.workerGrpcServer != nil && task.WorkerID != "" {
|
||||||
workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "")
|
// Add additional timeout protection for worker log requests
|
||||||
if err == nil && len(workerLogs) > 0 {
|
type logResult struct {
|
||||||
// Convert worker logs to maintenance logs
|
logs []*worker_pb.TaskLogEntry
|
||||||
for _, workerLog := range workerLogs {
|
err error
|
||||||
maintenanceLog := &maintenance.TaskExecutionLog{
|
}
|
||||||
Timestamp: time.Unix(workerLog.Timestamp, 0),
|
logChan := make(chan logResult, 1)
|
||||||
Level: workerLog.Level,
|
|
||||||
Message: workerLog.Message,
|
go func() {
|
||||||
Source: "worker",
|
workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "")
|
||||||
|
logChan <- logResult{logs: workerLogs, err: err}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for logs with timeout
|
||||||
|
select {
|
||||||
|
case result := <-logChan:
|
||||||
|
if result.err == nil && len(result.logs) > 0 {
|
||||||
|
workerLogs := result.logs
|
||||||
|
// Convert worker logs to maintenance logs
|
||||||
|
for _, workerLog := range workerLogs {
|
||||||
|
maintenanceLog := &maintenance.TaskExecutionLog{
|
||||||
|
Timestamp: time.Unix(workerLog.Timestamp, 0),
|
||||||
|
Level: workerLog.Level,
|
||||||
|
Message: workerLog.Message,
|
||||||
|
Source: "worker",
|
||||||
|
TaskID: taskID,
|
||||||
|
WorkerID: task.WorkerID,
|
||||||
|
}
|
||||||
|
// carry structured fields if present
|
||||||
|
if len(workerLog.Fields) > 0 {
|
||||||
|
maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields))
|
||||||
|
for k, v := range workerLog.Fields {
|
||||||
|
maintenanceLog.Fields[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// carry optional progress/status
|
||||||
|
if workerLog.Progress != 0 {
|
||||||
|
p := float64(workerLog.Progress)
|
||||||
|
maintenanceLog.Progress = &p
|
||||||
|
}
|
||||||
|
if workerLog.Status != "" {
|
||||||
|
maintenanceLog.Status = workerLog.Status
|
||||||
|
}
|
||||||
|
taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog)
|
||||||
|
}
|
||||||
|
} else if result.err != nil {
|
||||||
|
// Add a diagnostic log entry when worker logs cannot be retrieved
|
||||||
|
diagnosticLog := &maintenance.TaskExecutionLog{
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Level: "WARNING",
|
||||||
|
Message: fmt.Sprintf("Failed to retrieve worker logs: %v", result.err),
|
||||||
|
Source: "admin",
|
||||||
TaskID: taskID,
|
TaskID: taskID,
|
||||||
WorkerID: task.WorkerID,
|
WorkerID: task.WorkerID,
|
||||||
}
|
}
|
||||||
// carry structured fields if present
|
taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
|
||||||
if len(workerLog.Fields) > 0 {
|
glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, result.err)
|
||||||
maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields))
|
|
||||||
for k, v := range workerLog.Fields {
|
|
||||||
maintenanceLog.Fields[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// carry optional progress/status
|
|
||||||
if workerLog.Progress != 0 {
|
|
||||||
p := float64(workerLog.Progress)
|
|
||||||
maintenanceLog.Progress = &p
|
|
||||||
}
|
|
||||||
if workerLog.Status != "" {
|
|
||||||
maintenanceLog.Status = workerLog.Status
|
|
||||||
}
|
|
||||||
taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog)
|
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
case <-time.After(8 * time.Second):
|
||||||
// Add a diagnostic log entry when worker logs cannot be retrieved
|
// Timeout getting logs from worker
|
||||||
diagnosticLog := &maintenance.TaskExecutionLog{
|
timeoutLog := &maintenance.TaskExecutionLog{
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
Level: "WARNING",
|
Level: "WARNING",
|
||||||
Message: fmt.Sprintf("Failed to retrieve worker logs: %v", err),
|
Message: "Timeout retrieving worker logs - worker may be unresponsive or busy",
|
||||||
Source: "admin",
|
Source: "admin",
|
||||||
TaskID: taskID,
|
TaskID: taskID,
|
||||||
WorkerID: task.WorkerID,
|
WorkerID: task.WorkerID,
|
||||||
}
|
}
|
||||||
taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
|
taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, timeoutLog)
|
||||||
glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, err)
|
glog.Warningf("Timeout getting worker logs for task %s from worker %s", taskID, task.WorkerID)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Add diagnostic information when worker is not available
|
// Add diagnostic information when worker is not available
|
||||||
|
|||||||
@@ -38,26 +38,53 @@ func (h *MaintenanceHandlers) ShowTaskDetail(c *gin.Context) {
|
|||||||
taskID := c.Param("id")
|
taskID := c.Param("id")
|
||||||
glog.Infof("DEBUG ShowTaskDetail: Starting for task ID: %s", taskID)
|
glog.Infof("DEBUG ShowTaskDetail: Starting for task ID: %s", taskID)
|
||||||
|
|
||||||
taskDetail, err := h.adminServer.GetMaintenanceTaskDetail(taskID)
|
// Add timeout to prevent indefinite hangs when worker is unresponsive
|
||||||
if err != nil {
|
ctx, cancel := context.WithTimeout(c.Request.Context(), 15*time.Second)
|
||||||
glog.Errorf("DEBUG ShowTaskDetail: error getting task detail for %s: %v", taskID, err)
|
defer cancel()
|
||||||
c.String(http.StatusNotFound, "Task not found: %s (Error: %v)", taskID, err)
|
|
||||||
|
// Use a channel to handle timeout for task detail retrieval
|
||||||
|
type result struct {
|
||||||
|
taskDetail *maintenance.TaskDetailData
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
resultChan := make(chan result, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
taskDetail, err := h.adminServer.GetMaintenanceTaskDetail(taskID)
|
||||||
|
resultChan <- result{taskDetail: taskDetail, err: err}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case res := <-resultChan:
|
||||||
|
if res.err != nil {
|
||||||
|
glog.Errorf("DEBUG ShowTaskDetail: error getting task detail for %s: %v", taskID, res.err)
|
||||||
|
c.String(http.StatusNotFound, "Task not found: %s (Error: %v)", taskID, res.err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("DEBUG ShowTaskDetail: got task detail for %s, task type: %s, status: %s", taskID, res.taskDetail.Task.Type, res.taskDetail.Task.Status)
|
||||||
|
|
||||||
|
c.Header("Content-Type", "text/html")
|
||||||
|
taskDetailComponent := app.TaskDetail(res.taskDetail)
|
||||||
|
layoutComponent := layout.Layout(c, taskDetailComponent)
|
||||||
|
err := layoutComponent.Render(ctx, c.Writer)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("DEBUG ShowTaskDetail: render error: %v", err)
|
||||||
|
c.String(http.StatusInternalServerError, "Failed to render template: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("DEBUG ShowTaskDetail: template rendered successfully for task %s", taskID)
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
glog.Warningf("ShowTaskDetail: timeout waiting for task detail data for task %s", taskID)
|
||||||
|
c.JSON(http.StatusRequestTimeout, gin.H{
|
||||||
|
"error": "Request timeout - task detail retrieval took too long. This may indicate the worker is unresponsive or stuck.",
|
||||||
|
"suggestion": "Try refreshing the page or check if the worker executing this task is responsive. If the task is stuck, it may need to be cancelled manually.",
|
||||||
|
"task_id": taskID,
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.Infof("DEBUG ShowTaskDetail: got task detail for %s, task type: %s, status: %s", taskID, taskDetail.Task.Type, taskDetail.Task.Status)
|
|
||||||
|
|
||||||
c.Header("Content-Type", "text/html")
|
|
||||||
taskDetailComponent := app.TaskDetail(taskDetail)
|
|
||||||
layoutComponent := layout.Layout(c, taskDetailComponent)
|
|
||||||
err = layoutComponent.Render(c.Request.Context(), c.Writer)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("DEBUG ShowTaskDetail: render error: %v", err)
|
|
||||||
c.String(http.StatusInternalServerError, "Failed to render template: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.Infof("DEBUG ShowTaskDetail: template rendered successfully for task %s", taskID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShowMaintenanceQueue displays the maintenance queue page
|
// ShowMaintenanceQueue displays the maintenance queue page
|
||||||
|
|||||||
Reference in New Issue
Block a user