adding ec vacuum

This commit is contained in:
chrislu
2025-08-10 00:58:26 -07:00
parent 05a0cc156b
commit 32e7ba2f9d
6 changed files with 898 additions and 3 deletions

View File

@@ -0,0 +1,198 @@
package ec_vacuum
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with EC vacuum-specific settings
type Config struct {
base.BaseConfig
DeletionThreshold float64 `json:"deletion_threshold"` // Minimum deletion ratio to trigger vacuum
MinVolumeAgeHours int `json:"min_volume_age_hours"` // Minimum age before considering vacuum
CollectionFilter string `json:"collection_filter"` // Filter by collection
MinSizeMB int `json:"min_size_mb"` // Minimum original volume size
}
// NewDefaultConfig creates a new default EC vacuum configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 24 * 60 * 60, // 24 hours
MaxConcurrent: 1,
},
DeletionThreshold: 0.3, // 30% deletions trigger vacuum
MinVolumeAgeHours: 72, // 3 days minimum age
CollectionFilter: "", // No filter by default
MinSizeMB: 100, // 100MB minimum size
}
}
// GetConfigSpec returns the configuration schema for EC vacuum tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable EC Vacuum Tasks",
Description: "Whether EC vacuum tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic EC vacuum task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 24 * 60 * 60,
MinValue: 6 * 60 * 60, // 6 hours minimum
MaxValue: 7 * 24 * 60 * 60, // 7 days maximum
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for EC volumes needing vacuum",
HelpText: "The system will check for EC volumes with deletions at this interval",
Placeholder: "24",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 3,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of EC vacuum tasks that can run simultaneously",
HelpText: "Limits the number of EC vacuum operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "deletion_threshold",
JSONName: "deletion_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.3,
MinValue: 0.1,
MaxValue: 0.8,
Required: true,
DisplayName: "Deletion Threshold",
Description: "Minimum ratio of deletions to trigger vacuum",
HelpText: "EC volumes with at least this ratio of deleted content will be vacuumed",
Placeholder: "0.3 (30%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_volume_age_hours",
JSONName: "min_volume_age_hours",
Type: config.FieldTypeInterval,
DefaultValue: 72,
MinValue: 24,
MaxValue: 30 * 24, // 30 days
Required: true,
DisplayName: "Minimum Volume Age",
Description: "Minimum age before considering EC volume for vacuum",
HelpText: "Only EC volumes older than this will be considered for vacuum",
Placeholder: "72",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "collection_filter",
JSONName: "collection_filter",
Type: config.FieldTypeString,
DefaultValue: "",
Required: false,
DisplayName: "Collection Filter",
Description: "Only vacuum EC volumes in this collection (empty = all collections)",
HelpText: "Leave empty to vacuum EC volumes in all collections",
Placeholder: "e.g., 'logs' or leave empty",
Unit: config.UnitNone,
InputType: "text",
CSSClasses: "form-control",
},
{
Name: "min_size_mb",
JSONName: "min_size_mb",
Type: config.FieldTypeInt,
DefaultValue: 100,
MinValue: 10,
MaxValue: 10000,
Required: true,
DisplayName: "Minimum Size (MB)",
Description: "Minimum original EC volume size to consider for vacuum",
HelpText: "Only EC volumes larger than this size will be considered for vacuum",
Placeholder: "100",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
},
}
}
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
Enabled: c.Enabled,
MaxConcurrent: int32(c.MaxConcurrent),
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
// Note: EC vacuum-specific config would go in the TaskConfig field.
// For now only the basic policy fields are populated, until the protobuf definitions are added.
}
}
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
if policy == nil {
return fmt.Errorf("policy is nil")
}
// Set general TaskPolicy fields
c.Enabled = policy.Enabled
c.MaxConcurrent = int(policy.MaxConcurrent)
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
// Note: EC vacuum-specific fields would be loaded from the TaskConfig field.
// For now the defaults are kept until the protobuf definitions are added.
// Keep existing values if not specified in policy
return nil
}
// LoadConfigFromPersistence loads configuration from the persistence layer if available
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
config := NewDefaultConfig()
// Try to load from persistence if available using generic method
if persistence, ok := configPersistence.(interface {
LoadTaskPolicyGeneric(taskType string) (*worker_pb.TaskPolicy, error)
}); ok {
if policy, err := persistence.LoadTaskPolicyGeneric("ec_vacuum"); err == nil && policy != nil {
if err := config.FromTaskPolicy(policy); err == nil {
glog.V(1).Infof("Loaded EC vacuum configuration from persistence")
return config
}
}
}
glog.V(1).Infof("Using default EC vacuum configuration")
return config
}
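
As a quick illustration of how this configuration type is meant to be consumed, here is a minimal, hypothetical caller (only NewDefaultConfig, ToTaskPolicy, and FromTaskPolicy from the file above are real; the surrounding main function is illustrative):

package main

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum"
)

func main() {
    // Start from the defaults defined above and tighten the threshold.
    cfg := ec_vacuum.NewDefaultConfig()
    cfg.DeletionThreshold = 0.5 // require 50% deletions before vacuuming

    // Convert to the protobuf policy used by the persistence layer...
    policy := cfg.ToTaskPolicy()

    // ...and load it back into a fresh config; only the base fields round-trip for now.
    restored := ec_vacuum.NewDefaultConfig()
    if err := restored.FromTaskPolicy(policy); err != nil {
        fmt.Println("failed to load policy:", err)
        return
    }
    fmt.Printf("enabled=%v maxConcurrent=%d scanIntervalSeconds=%d\n",
        restored.Enabled, restored.MaxConcurrent, restored.ScanIntervalSeconds)
}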

View File

@@ -0,0 +1,208 @@
package ec_vacuum
import (
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
wtypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection identifies EC volumes that need vacuum operations
func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo, config base.TaskConfig) ([]*wtypes.TaskDetectionResult, error) {
ecVacuumConfig, ok := config.(*Config)
if !ok {
return nil, fmt.Errorf("invalid config type for EC vacuum detection")
}
if !ecVacuumConfig.Enabled {
return nil, nil
}
glog.V(2).Infof("EC vacuum detection: checking %d volume metrics", len(metrics))
var results []*wtypes.TaskDetectionResult
now := time.Now()
// Get topology info for EC shard analysis
if info.ActiveTopology == nil {
glog.V(1).Infof("EC vacuum detection: no topology info available")
return results, nil
}
// Collect EC volume information from topology
ecVolumeInfo := collectEcVolumeInfo(info.ActiveTopology)
glog.V(2).Infof("EC vacuum detection: found %d EC volumes in topology", len(ecVolumeInfo))
for volumeID, ecInfo := range ecVolumeInfo {
// Apply filters
if !shouldVacuumEcVolume(ecInfo, ecVacuumConfig, now) {
continue
}
// Calculate deletion ratio
deletionRatio := calculateDeletionRatio(ecInfo)
if deletionRatio < ecVacuumConfig.DeletionThreshold {
glog.V(3).Infof("EC volume %d deletion ratio %.3f below threshold %.3f",
volumeID, deletionRatio, ecVacuumConfig.DeletionThreshold)
continue
}
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("ec_vacuum_vol_%d_%d", volumeID, now.Unix())
result := &wtypes.TaskDetectionResult{
TaskID: taskID,
TaskType: wtypes.TaskType("ec_vacuum"),
VolumeID: volumeID,
Server: ecInfo.PrimaryNode,
Collection: ecInfo.Collection,
Priority: wtypes.TaskPriorityLow, // EC vacuum is not urgent
Reason: fmt.Sprintf("EC volume needs vacuum: deletion_ratio=%.1f%% (>%.1f%%), age=%.1fh (>%dh), size=%.1fMB (>%dMB)",
deletionRatio*100, ecVacuumConfig.DeletionThreshold*100,
ecInfo.Age.Hours(), ecVacuumConfig.MinVolumeAgeHours,
float64(ecInfo.Size)/(1024*1024), ecVacuumConfig.MinSizeMB),
ScheduleAt: now,
}
// Add to topology's pending tasks for capacity management (simplified for now)
if info.ActiveTopology != nil {
glog.V(3).Infof("EC vacuum detection: would add pending task %s to topology for volume %d", taskID, volumeID)
// Note: simplified for now; in production this would integrate properly with ActiveTopology
}
results = append(results, result)
glog.V(1).Infof("EC vacuum detection: queued volume %d for vacuum (deletion_ratio=%.1f%%, size=%.1fMB)",
volumeID, deletionRatio*100, float64(ecInfo.Size)/(1024*1024))
}
glog.V(1).Infof("EC vacuum detection: found %d EC volumes needing vacuum", len(results))
return results, nil
}
// EcVolumeInfo contains information about an EC volume
type EcVolumeInfo struct {
VolumeID uint32
Collection string
Size uint64
CreatedAt time.Time
Age time.Duration
PrimaryNode string
ShardNodes map[pb.ServerAddress]erasure_coding.ShardBits
DeletionInfo DeletionInfo
}
// DeletionInfo contains deletion statistics for an EC volume
type DeletionInfo struct {
TotalEntries int64
DeletedEntries int64
DeletionRatio float64
}
// collectEcVolumeInfo extracts EC volume information from active topology
func collectEcVolumeInfo(activeTopology interface{}) map[uint32]*EcVolumeInfo {
ecVolumes := make(map[uint32]*EcVolumeInfo)
// Simplified implementation for demonstration
// In production, this would query the topology for actual EC volume information
// For now, return empty map since we don't have direct access to topology data
glog.V(3).Infof("EC vacuum detection: topology analysis not implemented, returning empty volume list")
return ecVolumes
}
// shouldVacuumEcVolume determines if an EC volume should be considered for vacuum
func shouldVacuumEcVolume(ecInfo *EcVolumeInfo, config *Config, now time.Time) bool {
// Check minimum age
if ecInfo.Age < time.Duration(config.MinVolumeAgeHours)*time.Hour {
glog.V(3).Infof("EC volume %d too young: age=%.1fh < %dh",
ecInfo.VolumeID, ecInfo.Age.Hours(), config.MinVolumeAgeHours)
return false
}
// Check minimum size
sizeMB := float64(ecInfo.Size) / (1024 * 1024)
if sizeMB < float64(config.MinSizeMB) {
glog.V(3).Infof("EC volume %d too small: size=%.1fMB < %dMB",
ecInfo.VolumeID, sizeMB, config.MinSizeMB)
return false
}
// Check collection filter
if config.CollectionFilter != "" && !strings.Contains(ecInfo.Collection, config.CollectionFilter) {
glog.V(3).Infof("EC volume %d collection %s doesn't match filter %s",
ecInfo.VolumeID, ecInfo.Collection, config.CollectionFilter)
return false
}
// Check if we have enough shards for vacuum operation
totalShards := 0
for _, shardBits := range ecInfo.ShardNodes {
totalShards += shardBits.ShardIdCount()
}
if totalShards < erasure_coding.DataShardsCount {
glog.V(3).Infof("EC volume %d insufficient shards for vacuum: have=%d, need=%d",
ecInfo.VolumeID, totalShards, erasure_coding.DataShardsCount)
return false
}
return true
}
// calculateDeletionRatio calculates the deletion ratio for an EC volume
func calculateDeletionRatio(ecInfo *EcVolumeInfo) float64 {
if ecInfo.DeletionInfo.TotalEntries == 0 {
// If no deletion info available, estimate based on shard distribution
// Volumes with uneven shard distribution might indicate deletion
return estimateDeletionFromShardDistribution(ecInfo)
}
return ecInfo.DeletionInfo.DeletionRatio
}
// estimateDeletionInfo provides a simplified estimation of deletion info
func estimateDeletionInfo(volumeSize uint64) DeletionInfo {
// Simplified estimation - in reality would parse ecj files
// For demonstration, assume some deletion exists if the volume is old enough
estimatedTotal := int64(volumeSize / 1024) // Rough estimate of entries
estimatedDeleted := estimatedTotal / 10 // Assume 10% deletions as baseline
deletionRatio := 0.0
if estimatedTotal > 0 {
deletionRatio = float64(estimatedDeleted) / float64(estimatedTotal)
}
return DeletionInfo{
TotalEntries: estimatedTotal,
DeletedEntries: estimatedDeleted,
DeletionRatio: deletionRatio,
}
}
// estimateDeletionFromShardDistribution estimates deletion ratio from shard distribution patterns
func estimateDeletionFromShardDistribution(ecInfo *EcVolumeInfo) float64 {
// Simplified heuristic: if shards are not evenly distributed,
// it might indicate the volume has been through some operations
// In a real implementation, would analyze ecj files directly
nodeCount := len(ecInfo.ShardNodes)
if nodeCount == 0 {
return 0.0
}
// If all shards are on one node, it might indicate consolidation due to deletions
for _, shardBits := range ecInfo.ShardNodes {
if shardBits.ShardIdCount() >= erasure_coding.TotalShardsCount {
return 0.4 // Higher deletion ratio for consolidated volumes
}
}
// Default conservative estimate
return 0.1
}
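
To make the filter and estimation logic above concrete, here is a rough test-style sketch that would live in the same package (the volume data and server address are invented for illustration; the thresholds come from NewDefaultConfig):

package ec_vacuum

import (
    "testing"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/pb"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func TestShouldVacuumEcVolumeSketch(t *testing.T) {
    // Fake volume: all 14 shards consolidated on a single node.
    var bits erasure_coding.ShardBits
    for i := 0; i < erasure_coding.TotalShardsCount; i++ {
        bits = bits.AddShardId(erasure_coding.ShardId(i))
    }
    ecInfo := &EcVolumeInfo{
        VolumeID:   42,
        Collection: "logs",
        Size:       500 * 1024 * 1024, // 500MB, above the 100MB default minimum
        Age:        96 * time.Hour,    // older than the 72h default minimum
        ShardNodes: map[pb.ServerAddress]erasure_coding.ShardBits{
            pb.ServerAddress("vs1:8080"): bits, // placeholder address
        },
    }
    cfg := NewDefaultConfig()
    if !shouldVacuumEcVolume(ecInfo, cfg, time.Now()) {
        t.Fatalf("expected volume to pass the age/size/shard filters")
    }
    // With no deletion stats, the shard-distribution heuristic returns 0.4,
    // which clears the default 0.3 deletion threshold.
    if got := calculateDeletionRatio(ecInfo); got < cfg.DeletionThreshold {
        t.Fatalf("expected estimated deletion ratio >= %.2f, got %.2f", cfg.DeletionThreshold, got)
    }
}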

View File

@@ -0,0 +1,343 @@
package ec_vacuum
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
"google.golang.org/grpc"
)
// EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes
type EcVacuumTask struct {
*base.BaseTask
volumeID uint32
collection string
sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits
tempDir string
grpcDialOption grpc.DialOption
}
// NewEcVacuumTask creates a new EC vacuum task instance
func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits) *EcVacuumTask {
return &EcVacuumTask{
BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")),
volumeID: volumeID,
collection: collection,
sourceNodes: sourceNodes,
}
}
// Execute performs the EC vacuum operation
func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
t.LogInfo("Starting EC vacuum task", map[string]interface{}{
"volume_id": t.volumeID,
"collection": t.collection,
"shard_nodes": len(t.sourceNodes),
})
// Step 1: Create temporary working directory
if err := t.createTempDir(); err != nil {
return fmt.Errorf("failed to create temp directory: %w", err)
}
defer t.cleanup()
// Step 2: Collect EC shards to this worker
targetNode, err := t.collectEcShardsToWorker()
if err != nil {
return fmt.Errorf("failed to collect EC shards: %w", err)
}
// Step 3: Decode EC shards into normal volume (skips deleted entries automatically)
if err := t.decodeEcShardsToVolume(targetNode); err != nil {
return fmt.Errorf("failed to decode EC shards to volume: %w", err)
}
// Step 4: Re-encode the cleaned volume into new EC shards
if err := t.encodeVolumeToEcShards(targetNode); err != nil {
return fmt.Errorf("failed to encode volume to EC shards: %w", err)
}
// Step 5: Distribute new EC shards to cluster
if err := t.distributeNewEcShards(targetNode); err != nil {
return fmt.Errorf("failed to distribute new EC shards: %w", err)
}
// Step 6: Clean up the intermediate normal volume left over from decoding
if err := t.cleanupOldEcShards(targetNode); err != nil {
t.LogWarning("Failed to clean up old EC shards", map[string]interface{}{
"error": err.Error(),
})
// Don't fail the task for cleanup errors
}
t.LogInfo("EC vacuum task completed successfully", map[string]interface{}{
"volume_id": t.volumeID,
"collection": t.collection,
})
return nil
}
// createTempDir creates a temporary directory for the vacuum operation
func (t *EcVacuumTask) createTempDir() error {
tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("ec_vacuum_%d_%d", t.volumeID, time.Now().Unix()))
if err := os.MkdirAll(tempDir, 0755); err != nil {
return err
}
t.tempDir = tempDir
t.LogInfo("Created temporary directory", map[string]interface{}{
"temp_dir": tempDir,
})
return nil
}
// collectEcShardsToWorker collects all EC shards to the current worker
func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
// Find the node with the most shards as the target
var targetNode pb.ServerAddress
maxShardCount := 0
var existingEcIndexBits erasure_coding.ShardBits
for node, shardBits := range t.sourceNodes {
shardCount := shardBits.ShardIdCount()
if shardCount > maxShardCount {
maxShardCount = shardCount
targetNode = node
existingEcIndexBits = shardBits
}
}
t.LogInfo("Selected target node for shard collection", map[string]interface{}{
"target_node": targetNode,
"existing_bits": existingEcIndexBits,
"shard_count": maxShardCount,
})
// Copy missing shards to target node
for sourceNode, shardBits := range t.sourceNodes {
if sourceNode == targetNode {
continue
}
needToCopyBits := shardBits.Minus(existingEcIndexBits)
if needToCopyBits.ShardIdCount() == 0 {
continue
}
err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
t.LogInfo("Copying EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"shard_ids": needToCopyBits.ShardIds(),
"from": sourceNode,
"to": targetNode,
})
_, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: needToCopyBits.ToUint32Slice(),
CopyEcxFile: false,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
})
if copyErr != nil {
return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr)
}
// Mount the copied shards
_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: needToCopyBits.ToUint32Slice(),
})
if mountErr != nil {
return fmt.Errorf("failed to mount shards %v on %s: %w", needToCopyBits.ShardIds(), targetNode, mountErr)
}
return nil
})
if err != nil {
return "", err
}
existingEcIndexBits = existingEcIndexBits.Plus(needToCopyBits)
}
return targetNode, nil
}
// decodeEcShardsToVolume decodes EC shards into a normal volume, automatically skipping deleted entries
func (t *EcVacuumTask) decodeEcShardsToVolume(targetNode pb.ServerAddress) error {
t.LogInfo("Decoding EC shards to normal volume", map[string]interface{}{
"volume_id": t.volumeID,
"target": targetNode,
})
return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: t.volumeID,
Collection: t.collection,
})
return err
})
}
// encodeVolumeToEcShards re-encodes the cleaned volume into new EC shards
func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error {
t.LogInfo("Encoding cleaned volume to EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"target": targetNode,
})
return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: t.volumeID,
Collection: t.collection,
})
return err
})
}
// distributeNewEcShards distributes the new EC shards across the cluster
func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error {
t.LogInfo("Distributing new EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"source": sourceNode,
})
// For simplicity, we'll distribute to the same nodes as before
// In a real implementation, you might want to use topology info for better placement
// Build the bit pattern covering all shards (0-13); currently unused, since the distribution below mirrors the original shard placement
allShardBits := erasure_coding.ShardBits(0)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i))
}
for targetNode, originalShardBits := range t.sourceNodes {
if targetNode == sourceNode {
continue // Skip source node
}
// Distribute the same shards that were originally on this target
needToDistributeBits := originalShardBits
if needToDistributeBits.ShardIdCount() == 0 {
continue
}
err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
t.LogInfo("Copying new EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"shard_ids": needToDistributeBits.ShardIds(),
"from": sourceNode,
"to": targetNode,
})
_, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: needToDistributeBits.ToUint32Slice(),
CopyEcxFile: true,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
})
if copyErr != nil {
return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr)
}
// Mount the new shards
_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: needToDistributeBits.ToUint32Slice(),
})
if mountErr != nil {
return fmt.Errorf("failed to mount new shards %v on %s: %w", needToDistributeBits.ShardIds(), targetNode, mountErr)
}
return nil
})
if err != nil {
return err
}
}
return nil
}
// cleanupOldEcShards removes the intermediate normal volume that decoding left on the target node
func (t *EcVacuumTask) cleanupOldEcShards(targetNode pb.ServerAddress) error {
t.LogInfo("Cleaning up intermediate volume", map[string]interface{}{
"volume_id": t.volumeID,
"node": targetNode,
})
// Remove the decoded normal volume from the node where it was reconstructed
return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: t.volumeID,
})
// Ignore errors if the volume no longer exists
if err != nil {
t.LogInfo("Volume delete completed or volume not found", map[string]interface{}{
"volume_id": t.volumeID,
"node": targetNode,
"note": "This is normal if the volume was already cleaned up",
})
}
return nil
})
}
// cleanup removes temporary files and directories
func (t *EcVacuumTask) cleanup() {
if t.tempDir != "" {
if err := os.RemoveAll(t.tempDir); err != nil {
t.LogWarning("Failed to remove temporary directory", map[string]interface{}{
"temp_dir": t.tempDir,
"error": err.Error(),
})
} else {
t.LogInfo("Cleaned up temporary directory", map[string]interface{}{
"temp_dir": t.tempDir,
})
}
}
}
// GetVolumeID returns the volume ID being processed
func (t *EcVacuumTask) GetVolumeID() uint32 {
return t.volumeID
}
// GetCollection returns the collection name
func (t *EcVacuumTask) GetCollection() string {
return t.collection
}
// SetGrpcDialOption sets the GRPC dial option for volume server communication
func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) {
t.grpcDialOption = option
}
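
For orientation, a hedged sketch of driving this task end to end outside the worker framework (the shard layout, server addresses, and insecure credentials are placeholders; in practice the worker runtime constructs and executes the task):

package main

import (
    "context"
    "log"

    "github.com/seaweedfs/seaweedfs/weed/pb"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
    "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // Placeholder layout: shards 0-6 on vs1, shards 7-13 on vs2.
    var bitsA, bitsB erasure_coding.ShardBits
    for i := 0; i < 7; i++ {
        bitsA = bitsA.AddShardId(erasure_coding.ShardId(i))
    }
    for i := 7; i < erasure_coding.TotalShardsCount; i++ {
        bitsB = bitsB.AddShardId(erasure_coding.ShardId(i))
    }
    sourceNodes := map[pb.ServerAddress]erasure_coding.ShardBits{
        pb.ServerAddress("vs1:8080"): bitsA,
        pb.ServerAddress("vs2:8080"): bitsB,
    }

    task := ec_vacuum.NewEcVacuumTask("ec_vacuum_vol_42_demo", 42, "logs", sourceNodes)
    task.SetGrpcDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))

    // Execute runs collect -> decode -> re-encode -> distribute -> cleanup.
    if err := task.Execute(context.Background(), nil); err != nil {
        log.Fatalf("ec vacuum failed: %v", err)
    }
}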

View File

@@ -0,0 +1,145 @@
package ec_vacuum
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Scheduling determines if an EC vacuum task should be scheduled for execution
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
ecVacuumConfig, ok := config.(*Config)
if !ok {
glog.Errorf("EC vacuum scheduling: invalid config type")
return false
}
// Count running EC vacuum tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskType("ec_vacuum") {
runningCount++
}
}
// Check concurrency limit
if runningCount >= ecVacuumConfig.MaxConcurrent {
glog.V(2).Infof("EC vacuum scheduling: max concurrent limit reached (%d/%d)", runningCount, ecVacuumConfig.MaxConcurrent)
return false
}
// Check if any worker can handle EC vacuum tasks
hasCapableWorker := false
var selectedWorker *types.WorkerData
for _, worker := range availableWorkers {
if canWorkerHandleEcVacuum(worker, task) {
hasCapableWorker = true
selectedWorker = worker
break
}
}
if !hasCapableWorker {
glog.V(2).Infof("EC vacuum scheduling: no capable workers available for task %s", task.ID)
return false
}
// Check worker resource availability
if !hasEnoughResources(selectedWorker, task) {
glog.V(2).Infof("EC vacuum scheduling: worker %s doesn't have enough resources for task %s",
selectedWorker.ID, task.ID)
return false
}
// Additional checks for EC vacuum specific requirements
if !meetsEcVacuumRequirements(task, ecVacuumConfig) {
glog.V(2).Infof("EC vacuum scheduling: task %s doesn't meet EC vacuum requirements", task.ID)
return false
}
glog.V(1).Infof("EC vacuum scheduling: approved task %s for worker %s", task.ID, selectedWorker.ID)
return true
}
// canWorkerHandleEcVacuum checks if a worker can handle EC vacuum tasks
func canWorkerHandleEcVacuum(worker *types.WorkerData, task *types.TaskInput) bool {
// Check if worker has EC vacuum capability
for _, capability := range worker.Capabilities {
if capability == types.TaskType("ec_vacuum") {
return true
}
// Also accept workers with general erasure_coding capability
if capability == types.TaskType("erasure_coding") {
return true
}
}
glog.V(3).Infof("Worker %s lacks EC vacuum capability", worker.ID)
return false
}
// hasEnoughResources checks if a worker has sufficient resources for EC vacuum
func hasEnoughResources(worker *types.WorkerData, task *types.TaskInput) bool {
// Check current load using what's available in WorkerData
if worker.CurrentLoad >= 2 { // Conservative limit for EC vacuum
glog.V(3).Infof("Worker %s at capacity: load=%d", worker.ID, worker.CurrentLoad)
return false
}
// EC vacuum tasks require more resources than regular tasks
// because they involve decode/encode operations
// We'll assume workers have sufficient resources for now
// In a production system, these checks would be more sophisticated
return true
}
// meetsEcVacuumRequirements checks EC vacuum specific requirements
func meetsEcVacuumRequirements(task *types.TaskInput, config *Config) bool {
// Validate task has required parameters
if task.VolumeID == 0 {
glog.V(3).Infof("EC vacuum task %s missing volume ID", task.ID)
return false
}
// Check if this is during allowed time windows (if any restrictions)
// For now, we allow EC vacuum anytime, but this could be made configurable
// Validate collection filter if specified
if config.CollectionFilter != "" && task.Collection != config.CollectionFilter {
glog.V(3).Infof("EC vacuum task %s collection %s doesn't match filter %s",
task.ID, task.Collection, config.CollectionFilter)
return false
}
// Additional safety checks could be added here, such as:
// - Checking if volume is currently being written to
// - Verifying minimum deletion threshold is still met
// - Ensuring cluster health is good for such operations
return true
}
// GetResourceRequirements returns the resource requirements for EC vacuum tasks
func GetResourceRequirements() map[string]interface{} {
return map[string]interface{}{
"MinConcurrentSlots": 2, // Need extra slots for decode/encode
"MinDiskSpaceGB": 10, // Minimum 10GB free space
"MinMemoryMB": 1024, // 1GB memory for operations
"PreferredNetworkMbps": 100, // Good network for shard transfers
"RequiredCapabilities": []string{"ec_vacuum", "erasure_coding"},
"ConflictingTaskTypes": []string{"erasure_coding"}, // Don't run with regular EC tasks on same volume
}
}
// CalculateTaskPriority calculates priority for EC vacuum tasks
func CalculateTaskPriority(task *types.TaskInput, metrics *types.VolumeHealthMetrics) types.TaskPriority {
// Placeholder heuristic: the volume ID stands in for size until real size metrics are wired in (larger volumes would reclaim more space)
if task.VolumeID > 1000000 {
return types.TaskPriorityMedium
}
// Default priority
return types.TaskPriorityLow
}
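
A small sketch of exercising the scheduler hook above with one idle, capable worker (only the fields this file actually reads are set; everything else about the worker and task is illustrative):

package ec_vacuum

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// schedulingSketch is illustrative only; the maintenance scheduler calls Scheduling itself.
func schedulingSketch() {
    task := &types.TaskInput{
        ID:         "ec_vacuum_vol_42_demo",
        Type:       types.TaskType("ec_vacuum"),
        VolumeID:   42,
        Collection: "logs",
    }
    worker := &types.WorkerData{
        ID:           "worker-1",
        Capabilities: []types.TaskType{types.TaskType("ec_vacuum")},
        CurrentLoad:  0,
    }
    cfg := NewDefaultConfig() // MaxConcurrent defaults to 1

    ok := Scheduling(task, nil, []*types.WorkerData{worker}, cfg)
    fmt.Println("schedule ec_vacuum task:", ok)
}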

View File

@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
)