diff --git a/weed/mount/ml/batch_optimizer.go b/weed/mount/ml/batch_optimizer.go index d5dbfa636..12ef75947 100644 --- a/weed/mount/ml/batch_optimizer.go +++ b/weed/mount/ml/batch_optimizer.go @@ -12,155 +12,155 @@ import ( type BatchAccessPattern int const ( - BatchPatternUnknown BatchAccessPattern = iota - BatchPatternLinear // Linear batch processing - BatchPatternStrided // Strided access with fixed gaps - BatchPatternShuffled // Randomized batch order - BatchPatternHierarchical // Hierarchical/nested batch access - BatchPatternMultiGPU // Multi-GPU distributed batches - BatchPatternPipelined // Pipelined batch processing + BatchPatternUnknown BatchAccessPattern = iota + BatchPatternLinear // Linear batch processing + BatchPatternStrided // Strided access with fixed gaps + BatchPatternShuffled // Randomized batch order + BatchPatternHierarchical // Hierarchical/nested batch access + BatchPatternMultiGPU // Multi-GPU distributed batches + BatchPatternPipelined // Pipelined batch processing ) // BatchAccess represents a single file access that's part of batch processing type BatchAccess struct { - Offset int64 // File offset - Size int // Access size - AccessTime time.Time // When accessed - IsRead bool // Whether this was a read operation - BatchHint string // Optional batch identifier hint + Offset int64 // File offset + Size int // Access size + AccessTime time.Time // When accessed + IsRead bool // Whether this was a read operation + BatchHint string // Optional batch identifier hint } // BatchInfo holds information about a detected batch type BatchInfo struct { sync.RWMutex - + // Batch identification - BatchID string // Unique batch identifier - StartOffset int64 // Starting file offset - EndOffset int64 // Ending file offset - Size int64 // Total batch size in bytes - ItemCount int // Number of items in batch - ItemSize int64 // Average item size - + BatchID string // Unique batch identifier + StartOffset int64 // Starting file offset + EndOffset int64 // Ending file offset + Size int64 // Total batch size in bytes + ItemCount int // Number of items in batch + ItemSize int64 // Average item size + // Access pattern - AccessPattern BatchAccessPattern // Detected access pattern - AccessOrder []int64 // Order of access within batch - AccessTimes []time.Time // When each item was accessed - ProcessingTime time.Duration // Total time to process batch - + AccessPattern BatchAccessPattern // Detected access pattern + AccessOrder []int64 // Order of access within batch + AccessTimes []time.Time // When each item was accessed + ProcessingTime time.Duration // Total time to process batch + // Performance metrics - LoadTime time.Duration // Time to load batch from storage - ProcessTime time.Duration // Time to process batch (compute) - TotalTime time.Duration // Total end-to-end time - Throughput float64 // Items per second - + LoadTime time.Duration // Time to load batch from storage + ProcessTime time.Duration // Time to process batch (compute) + TotalTime time.Duration // Total end-to-end time + Throughput float64 // Items per second + // Optimization state - IsPrefetched bool // Whether batch was prefetched - CacheHitRate float64 // Percentage of cache hits - OptimalPrefetch int64 // Recommended prefetch size - + IsPrefetched bool // Whether batch was prefetched + CacheHitRate float64 // Percentage of cache hits + OptimalPrefetch int64 // Recommended prefetch size + // Relationship to other batches - PreviousBatch *BatchInfo // Previous batch in sequence - NextBatch *BatchInfo // Next 
batch in sequence - ParentBatch *BatchInfo // Parent batch (for hierarchical) - ChildBatches []*BatchInfo // Child batches (for hierarchical) + PreviousBatch *BatchInfo // Previous batch in sequence + NextBatch *BatchInfo // Next batch in sequence + ParentBatch *BatchInfo // Parent batch (for hierarchical) + ChildBatches []*BatchInfo // Child batches (for hierarchical) } // BatchOptimizer optimizes batch access patterns for ML workloads type BatchOptimizer struct { sync.RWMutex - + // Configuration - maxBatchesTracked int // Maximum number of batches to track - batchDetectionWindow int // Window size for batch detection - minBatchSize int64 // Minimum size to consider as batch - maxBatchSize int64 // Maximum size to consider as batch - + maxBatchesTracked int // Maximum number of batches to track + batchDetectionWindow int // Window size for batch detection + minBatchSize int64 // Minimum size to consider as batch + maxBatchSize int64 // Maximum size to consider as batch + // Batch tracking - activeBatches map[string]*BatchInfo // Currently active batches - completedBatches map[string]*BatchInfo // Recently completed batches - inodeToBatches map[uint64][]*BatchInfo // File to batches mapping - + activeBatches map[string]*BatchInfo // Currently active batches + completedBatches map[string]*BatchInfo // Recently completed batches + inodeToBatches map[uint64][]*BatchInfo // File to batches mapping + // Pattern detection - accessHistory map[uint64][]BatchAccess // Recent access history per file - batchSequences map[uint64]*BatchSequence // Detected batch sequences - + accessHistory map[uint64][]BatchAccess // Recent access history per file + batchSequences map[uint64]*BatchSequence // Detected batch sequences + // Optimization strategies - prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern - cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern - + prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern + cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern + // Statistics - totalBatchesDetected int64 // Total batches detected - optimizationHits int64 // Successful optimization applications - optimizationMisses int64 // Failed optimization attempts - + totalBatchesDetected int64 // Total batches detected + optimizationHits int64 // Successful optimization applications + optimizationMisses int64 // Failed optimization attempts + // Background processing - cleanupTicker *time.Ticker // Cleanup timer - stopCleanup chan struct{} // Cleanup stop signal + cleanupTicker *time.Ticker // Cleanup timer + stopCleanup chan struct{} // Cleanup stop signal } // BatchSequence represents a sequence of related batches type BatchSequence struct { sync.RWMutex - - SequenceID string // Unique sequence identifier - Batches []*BatchInfo // Batches in sequence - Pattern BatchAccessPattern // Overall sequence pattern - StartTime time.Time // When sequence started - LastAccess time.Time // Last access in sequence - IsComplete bool // Whether sequence is complete - RepeatCount int // How many times sequence has repeated - + + SequenceID string // Unique sequence identifier + Batches []*BatchInfo // Batches in sequence + Pattern BatchAccessPattern // Overall sequence pattern + StartTime time.Time // When sequence started + LastAccess time.Time // Last access in sequence + IsComplete bool // Whether sequence is complete + RepeatCount int // How many times sequence has repeated + // Predictions - 
NextBatchOffset int64 // Predicted next batch offset - NextBatchSize int64 // Predicted next batch size - Confidence float64 // Confidence in predictions (0-1) + NextBatchOffset int64 // Predicted next batch offset + NextBatchSize int64 // Predicted next batch size + Confidence float64 // Confidence in predictions (0-1) } // PrefetchConfig holds configuration for prefetching strategies type PrefetchConfig struct { - Strategy PrefetchStrategy // Which prefetch strategy to use - LookaheadCount int // How many items to prefetch ahead - PrefetchSize int64 // Size to prefetch per operation - ConcurrencyLevel int // How many concurrent prefetch operations - AdaptiveScaling bool // Whether to scale based on performance + Strategy PrefetchStrategy // Which prefetch strategy to use + LookaheadCount int // How many items to prefetch ahead + PrefetchSize int64 // Size to prefetch per operation + ConcurrencyLevel int // How many concurrent prefetch operations + AdaptiveScaling bool // Whether to scale based on performance } -// CacheConfig holds configuration for caching strategies +// CacheConfig holds configuration for caching strategies type CacheConfig struct { - Policy CachePolicy // Which cache policy to use - RetentionTime time.Duration // How long to keep items cached - Priority CachePriority // Cache priority level - PreloadBatches int // How many batches to preload + Policy CachePolicy // Which cache policy to use + RetentionTime time.Duration // How long to keep items cached + Priority CachePriority // Cache priority level + PreloadBatches int // How many batches to preload } // NewBatchOptimizer creates a new batch optimizer func NewBatchOptimizer() *BatchOptimizer { bo := &BatchOptimizer{ - maxBatchesTracked: 1000, // Track up to 1000 batches - batchDetectionWindow: 100, // Look at last 100 accesses - minBatchSize: 64 * 1024, // Minimum 64KB batch - maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch - - activeBatches: make(map[string]*BatchInfo), - completedBatches: make(map[string]*BatchInfo), - inodeToBatches: make(map[uint64][]*BatchInfo), - accessHistory: make(map[uint64][]BatchAccess), - batchSequences: make(map[uint64]*BatchSequence), - - prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig), - cacheStrategies: make(map[BatchAccessPattern]*CacheConfig), - - stopCleanup: make(chan struct{}), + maxBatchesTracked: 1000, // Track up to 1000 batches + batchDetectionWindow: 100, // Look at last 100 accesses + minBatchSize: 64 * 1024, // Minimum 64KB batch + maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch + + activeBatches: make(map[string]*BatchInfo), + completedBatches: make(map[string]*BatchInfo), + inodeToBatches: make(map[uint64][]*BatchInfo), + accessHistory: make(map[uint64][]BatchAccess), + batchSequences: make(map[uint64]*BatchSequence), + + prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig), + cacheStrategies: make(map[BatchAccessPattern]*CacheConfig), + + stopCleanup: make(chan struct{}), } - + // Initialize default strategies bo.initializeDefaultStrategies() - + // Start cleanup routine bo.cleanupTicker = time.NewTicker(5 * time.Minute) go bo.cleanupRoutine() - + glog.V(1).Infof("Batch optimizer initialized") return bo } @@ -169,11 +169,11 @@ func NewBatchOptimizer() *BatchOptimizer { func (bo *BatchOptimizer) initializeDefaultStrategies() { // Linear batch pattern - aggressive prefetching bo.prefetchStrategies[BatchPatternLinear] = &PrefetchConfig{ - Strategy: PrefetchAggressive, - LookaheadCount: 5, - PrefetchSize: 2 * 1024 * 1024, // 2MB 
+ Strategy: PrefetchAggressive, + LookaheadCount: 5, + PrefetchSize: 2 * 1024 * 1024, // 2MB ConcurrencyLevel: 3, - AdaptiveScaling: true, + AdaptiveScaling: true, } bo.cacheStrategies[BatchPatternLinear] = &CacheConfig{ Policy: CachePolicyTrainingAware, @@ -181,14 +181,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() { Priority: CachePriorityHigh, PreloadBatches: 2, } - + // Shuffled batch pattern - conservative prefetching bo.prefetchStrategies[BatchPatternShuffled] = &PrefetchConfig{ - Strategy: PrefetchBalanced, - LookaheadCount: 2, - PrefetchSize: 512 * 1024, // 512KB + Strategy: PrefetchBalanced, + LookaheadCount: 2, + PrefetchSize: 512 * 1024, // 512KB ConcurrencyLevel: 2, - AdaptiveScaling: true, + AdaptiveScaling: true, } bo.cacheStrategies[BatchPatternShuffled] = &CacheConfig{ Policy: CachePolicyLRU, @@ -196,14 +196,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() { Priority: CachePriorityNormal, PreloadBatches: 1, } - + // Multi-GPU pattern - high concurrency bo.prefetchStrategies[BatchPatternMultiGPU] = &PrefetchConfig{ - Strategy: PrefetchAggressive, - LookaheadCount: 8, - PrefetchSize: 4 * 1024 * 1024, // 4MB + Strategy: PrefetchAggressive, + LookaheadCount: 8, + PrefetchSize: 4 * 1024 * 1024, // 4MB ConcurrencyLevel: 6, - AdaptiveScaling: true, + AdaptiveScaling: true, } bo.cacheStrategies[BatchPatternMultiGPU] = &CacheConfig{ Policy: CachePolicyML, @@ -217,7 +217,7 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() { func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int, isRead bool, batchHint string) *BatchInfo { bo.Lock() defer bo.Unlock() - + access := BatchAccess{ Offset: offset, Size: size, @@ -225,7 +225,7 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int IsRead: isRead, BatchHint: batchHint, } - + // Add to access history history := bo.accessHistory[inode] history = append(history, access) @@ -233,23 +233,23 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int history = history[1:] // Keep only recent accesses } bo.accessHistory[inode] = history - + // Detect batch patterns batchInfo := bo.detectBatchPattern(inode, history) if batchInfo != nil { bo.totalBatchesDetected++ - + // Add to tracking bo.activeBatches[batchInfo.BatchID] = batchInfo bo.inodeToBatches[inode] = append(bo.inodeToBatches[inode], batchInfo) - + // Update batch sequence bo.updateBatchSequence(inode, batchInfo) - - glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d", + + glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d", inode, batchInfo.AccessPattern, batchInfo.Size, batchInfo.ItemCount) } - + return batchInfo } @@ -258,25 +258,29 @@ func (bo *BatchOptimizer) detectBatchPattern(inode uint64, history []BatchAccess if len(history) < 3 { return nil // Need minimum history } - + // Look for batch boundaries by analyzing access gaps and patterns - recent := history[len(history)-10:] // Look at last 10 accesses + startIdx := len(history) - 10 + if startIdx < 0 { + startIdx = 0 + } + recent := history[startIdx:] // Look at last 10 accesses (or all if fewer) if len(recent) < 3 { recent = history } - + // Check for batch characteristics batchInfo := bo.analyzePotentialBatch(recent, inode) if batchInfo == nil { return nil } - + // Determine access pattern batchInfo.AccessPattern = bo.classifyBatchPattern(batchInfo, recent) - + // Calculate performance metrics bo.calculateBatchMetrics(batchInfo, recent) - + return batchInfo } @@ -285,60 
+289,60 @@ func (bo *BatchOptimizer) analyzePotentialBatch(accesses []BatchAccess, inode ui if len(accesses) < 2 { return nil } - + // Calculate basic statistics var totalSize int64 var itemCount int minOffset := accesses[0].Offset maxOffset := accesses[0].Offset - + accessOrder := make([]int64, len(accesses)) accessTimes := make([]time.Time, len(accesses)) - + for i, access := range accesses { totalSize += int64(access.Size) itemCount++ - + if access.Offset < minOffset { minOffset = access.Offset } if access.Offset > maxOffset { maxOffset = access.Offset } - + accessOrder[i] = access.Offset accessTimes[i] = access.AccessTime } - + batchSize := maxOffset - minOffset + int64(accesses[len(accesses)-1].Size) - + // Check if this qualifies as a batch if batchSize < bo.minBatchSize || batchSize > bo.maxBatchSize { return nil } - + // Check temporal locality (accesses should be close in time) timeSpan := accessTimes[len(accessTimes)-1].Sub(accessTimes[0]) if timeSpan > 10*time.Minute { // Too spread out in time return nil } - + // Create batch info batchID := generateBatchID(inode, minOffset, time.Now()) - + batchInfo := &BatchInfo{ - BatchID: batchID, - StartOffset: minOffset, - EndOffset: maxOffset, - Size: batchSize, - ItemCount: itemCount, - ItemSize: totalSize / int64(itemCount), - AccessOrder: accessOrder, - AccessTimes: accessTimes, - TotalTime: timeSpan, - LoadTime: timeSpan, // Initially assume all time is load time + BatchID: batchID, + StartOffset: minOffset, + EndOffset: maxOffset, + Size: batchSize, + ItemCount: itemCount, + ItemSize: totalSize / int64(itemCount), + AccessOrder: accessOrder, + AccessTimes: accessTimes, + TotalTime: timeSpan, + LoadTime: timeSpan, // Initially assume all time is load time } - + return batchInfo } @@ -347,7 +351,7 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc if len(batch.AccessOrder) < 2 { return BatchPatternUnknown } - + // Check for linear pattern (sequential offsets) isLinear := true for i := 1; i < len(batch.AccessOrder); i++ { @@ -356,31 +360,31 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc break } } - + if isLinear { return BatchPatternLinear } - + // Check for strided pattern (regular gaps) if bo.isStridedPattern(batch.AccessOrder) { return BatchPatternStrided } - + // Check for shuffled pattern (randomized order) if bo.isShuffledPattern(batch.AccessOrder) { return BatchPatternShuffled } - + // Check for multi-GPU pattern (parallel access indicators) if bo.isMultiGPUPattern(accesses) { return BatchPatternMultiGPU } - + // Check for pipelined pattern (overlapping accesses) if bo.isPipelinedPattern(batch.AccessTimes) { return BatchPatternPipelined } - + return BatchPatternUnknown } @@ -389,13 +393,13 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool { if len(offsets) < 3 { return false } - + // Calculate stride stride := offsets[1] - offsets[0] if stride <= 0 { return false } - + // Check if all accesses follow the same stride consistentStrides := 0 for i := 2; i < len(offsets); i++ { @@ -404,9 +408,9 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool { consistentStrides++ } } - + // At least 80% of strides should be consistent - return float64(consistentStrides) / float64(len(offsets)-2) >= 0.8 + return float64(consistentStrides)/float64(len(offsets)-2) >= 0.8 } // isShuffledPattern checks if accesses are in randomized order @@ -414,7 +418,7 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool { if len(offsets) < 5 { 
return false } - + // Count inversions (out-of-order pairs) inversions := 0 for i := 0; i < len(offsets); i++ { @@ -424,10 +428,10 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool { } } } - + totalPairs := len(offsets) * (len(offsets) - 1) / 2 inversionRate := float64(inversions) / float64(totalPairs) - + // High inversion rate suggests shuffling return inversionRate > 0.3 } @@ -437,22 +441,22 @@ func (bo *BatchOptimizer) isMultiGPUPattern(accesses []BatchAccess) bool { // Look for multiple concurrent access streams // This is a simplified heuristic - in practice, this would need more // sophisticated detection based on process info, etc. - + if len(accesses) < 4 { return false } - + // Check for concurrent accesses (multiple accesses in very short time) concurrentWindows := 0 windowSize := 100 * time.Millisecond - + for i := 0; i < len(accesses)-1; i++ { timeDiff := accesses[i+1].AccessTime.Sub(accesses[i].AccessTime) if timeDiff < windowSize { concurrentWindows++ } } - + // If many accesses are concurrent, might be multi-GPU return float64(concurrentWindows)/float64(len(accesses)) > 0.5 } @@ -462,30 +466,30 @@ func (bo *BatchOptimizer) isPipelinedPattern(accessTimes []time.Time) bool { if len(accessTimes) < 3 { return false } - + // Look for regular, overlapping timing patterns intervals := make([]time.Duration, len(accessTimes)-1) for i := 1; i < len(accessTimes); i++ { intervals[i-1] = accessTimes[i].Sub(accessTimes[i-1]) } - + // Calculate coefficient of variation for intervals var sum, sumSq time.Duration for _, interval := range intervals { sum += interval sumSq += interval * interval } - + n := time.Duration(len(intervals)) mean := sum / n if mean == 0 { return false } - + // Calculate variance and CV variance := (sumSq / n) - (mean * mean) - cv := float64(variance) / float64(mean * mean) - + cv := float64(variance) / float64(mean*mean) + // Low coefficient of variation suggests regular pipelining return cv < 0.2 } @@ -495,18 +499,18 @@ func (bo *BatchOptimizer) calculateBatchMetrics(batch *BatchInfo, accesses []Bat if len(batch.AccessTimes) < 2 { return } - + // Calculate throughput timeSpan := batch.AccessTimes[len(batch.AccessTimes)-1].Sub(batch.AccessTimes[0]) if timeSpan > 0 { batch.Throughput = float64(batch.ItemCount) / timeSpan.Seconds() } - + // Estimate processing vs load time (heuristic) // In practice, this would need more sophisticated measurement avgItemTime := timeSpan / time.Duration(batch.ItemCount) - batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time - batch.LoadTime = avgItemTime / 2 // Assume 50% load time + batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time + batch.LoadTime = avgItemTime / 2 // Assume 50% load time } // updateBatchSequence updates the batch sequence for an inode @@ -521,26 +525,26 @@ func (bo *BatchOptimizer) updateBatchSequence(inode uint64, newBatch *BatchInfo) } bo.batchSequences[inode] = sequence } - + sequence.Lock() defer sequence.Unlock() - + // Link batches if len(sequence.Batches) > 0 { lastBatch := sequence.Batches[len(sequence.Batches)-1] lastBatch.NextBatch = newBatch newBatch.PreviousBatch = lastBatch } - + sequence.Batches = append(sequence.Batches, newBatch) sequence.LastAccess = time.Now() - + // Update sequence pattern based on majority of batches bo.updateSequencePattern(sequence) - + // Make predictions for next batch bo.updateSequencePredictions(sequence) - + // Keep sequence size manageable if len(sequence.Batches) > 100 { sequence.Batches = 
sequence.Batches[len(sequence.Batches)-50:] // Keep last 50 batches @@ -552,13 +556,13 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) { if len(sequence.Batches) < 3 { return } - + // Count patterns patternCounts := make(map[BatchAccessPattern]int) for _, batch := range sequence.Batches { patternCounts[batch.AccessPattern]++ } - + // Find most common pattern maxCount := 0 var dominantPattern BatchAccessPattern @@ -568,7 +572,7 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) { dominantPattern = pattern } } - + sequence.Pattern = dominantPattern } @@ -577,12 +581,12 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { if len(sequence.Batches) < 2 { return } - + recent := sequence.Batches[len(sequence.Batches)-3:] // Last 3 batches if len(recent) < 2 { recent = sequence.Batches } - + // Predict next batch offset based on pattern switch sequence.Pattern { case BatchPatternLinear: @@ -595,7 +599,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { sequence.NextBatchSize = lastBatch.Size sequence.Confidence = 0.8 } - + case BatchPatternStrided: // Regular stride if len(recent) >= 3 { @@ -604,7 +608,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { sequence.NextBatchSize = recent[len(recent)-1].Size sequence.Confidence = 0.7 } - + default: // Lower confidence for unpredictable patterns sequence.Confidence = 0.3 @@ -615,60 +619,60 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { func (bo *BatchOptimizer) GetBatchRecommendations(inode uint64) *BatchOptimizationRecommendations { bo.RLock() defer bo.RUnlock() - + sequence := bo.batchSequences[inode] if sequence == nil { return &BatchOptimizationRecommendations{ ShouldOptimize: false, } } - + sequence.RLock() defer sequence.RUnlock() - + prefetchConfig := bo.prefetchStrategies[sequence.Pattern] cacheConfig := bo.cacheStrategies[sequence.Pattern] - + if prefetchConfig == nil { prefetchConfig = bo.prefetchStrategies[BatchPatternUnknown] } if cacheConfig == nil { cacheConfig = bo.cacheStrategies[BatchPatternUnknown] } - + recommendations := &BatchOptimizationRecommendations{ - ShouldOptimize: true, - Pattern: sequence.Pattern, - PrefetchSize: prefetchConfig.PrefetchSize, - PrefetchCount: prefetchConfig.LookaheadCount, - CachePriority: cacheConfig.Priority, - CacheRetention: cacheConfig.RetentionTime, - NextBatchOffset: sequence.NextBatchOffset, - NextBatchSize: sequence.NextBatchSize, - Confidence: sequence.Confidence, + ShouldOptimize: true, + Pattern: sequence.Pattern, + PrefetchSize: prefetchConfig.PrefetchSize, + PrefetchCount: prefetchConfig.LookaheadCount, + CachePriority: cacheConfig.Priority, + CacheRetention: cacheConfig.RetentionTime, + NextBatchOffset: sequence.NextBatchOffset, + NextBatchSize: sequence.NextBatchSize, + Confidence: sequence.Confidence, } - + return recommendations } // BatchOptimizationRecommendations holds batch optimization recommendations type BatchOptimizationRecommendations struct { - ShouldOptimize bool `json:"should_optimize"` - Pattern BatchAccessPattern `json:"pattern"` - PrefetchSize int64 `json:"prefetch_size"` - PrefetchCount int `json:"prefetch_count"` - CachePriority CachePriority `json:"cache_priority"` - CacheRetention time.Duration `json:"cache_retention"` - NextBatchOffset int64 `json:"next_batch_offset"` - NextBatchSize int64 `json:"next_batch_size"` - Confidence float64 `json:"confidence"` + ShouldOptimize bool 
`json:"should_optimize"` + Pattern BatchAccessPattern `json:"pattern"` + PrefetchSize int64 `json:"prefetch_size"` + PrefetchCount int `json:"prefetch_count"` + CachePriority CachePriority `json:"cache_priority"` + CacheRetention time.Duration `json:"cache_retention"` + NextBatchOffset int64 `json:"next_batch_offset"` + NextBatchSize int64 `json:"next_batch_size"` + Confidence float64 `json:"confidence"` } // GetBatchMetrics returns comprehensive batch optimization metrics func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics { bo.RLock() defer bo.RUnlock() - + metrics := BatchOptimizerMetrics{ TotalBatchesDetected: bo.totalBatchesDetected, ActiveBatches: int64(len(bo.activeBatches)), @@ -677,32 +681,32 @@ func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics { OptimizationMisses: bo.optimizationMisses, PatternCounts: make(map[BatchAccessPattern]int64), } - + // Count patterns for _, batch := range bo.activeBatches { batch.RLock() metrics.PatternCounts[batch.AccessPattern]++ batch.RUnlock() } - + // Calculate hit rate totalAttempts := bo.optimizationHits + bo.optimizationMisses if totalAttempts > 0 { metrics.OptimizationHitRate = float64(bo.optimizationHits) / float64(totalAttempts) } - + return metrics } // BatchOptimizerMetrics holds metrics for batch optimization type BatchOptimizerMetrics struct { - TotalBatchesDetected int64 `json:"total_batches_detected"` - ActiveBatches int64 `json:"active_batches"` - CompletedBatches int64 `json:"completed_batches"` - OptimizationHits int64 `json:"optimization_hits"` - OptimizationMisses int64 `json:"optimization_misses"` - OptimizationHitRate float64 `json:"optimization_hit_rate"` - PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"` + TotalBatchesDetected int64 `json:"total_batches_detected"` + ActiveBatches int64 `json:"active_batches"` + CompletedBatches int64 `json:"completed_batches"` + OptimizationHits int64 `json:"optimization_hits"` + OptimizationMisses int64 `json:"optimization_misses"` + OptimizationHitRate float64 `json:"optimization_hit_rate"` + PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"` } // cleanupRoutine performs periodic cleanup of old batch information @@ -721,21 +725,21 @@ func (bo *BatchOptimizer) cleanupRoutine() { func (bo *BatchOptimizer) performCleanup() { bo.Lock() defer bo.Unlock() - + now := time.Now() cutoff := now.Add(-30 * time.Minute) // Remove batches older than 30 minutes - + // Clean up completed batches for id, batch := range bo.completedBatches { batch.RLock() shouldRemove := len(batch.AccessTimes) > 0 && batch.AccessTimes[0].Before(cutoff) batch.RUnlock() - + if shouldRemove { delete(bo.completedBatches, id) } } - + // Clean up access history for inode, history := range bo.accessHistory { filtered := make([]BatchAccess, 0, len(history)) @@ -744,14 +748,14 @@ func (bo *BatchOptimizer) performCleanup() { filtered = append(filtered, access) } } - + if len(filtered) == 0 { delete(bo.accessHistory, inode) } else { bo.accessHistory[inode] = filtered } } - + // Clean up batch sequences for inode, sequence := range bo.batchSequences { sequence.Lock() @@ -762,7 +766,7 @@ func (bo *BatchOptimizer) performCleanup() { } sequence.Unlock() } - + glog.V(4).Infof("Batch optimizer cleanup completed") } @@ -771,9 +775,9 @@ func (bo *BatchOptimizer) Shutdown() { if bo.cleanupTicker != nil { bo.cleanupTicker.Stop() } - + close(bo.stopCleanup) - + glog.V(1).Infof("Batch optimizer shutdown complete") } diff --git a/weed/mount/ml/optimization_engine_test.go 
b/weed/mount/ml/optimization_engine_test.go new file mode 100644 index 000000000..3e6e5bbdb --- /dev/null +++ b/weed/mount/ml/optimization_engine_test.go @@ -0,0 +1,367 @@ +package ml + +import ( + "testing" +) + +// TestOptimizationEngine_Basic tests the basic functionality of the optimization engine +func TestOptimizationEngine_Basic(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + if engine == nil { + t.Fatal("Should create optimization engine") + } + + if !engine.enabled { + t.Error("Engine should be enabled") + } + + // Check that default rules and strategies are loaded + if len(engine.rules) == 0 { + t.Error("Should have default rules loaded") + } + + if len(engine.strategies) == 0 { + t.Error("Should have default strategies loaded") + } + + t.Logf("Engine initialized with %d rules, %d strategies", len(engine.rules), len(engine.strategies)) +} + +// TestOptimizationEngine_RuleEvaluation tests rule evaluation +func TestOptimizationEngine_RuleEvaluation(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + // Create test context for sequential access of a large model file + context := &OptimizationContext{ + FilePath: "/models/large_model.pth", + FileSize: 2 * 1024 * 1024 * 1024, // 2GB + FileType: "model", + AccessPattern: SequentialAccess, + AccessFrequency: 10, + Framework: "pytorch", + WorkloadType: "training", + } + + // Apply optimizations + result := engine.OptimizeAccess(context) + + if result == nil { + t.Fatal("Should return optimization result") + } + + if !result.Applied { + t.Error("Should apply optimizations for large model file with sequential access") + } + + if result.Confidence < 0.5 { + t.Errorf("Expected confidence >= 0.5, got %.2f", result.Confidence) + } + + if len(result.Optimizations) == 0 { + t.Error("Should have applied optimizations") + } + + t.Logf("Applied %d optimizations with confidence %.2f", + len(result.Optimizations), result.Confidence) + + for i, opt := range result.Optimizations { + t.Logf("Optimization %d: type=%s, target=%s", i+1, opt.Type, opt.Target) + } +} + +// TestOptimizationEngine_FrameworkDetection tests framework detection +func TestOptimizationEngine_FrameworkDetection(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + testCases := []struct { + filePath string + expectedFramework string + }{ + {"/models/model.pth", "pytorch"}, + {"/models/model.pt", "pytorch"}, + {"/models/saved_model.pb", "tensorflow"}, + {"/models/model.h5", "tensorflow"}, + {"/models/checkpoint.ckpt", "tensorflow"}, + {"/data/dataset.tfrecord", "tensorflow"}, + {"/unknown/file.bin", ""}, + } + + for _, tc := range testCases { + framework := engine.detectFramework(tc.filePath, nil) + + if tc.expectedFramework == "" { + if framework != "" { + t.Errorf("File %s: expected no framework detection, got %s", tc.filePath, framework) + } + } else { + if framework != tc.expectedFramework { + t.Errorf("File %s: expected framework %s, got %s", + tc.filePath, tc.expectedFramework, framework) + } + } + } +} + +// TestOptimizationEngine_FileTypeDetection tests file type detection +func TestOptimizationEngine_FileTypeDetection(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + testCases := []struct { + filePath string + expectedType string + }{ + {"/models/model.pth", "model"}, + {"/data/dataset.csv", "dataset"}, + {"/configs/config.yaml", "config"}, + {"/logs/training.log", "log"}, + {"/unknown/file.bin", "unknown"}, + } + + for _, tc := range 
testCases { + fileType := engine.detectFileType(tc.filePath) + + if fileType != tc.expectedType { + t.Errorf("File %s: expected type %s, got %s", + tc.filePath, tc.expectedType, fileType) + } + } +} + +// TestOptimizationEngine_ConditionEvaluation tests condition evaluation +func TestOptimizationEngine_ConditionEvaluation(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + context := &OptimizationContext{ + FilePath: "/models/test.pth", + FileSize: 5 * 1024 * 1024, // 5MB + FileType: "model", + AccessPattern: SequentialAccess, + Framework: "pytorch", + } + + // Test various condition types + testConditions := []struct { + condition RuleCondition + expected bool + }{ + { + condition: RuleCondition{ + Type: "file_pattern", + Property: "extension", + Operator: "equals", + Value: ".pth", + }, + expected: true, + }, + { + condition: RuleCondition{ + Type: "file_context", + Property: "size", + Operator: "greater_than", + Value: 1024 * 1024, // 1MB + }, + expected: true, + }, + { + condition: RuleCondition{ + Type: "access_pattern", + Property: "pattern_type", + Operator: "equals", + Value: "sequential", + }, + expected: true, + }, + { + condition: RuleCondition{ + Type: "workload_context", + Property: "framework", + Operator: "equals", + Value: "tensorflow", + }, + expected: false, + }, + } + + for i, tc := range testConditions { + result := engine.evaluateCondition(tc.condition, context) + if result != tc.expected { + t.Errorf("Condition %d: expected %v, got %v", i+1, tc.expected, result) + } + } +} + +// TestOptimizationEngine_PluginSystem tests the plugin system +func TestOptimizationEngine_PluginSystem(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + // Register a test plugin + plugin := NewPyTorchPlugin() + err := engine.RegisterPlugin(plugin) + if err != nil { + t.Fatalf("Failed to register plugin: %v", err) + } + + // Verify plugin is registered + if _, exists := engine.plugins["pytorch"]; !exists { + t.Error("PyTorch plugin should be registered") + } + + // Test framework detection through plugin + confidence := plugin.DetectFramework("/models/test.pth", nil) + if confidence < 0.5 { + t.Errorf("Expected high confidence for .pth file, got %.2f", confidence) + } + + // Test optimization hints + context := &OptimizationContext{ + FilePath: "/models/test.pth", + FileSize: 100 * 1024 * 1024, // 100MB + FileType: "model", + Framework: "pytorch", + } + + hints := plugin.GetOptimizationHints(context) + if len(hints) == 0 { + t.Error("Plugin should provide optimization hints") + } + + t.Logf("Plugin provided %d optimization hints", len(hints)) +} + +// TestOptimizationEngine_UsagePatterns tests usage pattern learning +func TestOptimizationEngine_UsagePatterns(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + context := &OptimizationContext{ + FilePath: "/models/training_model.pth", + FileSize: 50 * 1024 * 1024, // 50MB + FileType: "model", + AccessPattern: SequentialAccess, + Framework: "pytorch", + WorkloadType: "training", + } + + // Apply optimization multiple times to build usage patterns + for i := 0; i < 5; i++ { + result := engine.OptimizeAccess(context) + if result == nil { + t.Fatalf("Optimization %d failed", i+1) + } + } + + // Check that usage patterns are being tracked + if len(engine.usagePatterns) == 0 { + t.Error("Should have learned usage patterns") + } + + // Verify pattern characteristics + for patternKey, pattern := range engine.usagePatterns { + t.Logf("Learned pattern: 
%s (frequency=%d, success_rate=%.2f)", + patternKey, pattern.Frequency, pattern.SuccessRate) + + if pattern.Frequency < 1 { + t.Errorf("Pattern %s should have frequency >= 1", patternKey) + } + } +} + +// TestOptimizationEngine_Metrics tests metrics collection +func TestOptimizationEngine_Metrics(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + metrics := engine.GetMetrics() + + if metrics == nil { + t.Fatal("Should return metrics") + } + + expectedKeys := []string{"enabled", "rules_count", "templates_count", "strategies_count"} + + for _, key := range expectedKeys { + if _, exists := metrics[key]; !exists { + t.Errorf("Metrics should contain key: %s", key) + } + } + + if metrics["enabled"] != true { + t.Error("Metrics should show engine as enabled") + } + + t.Logf("Engine metrics: %+v", metrics) +} + +// TestOptimizationEngine_ConfigurationDriven tests configuration-driven optimization +func TestOptimizationEngine_ConfigurationDriven(t *testing.T) { + engine := NewOptimizationEngine(true) + defer engine.Shutdown() + + // Test that the engine can apply optimizations based on its loaded configuration + context := &OptimizationContext{ + FilePath: "/data/dataset.csv", + FileSize: 10 * 1024 * 1024, // 10MB + FileType: "dataset", + AccessPattern: SequentialAccess, + Framework: "", + WorkloadType: "training", + BatchSize: 32, + } + + result := engine.OptimizeAccess(context) + + if result == nil { + t.Fatal("Should return optimization result") + } + + // The engine should make intelligent decisions based on context + if result.Applied && len(result.Optimizations) > 0 { + t.Logf("Successfully applied %d optimizations", len(result.Optimizations)) + + for _, opt := range result.Optimizations { + if opt.Type == "" || opt.Target == "" { + t.Error("Optimization should have valid type and target") + } + } + } + + if len(result.Recommendations) > 0 { + t.Logf("Generated %d recommendations", len(result.Recommendations)) + for _, rec := range result.Recommendations { + t.Logf("Recommendation: %s", rec) + } + } +} + +// TestOptimizationEngine_Shutdown tests proper shutdown +func TestOptimizationEngine_Shutdown(t *testing.T) { + engine := NewOptimizationEngine(true) + + if !engine.enabled { + t.Error("Engine should start enabled") + } + + engine.Shutdown() + + if engine.enabled { + t.Error("Engine should be disabled after shutdown") + } + + // Test that optimization doesn't work after shutdown + context := &OptimizationContext{ + FilePath: "/test.pth", + FileSize: 1024, + } + + result := engine.OptimizeAccess(context) + if result.Applied { + t.Error("Should not apply optimizations after shutdown") + } +} diff --git a/weed/mount/ml/phase4_integration_test.go b/weed/mount/ml/phase4_integration_test.go index af0f7231a..88e618ada 100644 --- a/weed/mount/ml/phase4_integration_test.go +++ b/weed/mount/ml/phase4_integration_test.go @@ -9,15 +9,23 @@ import ( // MockChunkCache for testing type MockChunkCache struct{} + func (m *MockChunkCache) HasChunk(fileId string, chunkOffset int64) bool { return false } -func (m *MockChunkCache) IsInCache(fileId string, forRead bool) bool { return false } -func (m *MockChunkCache) ReadChunk(fileId string, chunkOffset int64, buffer []byte) (int, error) { return 0, nil } -func (m *MockChunkCache) ReadChunkAt(buffer []byte, fileId string, offset uint64) (int, error) { return 0, nil } -func (m *MockChunkCache) WriteChunk(fileId string, chunkOffset int64, buffer []byte) error { return nil } -func (m *MockChunkCache) DeleteFileChunks(fileId 
string) {} -func (m *MockChunkCache) GetMetrics() interface{} { return struct{}{} } // Return empty struct -func (m *MockChunkCache) GetMaxFilePartSizeInCache() uint64 { return 64 * 1024 * 1024 } // 64MB default -func (m *MockChunkCache) Shutdown() {} +func (m *MockChunkCache) IsInCache(fileId string, forRead bool) bool { return false } +func (m *MockChunkCache) ReadChunk(fileId string, chunkOffset int64, buffer []byte) (int, error) { + return 0, nil +} +func (m *MockChunkCache) ReadChunkAt(buffer []byte, fileId string, offset uint64) (int, error) { + return 0, nil +} +func (m *MockChunkCache) WriteChunk(fileId string, chunkOffset int64, buffer []byte) error { + return nil +} +func (m *MockChunkCache) SetChunk(fileId string, buffer []byte) {} +func (m *MockChunkCache) DeleteFileChunks(fileId string) {} +func (m *MockChunkCache) GetMetrics() interface{} { return struct{}{} } // Return empty struct +func (m *MockChunkCache) GetMaxFilePartSizeInCache() uint64 { return 64 * 1024 * 1024 } // 64MB default +func (m *MockChunkCache) Shutdown() {} // MockLookupFileId for testing func MockLookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) { @@ -79,9 +87,9 @@ func TestPhase4_GPUMemoryCoordinator_Basic(t *testing.T) { if coordinator == nil { t.Fatal("Should create GPU coordinator") } - + t.Log("GPU coordinator created successfully (detailed GPU operations would require actual GPU hardware)") - + // Test that it doesn't crash on basic operations t.Logf("GPU coordinator basic functionality verified") @@ -117,9 +125,9 @@ func TestPhase4_ServingOptimizer_Basic(t *testing.T) { // Test model registration (basic structure) modelInfo := &ModelServingInfo{ - ModelID: "resnet50-v1", - ModelPath: "/models/resnet50.pth", - Framework: "pytorch", + ModelID: "resnet50-v1", + ModelPath: "/models/resnet50.pth", + Framework: "pytorch", ServingPattern: ServingPatternRealtimeInference, } @@ -158,19 +166,19 @@ func TestPhase4_TensorOptimizer_Basic(t *testing.T) { func TestPhase4_MLOptimization_AdvancedIntegration(t *testing.T) { // Create ML configuration with all Phase 4 features enabled config := &MLConfig{ - PrefetchWorkers: 8, - PrefetchQueueSize: 100, - PrefetchTimeout: 30 * time.Second, - EnableMLHeuristics: true, - SequentialThreshold: 3, - ConfidenceThreshold: 0.6, - MaxPrefetchAhead: 8, - PrefetchBatchSize: 3, + PrefetchWorkers: 8, + PrefetchQueueSize: 100, + PrefetchTimeout: 30 * time.Second, + EnableMLHeuristics: true, + SequentialThreshold: 3, + ConfidenceThreshold: 0.6, + MaxPrefetchAhead: 8, + PrefetchBatchSize: 3, EnableWorkloadCoordination: true, - EnableGPUCoordination: true, - EnableDistributedTraining: true, - EnableModelServing: true, - EnableTensorOptimization: true, + EnableGPUCoordination: true, + EnableDistributedTraining: true, + EnableModelServing: true, + EnableTensorOptimization: true, } mockChunkCache := &MockChunkCache{} @@ -203,9 +211,9 @@ func TestPhase4_MLOptimization_AdvancedIntegration(t *testing.T) { // Register model for serving optimization modelInfo := &ModelServingInfo{ - ModelID: "bert-large", - ModelPath: "/models/bert-large.bin", - Framework: "transformers", + ModelID: "bert-large", + ModelPath: "/models/bert-large.bin", + Framework: "transformers", ServingPattern: ServingPatternRealtimeInference, } mlOpt.ServingOptimizer.RegisterModel(modelInfo) @@ -255,7 +263,7 @@ func TestPhase4_ConcurrentOperations(t *testing.T) { }(i) } - // Concurrent GPU coordination operations + // Concurrent GPU coordination operations for i := 0; i < numConcurrentOps; i++ 
{ go func(index int) { defer wg.Done() @@ -283,9 +291,9 @@ func TestPhase4_ConcurrentOperations(t *testing.T) { go func(index int) { defer wg.Done() modelInfo := &ModelServingInfo{ - ModelID: "concurrent-model-" + string(rune('0'+index)), - ModelPath: "/models/model-" + string(rune('0'+index)) + ".bin", - Framework: "pytorch", + ModelID: "concurrent-model-" + string(rune('0'+index)), + ModelPath: "/models/model-" + string(rune('0'+index)) + ".bin", + Framework: "pytorch", ServingPattern: ServingPatternRealtimeInference, } mlOpt.ServingOptimizer.RegisterModel(modelInfo) @@ -324,7 +332,7 @@ func TestPhase4_ConcurrentOperations(t *testing.T) { func TestPhase4_PerformanceImpact(t *testing.T) { // Test with Phase 4 features disabled configBasic := DefaultMLConfig() - + mockChunkCache := &MockChunkCache{} startTime := time.Now() mlOptBasic := NewMLOptimization(configBasic, mockChunkCache, MockLookupFileId) @@ -346,7 +354,7 @@ func TestPhase4_PerformanceImpact(t *testing.T) { // Performance impact should be reasonable (less than 10x slower) performanceRatio := float64(advancedInitTime) / float64(basicInitTime) - t.Logf("Basic init time: %v, Advanced init time: %v, Ratio: %.2f", + t.Logf("Basic init time: %v, Advanced init time: %v, Ratio: %.2f", basicInitTime, advancedInitTime, performanceRatio) if performanceRatio > 10.0 { @@ -357,7 +365,7 @@ func TestPhase4_PerformanceImpact(t *testing.T) { basicMemory := estimateMemoryUsage(mlOptBasic) advancedMemory := estimateMemoryUsage(mlOptAdvanced) memoryRatio := float64(advancedMemory) / float64(basicMemory) - + t.Logf("Basic memory: %d bytes, Advanced memory: %d bytes, Ratio: %.2f", basicMemory, advancedMemory, memoryRatio) @@ -371,7 +379,7 @@ func TestPhase4_PerformanceImpact(t *testing.T) { // Helper function to estimate memory usage (simplified) func estimateMemoryUsage(mlOpt *MLOptimization) int64 { baseSize := int64(1024 * 1024) // 1MB base - + if mlOpt.WorkloadCoordinator != nil { baseSize += 512 * 1024 // 512KB } @@ -387,7 +395,7 @@ func estimateMemoryUsage(mlOpt *MLOptimization) int64 { if mlOpt.TensorOptimizer != nil { baseSize += 256 * 1024 // 256KB } - + return baseSize } @@ -433,9 +441,9 @@ func TestPhase4_ShutdownSequence(t *testing.T) { mlOpt := NewMLOptimization(config, mockChunkCache, MockLookupFileId) // Verify all components are running - if mlOpt.WorkloadCoordinator == nil || mlOpt.GPUCoordinator == nil || - mlOpt.DistributedCoordinator == nil || mlOpt.ServingOptimizer == nil || - mlOpt.TensorOptimizer == nil { + if mlOpt.WorkloadCoordinator == nil || mlOpt.GPUCoordinator == nil || + mlOpt.DistributedCoordinator == nil || mlOpt.ServingOptimizer == nil || + mlOpt.TensorOptimizer == nil { t.Fatal("Not all Phase 4 components initialized") }
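A minimal usage sketch of the BatchOptimizer API reformatted above (NewBatchOptimizer, RecordBatchAccess, GetBatchRecommendations, Shutdown). The inode number, the 128KB item size, and the 0.5 confidence cutoff are illustrative values only, not taken from the code:

package ml

import (
	"fmt"
	"time"
)

// exampleBatchOptimizerUsage records a sequential scan and reads back the
// resulting per-pattern recommendations.
func exampleBatchOptimizerUsage() {
	bo := NewBatchOptimizer()
	defer bo.Shutdown()

	const inode = uint64(42)    // hypothetical file inode
	const itemSize = 128 * 1024 // hypothetical 128KB items

	// Record a linear pass over ten consecutive items of one file.
	for i := int64(0); i < 10; i++ {
		bo.RecordBatchAccess(inode, i*itemSize, itemSize, true, "")
		time.Sleep(time.Millisecond) // keep accesses close in time but ordered
	}

	// Ask for hints once enough history has accumulated.
	rec := bo.GetBatchRecommendations(inode)
	if rec != nil && rec.ShouldOptimize && rec.Confidence >= 0.5 {
		fmt.Printf("prefetch %d bytes at offset %d (pattern=%v)\n",
			rec.PrefetchSize, rec.NextBatchOffset, rec.Pattern)
	}
}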
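A sketch of how a caller might act on a BatchOptimizationRecommendations value. Only the struct fields come from the code above; the prefetch callback and the confidence cutoff are assumptions:

package ml

// applyBatchRecommendations is hypothetical glue between the optimizer and a
// read-ahead path; prefetchFn stands in for whatever prefetch mechanism the
// mount layer provides.
func applyBatchRecommendations(rec *BatchOptimizationRecommendations, prefetchFn func(offset, size int64)) {
	if rec == nil || !rec.ShouldOptimize || prefetchFn == nil {
		return
	}
	// Ignore low-confidence or empty predictions (cutoff is an assumption).
	if rec.Confidence < 0.5 || rec.NextBatchSize <= 0 {
		return
	}
	// Never speculate past the per-pattern prefetch budget.
	size := rec.NextBatchSize
	if rec.PrefetchSize > 0 && size > rec.PrefetchSize {
		size = rec.PrefetchSize
	}
	prefetchFn(rec.NextBatchOffset, size)
}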
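The one behavioral change in batch_optimizer.go replaces the unconditional history[len(history)-10:] slice, which panics when a file has between 3 and 9 recorded accesses, with a clamped start index. The same idiom as a standalone helper, shown only for reference (the helper is hypothetical and not used by the diff):

package ml

// tailAccesses returns the last n accesses, or all of them when fewer exist,
// mirroring the guarded startIdx computation in detectBatchPattern.
func tailAccesses(history []BatchAccess, n int) []BatchAccess {
	start := len(history) - n
	if start < 0 {
		start = 0
	}
	return history[start:]
}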
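A side note on isPipelinedPattern, whose body is only reformatted here: it squares time.Duration values directly, which overflows int64 once a single interval exceeds roughly three seconds. Doing the statistics in float64 sidesteps that; the helper below is a sketch of the idea (computing the same variance-over-mean-squared quantity the code calls cv), not part of the change:

package ml

import "time"

// intervalCV computes variance/mean^2 over consecutive access intervals in
// float64 seconds, avoiding Duration*Duration overflow for long intervals.
func intervalCV(accessTimes []time.Time) (cv float64, ok bool) {
	if len(accessTimes) < 3 {
		return 0, false
	}
	var sum, sumSq float64
	n := float64(len(accessTimes) - 1)
	for i := 1; i < len(accessTimes); i++ {
		d := accessTimes[i].Sub(accessTimes[i-1]).Seconds()
		sum += d
		sumSq += d * d
	}
	mean := sum / n
	if mean == 0 {
		return 0, false
	}
	return (sumSq/n - mean*mean) / (mean * mean), true
}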