chrislu
2025-08-30 17:08:02 -07:00
parent 814e0bb233
commit 5fe7f3fef2
3 changed files with 653 additions and 274 deletions

View File

@@ -12,155 +12,155 @@ import (
type BatchAccessPattern int

const (
	BatchPatternUnknown      BatchAccessPattern = iota
	BatchPatternLinear       // Linear batch processing
	BatchPatternStrided      // Strided access with fixed gaps
	BatchPatternShuffled     // Randomized batch order
	BatchPatternHierarchical // Hierarchical/nested batch access
	BatchPatternMultiGPU     // Multi-GPU distributed batches
	BatchPatternPipelined    // Pipelined batch processing
)

// BatchAccess represents a single file access that's part of batch processing
type BatchAccess struct {
	Offset     int64     // File offset
	Size       int       // Access size
	AccessTime time.Time // When accessed
	IsRead     bool      // Whether this was a read operation
	BatchHint  string    // Optional batch identifier hint
}

// BatchInfo holds information about a detected batch
type BatchInfo struct {
	sync.RWMutex

	// Batch identification
	BatchID     string // Unique batch identifier
	StartOffset int64  // Starting file offset
	EndOffset   int64  // Ending file offset
	Size        int64  // Total batch size in bytes
	ItemCount   int    // Number of items in batch
	ItemSize    int64  // Average item size

	// Access pattern
	AccessPattern  BatchAccessPattern // Detected access pattern
	AccessOrder    []int64            // Order of access within batch
	AccessTimes    []time.Time        // When each item was accessed
	ProcessingTime time.Duration      // Total time to process batch

	// Performance metrics
	LoadTime    time.Duration // Time to load batch from storage
	ProcessTime time.Duration // Time to process batch (compute)
	TotalTime   time.Duration // Total end-to-end time
	Throughput  float64       // Items per second

	// Optimization state
	IsPrefetched    bool    // Whether batch was prefetched
	CacheHitRate    float64 // Percentage of cache hits
	OptimalPrefetch int64   // Recommended prefetch size

	// Relationship to other batches
	PreviousBatch *BatchInfo   // Previous batch in sequence
	NextBatch     *BatchInfo   // Next batch in sequence
	ParentBatch   *BatchInfo   // Parent batch (for hierarchical)
	ChildBatches  []*BatchInfo // Child batches (for hierarchical)
}

// BatchOptimizer optimizes batch access patterns for ML workloads
type BatchOptimizer struct {
	sync.RWMutex

	// Configuration
	maxBatchesTracked    int   // Maximum number of batches to track
	batchDetectionWindow int   // Window size for batch detection
	minBatchSize         int64 // Minimum size to consider as batch
	maxBatchSize         int64 // Maximum size to consider as batch

	// Batch tracking
	activeBatches    map[string]*BatchInfo   // Currently active batches
	completedBatches map[string]*BatchInfo   // Recently completed batches
	inodeToBatches   map[uint64][]*BatchInfo // File to batches mapping

	// Pattern detection
	accessHistory  map[uint64][]BatchAccess  // Recent access history per file
	batchSequences map[uint64]*BatchSequence // Detected batch sequences

	// Optimization strategies
	prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern
	cacheStrategies    map[BatchAccessPattern]*CacheConfig    // Cache configs per pattern

	// Statistics
	totalBatchesDetected int64 // Total batches detected
	optimizationHits     int64 // Successful optimization applications
	optimizationMisses   int64 // Failed optimization attempts

	// Background processing
	cleanupTicker *time.Ticker  // Cleanup timer
	stopCleanup   chan struct{} // Cleanup stop signal
}

// BatchSequence represents a sequence of related batches
type BatchSequence struct {
	sync.RWMutex

	SequenceID  string             // Unique sequence identifier
	Batches     []*BatchInfo       // Batches in sequence
	Pattern     BatchAccessPattern // Overall sequence pattern
	StartTime   time.Time          // When sequence started
	LastAccess  time.Time          // Last access in sequence
	IsComplete  bool               // Whether sequence is complete
	RepeatCount int                // How many times sequence has repeated

	// Predictions
	NextBatchOffset int64   // Predicted next batch offset
	NextBatchSize   int64   // Predicted next batch size
	Confidence      float64 // Confidence in predictions (0-1)
}

// PrefetchConfig holds configuration for prefetching strategies
type PrefetchConfig struct {
	Strategy         PrefetchStrategy // Which prefetch strategy to use
	LookaheadCount   int              // How many items to prefetch ahead
	PrefetchSize     int64            // Size to prefetch per operation
	ConcurrencyLevel int              // How many concurrent prefetch operations
	AdaptiveScaling  bool             // Whether to scale based on performance
}

// CacheConfig holds configuration for caching strategies
type CacheConfig struct {
	Policy         CachePolicy   // Which cache policy to use
	RetentionTime  time.Duration // How long to keep items cached
	Priority       CachePriority // Cache priority level
	PreloadBatches int           // How many batches to preload
}

// NewBatchOptimizer creates a new batch optimizer
func NewBatchOptimizer() *BatchOptimizer {
	bo := &BatchOptimizer{
		maxBatchesTracked:    1000,              // Track up to 1000 batches
		batchDetectionWindow: 100,               // Look at last 100 accesses
		minBatchSize:         64 * 1024,         // Minimum 64KB batch
		maxBatchSize:         100 * 1024 * 1024, // Maximum 100MB batch
		activeBatches:        make(map[string]*BatchInfo),
		completedBatches:     make(map[string]*BatchInfo),
		inodeToBatches:       make(map[uint64][]*BatchInfo),
		accessHistory:        make(map[uint64][]BatchAccess),
		batchSequences:       make(map[uint64]*BatchSequence),
		prefetchStrategies:   make(map[BatchAccessPattern]*PrefetchConfig),
		cacheStrategies:      make(map[BatchAccessPattern]*CacheConfig),
		stopCleanup:          make(chan struct{}),
	}

	// Initialize default strategies
	bo.initializeDefaultStrategies()

	// Start cleanup routine
	bo.cleanupTicker = time.NewTicker(5 * time.Minute)
	go bo.cleanupRoutine()

	glog.V(1).Infof("Batch optimizer initialized")
	return bo
}
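To show how the pieces above fit together, here is a minimal, illustrative usage sketch (not part of this commit): a read path records each access and, when a batch is detected, consults the sequence-level recommendations.

```go
// Illustrative same-package helper (assumed wiring, not from this commit):
// record a read, then ask the optimizer what to prefetch next.
func onReadHint(bo *BatchOptimizer, inode uint64, offset int64, size int) {
	if batch := bo.RecordBatchAccess(inode, offset, size, true, ""); batch != nil {
		rec := bo.GetBatchRecommendations(inode)
		if rec.ShouldOptimize && rec.Confidence >= 0.5 {
			glog.V(3).Infof("prefetch hint: ~%d bytes at offset %d (pattern=%v)",
				rec.NextBatchSize, rec.NextBatchOffset, rec.Pattern)
		}
	}
}
```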
@@ -169,11 +169,11 @@ func NewBatchOptimizer() *BatchOptimizer {
func (bo *BatchOptimizer) initializeDefaultStrategies() {
	// Linear batch pattern - aggressive prefetching
	bo.prefetchStrategies[BatchPatternLinear] = &PrefetchConfig{
		Strategy:         PrefetchAggressive,
		LookaheadCount:   5,
		PrefetchSize:     2 * 1024 * 1024, // 2MB
		ConcurrencyLevel: 3,
		AdaptiveScaling:  true,
	}
	bo.cacheStrategies[BatchPatternLinear] = &CacheConfig{
		Policy: CachePolicyTrainingAware,
@@ -181,14 +181,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() {
		Priority:       CachePriorityHigh,
		PreloadBatches: 2,
	}

	// Shuffled batch pattern - conservative prefetching
	bo.prefetchStrategies[BatchPatternShuffled] = &PrefetchConfig{
		Strategy:         PrefetchBalanced,
		LookaheadCount:   2,
		PrefetchSize:     512 * 1024, // 512KB
		ConcurrencyLevel: 2,
		AdaptiveScaling:  true,
	}
	bo.cacheStrategies[BatchPatternShuffled] = &CacheConfig{
		Policy: CachePolicyLRU,
@@ -196,14 +196,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() {
		Priority:       CachePriorityNormal,
		PreloadBatches: 1,
	}

	// Multi-GPU pattern - high concurrency
	bo.prefetchStrategies[BatchPatternMultiGPU] = &PrefetchConfig{
		Strategy:         PrefetchAggressive,
		LookaheadCount:   8,
		PrefetchSize:     4 * 1024 * 1024, // 4MB
		ConcurrencyLevel: 6,
		AdaptiveScaling:  true,
	}
	bo.cacheStrategies[BatchPatternMultiGPU] = &CacheConfig{
		Policy: CachePolicyML,
@@ -217,7 +217,7 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() {
func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int, isRead bool, batchHint string) *BatchInfo {
	bo.Lock()
	defer bo.Unlock()

	access := BatchAccess{
		Offset: offset,
		Size:   size,
@@ -225,7 +225,7 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int
		IsRead:    isRead,
		BatchHint: batchHint,
	}

	// Add to access history
	history := bo.accessHistory[inode]
	history = append(history, access)
@@ -233,23 +233,23 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int
		history = history[1:] // Keep only recent accesses
	}
	bo.accessHistory[inode] = history

	// Detect batch patterns
	batchInfo := bo.detectBatchPattern(inode, history)
	if batchInfo != nil {
		bo.totalBatchesDetected++

		// Add to tracking
		bo.activeBatches[batchInfo.BatchID] = batchInfo
		bo.inodeToBatches[inode] = append(bo.inodeToBatches[inode], batchInfo)

		// Update batch sequence
		bo.updateBatchSequence(inode, batchInfo)

		glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d",
			inode, batchInfo.AccessPattern, batchInfo.Size, batchInfo.ItemCount)
	}

	return batchInfo
}
@@ -258,25 +258,29 @@ func (bo *BatchOptimizer) detectBatchPattern(inode uint64, history []BatchAccess
	if len(history) < 3 {
		return nil // Need minimum history
	}

	// Look for batch boundaries by analyzing access gaps and patterns
-	recent := history[len(history)-10:] // Look at last 10 accesses
+	startIdx := len(history) - 10
+	if startIdx < 0 {
+		startIdx = 0
+	}
+	recent := history[startIdx:] // Look at last 10 accesses (or all if fewer)
	if len(recent) < 3 {
		recent = history
	}

	// Check for batch characteristics
	batchInfo := bo.analyzePotentialBatch(recent, inode)
	if batchInfo == nil {
		return nil
	}

	// Determine access pattern
	batchInfo.AccessPattern = bo.classifyBatchPattern(batchInfo, recent)

	// Calculate performance metrics
	bo.calculateBatchMetrics(batchInfo, recent)

	return batchInfo
}
@@ -285,60 +289,60 @@ func (bo *BatchOptimizer) analyzePotentialBatch(accesses []BatchAccess, inode ui
	if len(accesses) < 2 {
		return nil
	}

	// Calculate basic statistics
	var totalSize int64
	var itemCount int
	minOffset := accesses[0].Offset
	maxOffset := accesses[0].Offset
	accessOrder := make([]int64, len(accesses))
	accessTimes := make([]time.Time, len(accesses))

	for i, access := range accesses {
		totalSize += int64(access.Size)
		itemCount++

		if access.Offset < minOffset {
			minOffset = access.Offset
		}
		if access.Offset > maxOffset {
			maxOffset = access.Offset
		}

		accessOrder[i] = access.Offset
		accessTimes[i] = access.AccessTime
	}

	batchSize := maxOffset - minOffset + int64(accesses[len(accesses)-1].Size)

	// Check if this qualifies as a batch
	if batchSize < bo.minBatchSize || batchSize > bo.maxBatchSize {
		return nil
	}

	// Check temporal locality (accesses should be close in time)
	timeSpan := accessTimes[len(accessTimes)-1].Sub(accessTimes[0])
	if timeSpan > 10*time.Minute { // Too spread out in time
		return nil
	}

	// Create batch info
	batchID := generateBatchID(inode, minOffset, time.Now())
	batchInfo := &BatchInfo{
		BatchID:     batchID,
		StartOffset: minOffset,
		EndOffset:   maxOffset,
		Size:        batchSize,
		ItemCount:   itemCount,
		ItemSize:    totalSize / int64(itemCount),
		AccessOrder: accessOrder,
		AccessTimes: accessTimes,
		TotalTime:   timeSpan,
		LoadTime:    timeSpan, // Initially assume all time is load time
	}

	return batchInfo
}
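One subtlety in the extent math above: `batchSize` adds the size of the chronologically last access, which for out-of-order batches is not necessarily the access at `maxOffset`. Tracking the end offset directly keeps the estimate exact; a possible sketch (not in this commit):

```go
// Track the furthest byte touched instead of pairing maxOffset with the
// size of whichever access happened to come last (illustrative).
maxEnd := accesses[0].Offset + int64(accesses[0].Size)
for _, access := range accesses {
	if end := access.Offset + int64(access.Size); end > maxEnd {
		maxEnd = end
	}
}
batchSize := maxEnd - minOffset
```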
@@ -347,7 +351,7 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc
	if len(batch.AccessOrder) < 2 {
		return BatchPatternUnknown
	}

	// Check for linear pattern (sequential offsets)
	isLinear := true
	for i := 1; i < len(batch.AccessOrder); i++ {
@@ -356,31 +360,31 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc
			break
		}
	}
	if isLinear {
		return BatchPatternLinear
	}

	// Check for strided pattern (regular gaps)
	if bo.isStridedPattern(batch.AccessOrder) {
		return BatchPatternStrided
	}

	// Check for shuffled pattern (randomized order)
	if bo.isShuffledPattern(batch.AccessOrder) {
		return BatchPatternShuffled
	}

	// Check for multi-GPU pattern (parallel access indicators)
	if bo.isMultiGPUPattern(accesses) {
		return BatchPatternMultiGPU
	}

	// Check for pipelined pattern (overlapping accesses)
	if bo.isPipelinedPattern(batch.AccessTimes) {
		return BatchPatternPipelined
	}

	return BatchPatternUnknown
}
@@ -389,13 +393,13 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool {
	if len(offsets) < 3 {
		return false
	}

	// Calculate stride
	stride := offsets[1] - offsets[0]
	if stride <= 0 {
		return false
	}

	// Check if all accesses follow the same stride
	consistentStrides := 0
	for i := 2; i < len(offsets); i++ {
@@ -404,9 +408,9 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool {
			consistentStrides++
		}
	}

	// At least 80% of strides should be consistent
-	return float64(consistentStrides) / float64(len(offsets)-2) >= 0.8
+	return float64(consistentStrides)/float64(len(offsets)-2) >= 0.8
}
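As a quick check on the 80% rule (illustrative, assuming the elided loop body counts a gap as consistent when it equals the first stride): with offsets {0, 4MB, 8MB, 12MB, 16MB} all three follow-on gaps match, giving 3/3 = 1.0; bumping the last offset to 17MB drops the ratio to 2/3 ≈ 0.67, below the threshold.

```go
// Illustrative test-style check of the stride threshold (not in this commit).
func TestIsStridedPattern_Illustrative(t *testing.T) {
	bo := NewBatchOptimizer()
	defer bo.Shutdown()

	mb := int64(1 << 20)
	if !bo.isStridedPattern([]int64{0, 4 * mb, 8 * mb, 12 * mb, 16 * mb}) {
		t.Error("constant 4MB stride should classify as strided") // 3/3 >= 0.8
	}
	if bo.isStridedPattern([]int64{0, 4 * mb, 8 * mb, 12 * mb, 17 * mb}) {
		t.Error("broken run should fall below the 80% threshold") // 2/3 < 0.8
	}
}
```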
// isShuffledPattern checks if accesses are in randomized order
@@ -414,7 +418,7 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool {
	if len(offsets) < 5 {
		return false
	}

	// Count inversions (out-of-order pairs)
	inversions := 0
	for i := 0; i < len(offsets); i++ {
@@ -424,10 +428,10 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool {
			}
		}
	}

	totalPairs := len(offsets) * (len(offsets) - 1) / 2
	inversionRate := float64(inversions) / float64(totalPairs)

	// High inversion rate suggests shuffling
	return inversionRate > 0.3
}
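Similarly for the inversion heuristic (illustrative, assuming the elided inner loop counts pairs i < j with offsets[j] < offsets[i]): a fully reversed five-element sequence inverts all 10 pairs (rate 1.0 > 0.3), while sorted input inverts none.

```go
// Illustrative test-style check of the inversion-rate threshold (not in this commit).
func TestIsShuffledPattern_Illustrative(t *testing.T) {
	bo := NewBatchOptimizer()
	defer bo.Shutdown()

	if !bo.isShuffledPattern([]int64{50, 40, 30, 20, 10}) {
		t.Error("reversed order should read as shuffled") // 10/10 inversions
	}
	if bo.isShuffledPattern([]int64{10, 20, 30, 40, 50}) {
		t.Error("sorted order should not read as shuffled") // 0/10 inversions
	}
}
```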
@@ -437,22 +441,22 @@ func (bo *BatchOptimizer) isMultiGPUPattern(accesses []BatchAccess) bool {
	// Look for multiple concurrent access streams
	// This is a simplified heuristic - in practice, this would need more
	// sophisticated detection based on process info, etc.

	if len(accesses) < 4 {
		return false
	}

	// Check for concurrent accesses (multiple accesses in very short time)
	concurrentWindows := 0
	windowSize := 100 * time.Millisecond

	for i := 0; i < len(accesses)-1; i++ {
		timeDiff := accesses[i+1].AccessTime.Sub(accesses[i].AccessTime)
		if timeDiff < windowSize {
			concurrentWindows++
		}
	}

	// If many accesses are concurrent, might be multi-GPU
	return float64(concurrentWindows)/float64(len(accesses)) > 0.5
}
@@ -462,30 +466,30 @@ func (bo *BatchOptimizer) isPipelinedPattern(accessTimes []time.Time) bool {
	if len(accessTimes) < 3 {
		return false
	}

	// Look for regular, overlapping timing patterns
	intervals := make([]time.Duration, len(accessTimes)-1)
	for i := 1; i < len(accessTimes); i++ {
		intervals[i-1] = accessTimes[i].Sub(accessTimes[i-1])
	}

	// Calculate coefficient of variation for intervals
	var sum, sumSq time.Duration
	for _, interval := range intervals {
		sum += interval
		sumSq += interval * interval
	}

	n := time.Duration(len(intervals))
	mean := sum / n
	if mean == 0 {
		return false
	}

	// Calculate variance and CV
	variance := (sumSq / n) - (mean * mean)
-	cv := float64(variance) / float64(mean * mean)
+	cv := float64(variance) / float64(mean*mean)

	// Low coefficient of variation suggests regular pipelining
	return cv < 0.2
}
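Two caveats here. The quantity compared against 0.2 is variance/mean², i.e. the squared coefficient of variation, and since `time.Duration` is an int64 nanosecond count, `interval * interval` overflows once an interval exceeds √(2⁶³) ns ≈ 3.04 s. A float64 variant, sketched below as possible hardening (not part of this commit), avoids the overflow:

```go
// Float64 version of the interval-regularity statistic above; safe for
// intervals longer than ~3s. Illustrative hardening, not from this commit.
func intervalsAreRegular(intervals []time.Duration) bool {
	if len(intervals) == 0 {
		return false
	}
	var sum, sumSq float64
	for _, iv := range intervals {
		s := iv.Seconds()
		sum += s
		sumSq += s * s
	}
	n := float64(len(intervals))
	mean := sum / n
	if mean == 0 {
		return false
	}
	cv2 := (sumSq/n - mean*mean) / (mean * mean) // squared coefficient of variation
	return cv2 < 0.2
}
```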
@@ -495,18 +499,18 @@ func (bo *BatchOptimizer) calculateBatchMetrics(batch *BatchInfo, accesses []Bat
	if len(batch.AccessTimes) < 2 {
		return
	}

	// Calculate throughput
	timeSpan := batch.AccessTimes[len(batch.AccessTimes)-1].Sub(batch.AccessTimes[0])
	if timeSpan > 0 {
		batch.Throughput = float64(batch.ItemCount) / timeSpan.Seconds()
	}

	// Estimate processing vs load time (heuristic)
	// In practice, this would need more sophisticated measurement
	avgItemTime := timeSpan / time.Duration(batch.ItemCount)
	batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time
	batch.LoadTime = avgItemTime / 2    // Assume 50% load time
}

// updateBatchSequence updates the batch sequence for an inode
@@ -521,26 +525,26 @@ func (bo *BatchOptimizer) updateBatchSequence(inode uint64, newBatch *BatchInfo)
		}
		bo.batchSequences[inode] = sequence
	}

	sequence.Lock()
	defer sequence.Unlock()

	// Link batches
	if len(sequence.Batches) > 0 {
		lastBatch := sequence.Batches[len(sequence.Batches)-1]
		lastBatch.NextBatch = newBatch
		newBatch.PreviousBatch = lastBatch
	}

	sequence.Batches = append(sequence.Batches, newBatch)
	sequence.LastAccess = time.Now()

	// Update sequence pattern based on majority of batches
	bo.updateSequencePattern(sequence)

	// Make predictions for next batch
	bo.updateSequencePredictions(sequence)

	// Keep sequence size manageable
	if len(sequence.Batches) > 100 {
		sequence.Batches = sequence.Batches[len(sequence.Batches)-50:] // Keep last 50 batches
@@ -552,13 +556,13 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) {
	if len(sequence.Batches) < 3 {
		return
	}

	// Count patterns
	patternCounts := make(map[BatchAccessPattern]int)
	for _, batch := range sequence.Batches {
		patternCounts[batch.AccessPattern]++
	}

	// Find most common pattern
	maxCount := 0
	var dominantPattern BatchAccessPattern
@@ -568,7 +572,7 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) {
			dominantPattern = pattern
		}
	}

	sequence.Pattern = dominantPattern
}
@@ -577,12 +581,12 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
	if len(sequence.Batches) < 2 {
		return
	}

	recent := sequence.Batches[len(sequence.Batches)-3:] // Last 3 batches
	if len(recent) < 2 {
		recent = sequence.Batches
	}

	// Predict next batch offset based on pattern
	switch sequence.Pattern {
	case BatchPatternLinear:
@@ -595,7 +599,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
			sequence.NextBatchSize = lastBatch.Size
			sequence.Confidence = 0.8
		}

	case BatchPatternStrided:
		// Regular stride
		if len(recent) >= 3 {
@@ -604,7 +608,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
			sequence.NextBatchSize = recent[len(recent)-1].Size
			sequence.Confidence = 0.7
		}

	default:
		// Lower confidence for unpredictable patterns
		sequence.Confidence = 0.3
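Worth flagging: `sequence.Batches[len(sequence.Batches)-3:]` at the top of this function panics when exactly two batches are tracked (negative slice start), so the `len(recent) < 2` fallback below it can never run. The clamp idiom this commit introduces in `detectBatchPattern` would fit here too; a possible sketch (not in this commit):

```go
// Clamp the window start, mirroring the detectBatchPattern fix (illustrative).
start := len(sequence.Batches) - 3
if start < 0 {
	start = 0
}
recent := sequence.Batches[start:] // last 3 batches, or all if fewer
```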
@@ -615,60 +619,60 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
func (bo *BatchOptimizer) GetBatchRecommendations(inode uint64) *BatchOptimizationRecommendations {
	bo.RLock()
	defer bo.RUnlock()

	sequence := bo.batchSequences[inode]
	if sequence == nil {
		return &BatchOptimizationRecommendations{
			ShouldOptimize: false,
		}
	}

	sequence.RLock()
	defer sequence.RUnlock()

	prefetchConfig := bo.prefetchStrategies[sequence.Pattern]
	cacheConfig := bo.cacheStrategies[sequence.Pattern]

	if prefetchConfig == nil {
		prefetchConfig = bo.prefetchStrategies[BatchPatternUnknown]
	}
	if cacheConfig == nil {
		cacheConfig = bo.cacheStrategies[BatchPatternUnknown]
	}

	recommendations := &BatchOptimizationRecommendations{
		ShouldOptimize:  true,
		Pattern:         sequence.Pattern,
		PrefetchSize:    prefetchConfig.PrefetchSize,
		PrefetchCount:   prefetchConfig.LookaheadCount,
		CachePriority:   cacheConfig.Priority,
		CacheRetention:  cacheConfig.RetentionTime,
		NextBatchOffset: sequence.NextBatchOffset,
		NextBatchSize:   sequence.NextBatchSize,
		Confidence:      sequence.Confidence,
	}

	return recommendations
}

// BatchOptimizationRecommendations holds batch optimization recommendations
type BatchOptimizationRecommendations struct {
	ShouldOptimize  bool               `json:"should_optimize"`
	Pattern         BatchAccessPattern `json:"pattern"`
	PrefetchSize    int64              `json:"prefetch_size"`
	PrefetchCount   int                `json:"prefetch_count"`
	CachePriority   CachePriority      `json:"cache_priority"`
	CacheRetention  time.Duration      `json:"cache_retention"`
	NextBatchOffset int64              `json:"next_batch_offset"`
	NextBatchSize   int64              `json:"next_batch_size"`
	Confidence      float64            `json:"confidence"`
}

// GetBatchMetrics returns comprehensive batch optimization metrics
func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics {
	bo.RLock()
	defer bo.RUnlock()

	metrics := BatchOptimizerMetrics{
		TotalBatchesDetected: bo.totalBatchesDetected,
		ActiveBatches:        int64(len(bo.activeBatches)),
@@ -677,32 +681,32 @@ func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics {
		OptimizationMisses: bo.optimizationMisses,
		PatternCounts:      make(map[BatchAccessPattern]int64),
	}

	// Count patterns
	for _, batch := range bo.activeBatches {
		batch.RLock()
		metrics.PatternCounts[batch.AccessPattern]++
		batch.RUnlock()
	}

	// Calculate hit rate
	totalAttempts := bo.optimizationHits + bo.optimizationMisses
	if totalAttempts > 0 {
		metrics.OptimizationHitRate = float64(bo.optimizationHits) / float64(totalAttempts)
	}

	return metrics
}

// BatchOptimizerMetrics holds metrics for batch optimization
type BatchOptimizerMetrics struct {
	TotalBatchesDetected int64                        `json:"total_batches_detected"`
	ActiveBatches        int64                        `json:"active_batches"`
	CompletedBatches     int64                        `json:"completed_batches"`
	OptimizationHits     int64                        `json:"optimization_hits"`
	OptimizationMisses   int64                        `json:"optimization_misses"`
	OptimizationHitRate  float64                      `json:"optimization_hit_rate"`
	PatternCounts        map[BatchAccessPattern]int64 `json:"pattern_counts"`
}

// cleanupRoutine performs periodic cleanup of old batch information
@@ -721,21 +725,21 @@ func (bo *BatchOptimizer) cleanupRoutine() {
func (bo *BatchOptimizer) performCleanup() {
	bo.Lock()
	defer bo.Unlock()

	now := time.Now()
	cutoff := now.Add(-30 * time.Minute) // Remove batches older than 30 minutes

	// Clean up completed batches
	for id, batch := range bo.completedBatches {
		batch.RLock()
		shouldRemove := len(batch.AccessTimes) > 0 && batch.AccessTimes[0].Before(cutoff)
		batch.RUnlock()
		if shouldRemove {
			delete(bo.completedBatches, id)
		}
	}

	// Clean up access history
	for inode, history := range bo.accessHistory {
		filtered := make([]BatchAccess, 0, len(history))
@@ -744,14 +748,14 @@ func (bo *BatchOptimizer) performCleanup() {
				filtered = append(filtered, access)
			}
		}

		if len(filtered) == 0 {
			delete(bo.accessHistory, inode)
		} else {
			bo.accessHistory[inode] = filtered
		}
	}

	// Clean up batch sequences
	for inode, sequence := range bo.batchSequences {
		sequence.Lock()
@@ -762,7 +766,7 @@ func (bo *BatchOptimizer) performCleanup() {
		}
		sequence.Unlock()
	}

	glog.V(4).Infof("Batch optimizer cleanup completed")
}
@@ -771,9 +775,9 @@ func (bo *BatchOptimizer) Shutdown() {
	if bo.cleanupTicker != nil {
		bo.cleanupTicker.Stop()
	}
	close(bo.stopCleanup)

	glog.V(1).Infof("Batch optimizer shutdown complete")
}
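A final operational note: a second call to `Shutdown` would panic on the repeated `close(bo.stopCleanup)`. If callers can't guarantee a single shutdown, a `sync.Once` wrapper is a cheap guard (illustrative, not part of this commit):

```go
// Illustrative idempotent-shutdown wrapper (not in this commit).
type safeBatchOptimizer struct {
	*BatchOptimizer
	once sync.Once
}

func (s *safeBatchOptimizer) Shutdown() {
	s.once.Do(s.BatchOptimizer.Shutdown)
}
```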

View File

@@ -0,0 +1,367 @@
package ml

import (
	"testing"
)

// TestOptimizationEngine_Basic tests the basic functionality of the optimization engine
func TestOptimizationEngine_Basic(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	if engine == nil {
		t.Fatal("Should create optimization engine")
	}
	if !engine.enabled {
		t.Error("Engine should be enabled")
	}

	// Check that default rules and strategies are loaded
	if len(engine.rules) == 0 {
		t.Error("Should have default rules loaded")
	}
	if len(engine.strategies) == 0 {
		t.Error("Should have default strategies loaded")
	}

	t.Logf("Engine initialized with %d rules, %d strategies", len(engine.rules), len(engine.strategies))
}
// TestOptimizationEngine_RuleEvaluation tests rule evaluation
func TestOptimizationEngine_RuleEvaluation(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	// Create test context for sequential access of a large model file
	context := &OptimizationContext{
		FilePath:        "/models/large_model.pth",
		FileSize:        2 * 1024 * 1024 * 1024, // 2GB
		FileType:        "model",
		AccessPattern:   SequentialAccess,
		AccessFrequency: 10,
		Framework:       "pytorch",
		WorkloadType:    "training",
	}

	// Apply optimizations
	result := engine.OptimizeAccess(context)
	if result == nil {
		t.Fatal("Should return optimization result")
	}
	if !result.Applied {
		t.Error("Should apply optimizations for large model file with sequential access")
	}
	if result.Confidence < 0.5 {
		t.Errorf("Expected confidence >= 0.5, got %.2f", result.Confidence)
	}
	if len(result.Optimizations) == 0 {
		t.Error("Should have applied optimizations")
	}

	t.Logf("Applied %d optimizations with confidence %.2f",
		len(result.Optimizations), result.Confidence)
	for i, opt := range result.Optimizations {
		t.Logf("Optimization %d: type=%s, target=%s", i+1, opt.Type, opt.Target)
	}
}
// TestOptimizationEngine_FrameworkDetection tests framework detection
func TestOptimizationEngine_FrameworkDetection(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	testCases := []struct {
		filePath          string
		expectedFramework string
	}{
		{"/models/model.pth", "pytorch"},
		{"/models/model.pt", "pytorch"},
		{"/models/saved_model.pb", "tensorflow"},
		{"/models/model.h5", "tensorflow"},
		{"/models/checkpoint.ckpt", "tensorflow"},
		{"/data/dataset.tfrecord", "tensorflow"},
		{"/unknown/file.bin", ""},
	}

	for _, tc := range testCases {
		framework := engine.detectFramework(tc.filePath, nil)
		if tc.expectedFramework == "" {
			if framework != "" {
				t.Errorf("File %s: expected no framework detection, got %s", tc.filePath, framework)
			}
		} else {
			if framework != tc.expectedFramework {
				t.Errorf("File %s: expected framework %s, got %s",
					tc.filePath, tc.expectedFramework, framework)
			}
		}
	}
}
// TestOptimizationEngine_FileTypeDetection tests file type detection
func TestOptimizationEngine_FileTypeDetection(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	testCases := []struct {
		filePath     string
		expectedType string
	}{
		{"/models/model.pth", "model"},
		{"/data/dataset.csv", "dataset"},
		{"/configs/config.yaml", "config"},
		{"/logs/training.log", "log"},
		{"/unknown/file.bin", "unknown"},
	}

	for _, tc := range testCases {
		fileType := engine.detectFileType(tc.filePath)
		if fileType != tc.expectedType {
			t.Errorf("File %s: expected type %s, got %s",
				tc.filePath, tc.expectedType, fileType)
		}
	}
}
// TestOptimizationEngine_ConditionEvaluation tests condition evaluation
func TestOptimizationEngine_ConditionEvaluation(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	context := &OptimizationContext{
		FilePath:      "/models/test.pth",
		FileSize:      5 * 1024 * 1024, // 5MB
		FileType:      "model",
		AccessPattern: SequentialAccess,
		Framework:     "pytorch",
	}

	// Test various condition types
	testConditions := []struct {
		condition RuleCondition
		expected  bool
	}{
		{
			condition: RuleCondition{
				Type:     "file_pattern",
				Property: "extension",
				Operator: "equals",
				Value:    ".pth",
			},
			expected: true,
		},
		{
			condition: RuleCondition{
				Type:     "file_context",
				Property: "size",
				Operator: "greater_than",
				Value:    1024 * 1024, // 1MB
			},
			expected: true,
		},
		{
			condition: RuleCondition{
				Type:     "access_pattern",
				Property: "pattern_type",
				Operator: "equals",
				Value:    "sequential",
			},
			expected: true,
		},
		{
			condition: RuleCondition{
				Type:     "workload_context",
				Property: "framework",
				Operator: "equals",
				Value:    "tensorflow",
			},
			expected: false,
		},
	}

	for i, tc := range testConditions {
		result := engine.evaluateCondition(tc.condition, context)
		if result != tc.expected {
			t.Errorf("Condition %d: expected %v, got %v", i+1, tc.expected, result)
		}
	}
}
// TestOptimizationEngine_PluginSystem tests the plugin system
func TestOptimizationEngine_PluginSystem(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	// Register a test plugin
	plugin := NewPyTorchPlugin()
	err := engine.RegisterPlugin(plugin)
	if err != nil {
		t.Fatalf("Failed to register plugin: %v", err)
	}

	// Verify plugin is registered
	if _, exists := engine.plugins["pytorch"]; !exists {
		t.Error("PyTorch plugin should be registered")
	}

	// Test framework detection through plugin
	confidence := plugin.DetectFramework("/models/test.pth", nil)
	if confidence < 0.5 {
		t.Errorf("Expected high confidence for .pth file, got %.2f", confidence)
	}

	// Test optimization hints
	context := &OptimizationContext{
		FilePath:  "/models/test.pth",
		FileSize:  100 * 1024 * 1024, // 100MB
		FileType:  "model",
		Framework: "pytorch",
	}

	hints := plugin.GetOptimizationHints(context)
	if len(hints) == 0 {
		t.Error("Plugin should provide optimization hints")
	}

	t.Logf("Plugin provided %d optimization hints", len(hints))
}
// TestOptimizationEngine_UsagePatterns tests usage pattern learning
func TestOptimizationEngine_UsagePatterns(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	context := &OptimizationContext{
		FilePath:      "/models/training_model.pth",
		FileSize:      50 * 1024 * 1024, // 50MB
		FileType:      "model",
		AccessPattern: SequentialAccess,
		Framework:     "pytorch",
		WorkloadType:  "training",
	}

	// Apply optimization multiple times to build usage patterns
	for i := 0; i < 5; i++ {
		result := engine.OptimizeAccess(context)
		if result == nil {
			t.Fatalf("Optimization %d failed", i+1)
		}
	}

	// Check that usage patterns are being tracked
	if len(engine.usagePatterns) == 0 {
		t.Error("Should have learned usage patterns")
	}

	// Verify pattern characteristics
	for patternKey, pattern := range engine.usagePatterns {
		t.Logf("Learned pattern: %s (frequency=%d, success_rate=%.2f)",
			patternKey, pattern.Frequency, pattern.SuccessRate)
		if pattern.Frequency < 1 {
			t.Errorf("Pattern %s should have frequency >= 1", patternKey)
		}
	}
}
// TestOptimizationEngine_Metrics tests metrics collection
func TestOptimizationEngine_Metrics(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	metrics := engine.GetMetrics()
	if metrics == nil {
		t.Fatal("Should return metrics")
	}

	expectedKeys := []string{"enabled", "rules_count", "templates_count", "strategies_count"}
	for _, key := range expectedKeys {
		if _, exists := metrics[key]; !exists {
			t.Errorf("Metrics should contain key: %s", key)
		}
	}

	if metrics["enabled"] != true {
		t.Error("Metrics should show engine as enabled")
	}

	t.Logf("Engine metrics: %+v", metrics)
}
// TestOptimizationEngine_ConfigurationDriven tests configuration-driven optimization
func TestOptimizationEngine_ConfigurationDriven(t *testing.T) {
	engine := NewOptimizationEngine(true)
	defer engine.Shutdown()

	// Test that the engine can apply optimizations based on its loaded configuration
	context := &OptimizationContext{
		FilePath:      "/data/dataset.csv",
		FileSize:      10 * 1024 * 1024, // 10MB
		FileType:      "dataset",
		AccessPattern: SequentialAccess,
		Framework:     "",
		WorkloadType:  "training",
		BatchSize:     32,
	}

	result := engine.OptimizeAccess(context)
	if result == nil {
		t.Fatal("Should return optimization result")
	}

	// The engine should make intelligent decisions based on context
	if result.Applied && len(result.Optimizations) > 0 {
		t.Logf("Successfully applied %d optimizations", len(result.Optimizations))
		for _, opt := range result.Optimizations {
			if opt.Type == "" || opt.Target == "" {
				t.Error("Optimization should have valid type and target")
			}
		}
	}

	if len(result.Recommendations) > 0 {
		t.Logf("Generated %d recommendations", len(result.Recommendations))
		for _, rec := range result.Recommendations {
			t.Logf("Recommendation: %s", rec)
		}
	}
}
// TestOptimizationEngine_Shutdown tests proper shutdown
func TestOptimizationEngine_Shutdown(t *testing.T) {
	engine := NewOptimizationEngine(true)
	if !engine.enabled {
		t.Error("Engine should start enabled")
	}

	engine.Shutdown()
	if engine.enabled {
		t.Error("Engine should be disabled after shutdown")
	}

	// Test that optimization doesn't work after shutdown
	context := &OptimizationContext{
		FilePath: "/test.pth",
		FileSize: 1024,
	}
	result := engine.OptimizeAccess(context)
	if result.Applied {
		t.Error("Should not apply optimizations after shutdown")
	}
}

View File

@@ -9,15 +9,23 @@ import (
// MockChunkCache for testing
type MockChunkCache struct{}

func (m *MockChunkCache) HasChunk(fileId string, chunkOffset int64) bool { return false }
func (m *MockChunkCache) IsInCache(fileId string, forRead bool) bool     { return false }
-func (m *MockChunkCache) ReadChunk(fileId string, chunkOffset int64, buffer []byte) (int, error) { return 0, nil }
-func (m *MockChunkCache) ReadChunkAt(buffer []byte, fileId string, offset uint64) (int, error) { return 0, nil }
-func (m *MockChunkCache) WriteChunk(fileId string, chunkOffset int64, buffer []byte) error { return nil }
-func (m *MockChunkCache) DeleteFileChunks(fileId string) {}
-func (m *MockChunkCache) GetMetrics() interface{} { return struct{}{} } // Return empty struct
-func (m *MockChunkCache) GetMaxFilePartSizeInCache() uint64 { return 64 * 1024 * 1024 } // 64MB default
-func (m *MockChunkCache) Shutdown() {}
+func (m *MockChunkCache) ReadChunk(fileId string, chunkOffset int64, buffer []byte) (int, error) {
+	return 0, nil
+}
+func (m *MockChunkCache) ReadChunkAt(buffer []byte, fileId string, offset uint64) (int, error) {
+	return 0, nil
+}
+func (m *MockChunkCache) WriteChunk(fileId string, chunkOffset int64, buffer []byte) error {
+	return nil
+}
+func (m *MockChunkCache) SetChunk(fileId string, buffer []byte) {}
+func (m *MockChunkCache) DeleteFileChunks(fileId string)        {}
+func (m *MockChunkCache) GetMetrics() interface{}               { return struct{}{} } // Return empty struct
+func (m *MockChunkCache) GetMaxFilePartSizeInCache() uint64     { return 64 * 1024 * 1024 } // 64MB default
+func (m *MockChunkCache) Shutdown()                             {}
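The newly added `SetChunk` suggests the cache interface the ML code consumes gained a method; a compile-time assertion keeps the mock from drifting. The interface literal below is illustrative and names only methods visible in this diff:

```go
// Illustrative compile-time check (not in this commit): build breaks if
// MockChunkCache stops satisfying the methods the ML code relies on.
var _ interface {
	ReadChunkAt(buffer []byte, fileId string, offset uint64) (int, error)
	SetChunk(fileId string, buffer []byte)
	GetMaxFilePartSizeInCache() uint64
} = (*MockChunkCache)(nil)
```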
// MockLookupFileId for testing
func MockLookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
@@ -79,9 +87,9 @@ func TestPhase4_GPUMemoryCoordinator_Basic(t *testing.T) {
	if coordinator == nil {
		t.Fatal("Should create GPU coordinator")
	}

	t.Log("GPU coordinator created successfully (detailed GPU operations would require actual GPU hardware)")

	// Test that it doesn't crash on basic operations
	t.Logf("GPU coordinator basic functionality verified")
@@ -117,9 +125,9 @@ func TestPhase4_ServingOptimizer_Basic(t *testing.T) {
	// Test model registration (basic structure)
	modelInfo := &ModelServingInfo{
		ModelID:        "resnet50-v1",
		ModelPath:      "/models/resnet50.pth",
		Framework:      "pytorch",
		ServingPattern: ServingPatternRealtimeInference,
	}
@@ -158,19 +166,19 @@ func TestPhase4_TensorOptimizer_Basic(t *testing.T) {
func TestPhase4_MLOptimization_AdvancedIntegration(t *testing.T) {
	// Create ML configuration with all Phase 4 features enabled
	config := &MLConfig{
		PrefetchWorkers:            8,
		PrefetchQueueSize:          100,
		PrefetchTimeout:            30 * time.Second,
		EnableMLHeuristics:         true,
		SequentialThreshold:        3,
		ConfidenceThreshold:        0.6,
		MaxPrefetchAhead:           8,
		PrefetchBatchSize:          3,
		EnableWorkloadCoordination: true,
		EnableGPUCoordination:      true,
		EnableDistributedTraining:  true,
		EnableModelServing:         true,
		EnableTensorOptimization:   true,
	}

	mockChunkCache := &MockChunkCache{}
@@ -203,9 +211,9 @@ func TestPhase4_MLOptimization_AdvancedIntegration(t *testing.T) {
	// Register model for serving optimization
	modelInfo := &ModelServingInfo{
		ModelID:        "bert-large",
		ModelPath:      "/models/bert-large.bin",
		Framework:      "transformers",
		ServingPattern: ServingPatternRealtimeInference,
	}

	mlOpt.ServingOptimizer.RegisterModel(modelInfo)
@@ -255,7 +263,7 @@ func TestPhase4_ConcurrentOperations(t *testing.T) {
		}(i)
	}

	// Concurrent GPU coordination operations
	for i := 0; i < numConcurrentOps; i++ {
		go func(index int) {
			defer wg.Done()
@@ -283,9 +291,9 @@ func TestPhase4_ConcurrentOperations(t *testing.T) {
		go func(index int) {
			defer wg.Done()
			modelInfo := &ModelServingInfo{
				ModelID:        "concurrent-model-" + string(rune('0'+index)),
				ModelPath:      "/models/model-" + string(rune('0'+index)) + ".bin",
				Framework:      "pytorch",
				ServingPattern: ServingPatternRealtimeInference,
			}
			mlOpt.ServingOptimizer.RegisterModel(modelInfo)
@@ -324,7 +332,7 @@ func TestPhase4_ConcurrentOperations(t *testing.T) {
func TestPhase4_PerformanceImpact(t *testing.T) {
	// Test with Phase 4 features disabled
	configBasic := DefaultMLConfig()
	mockChunkCache := &MockChunkCache{}

	startTime := time.Now()
	mlOptBasic := NewMLOptimization(configBasic, mockChunkCache, MockLookupFileId)
@@ -346,7 +354,7 @@ func TestPhase4_PerformanceImpact(t *testing.T) {
	// Performance impact should be reasonable (less than 10x slower)
	performanceRatio := float64(advancedInitTime) / float64(basicInitTime)
	t.Logf("Basic init time: %v, Advanced init time: %v, Ratio: %.2f",
		basicInitTime, advancedInitTime, performanceRatio)

	if performanceRatio > 10.0 {
@@ -357,7 +365,7 @@ func TestPhase4_PerformanceImpact(t *testing.T) {
	basicMemory := estimateMemoryUsage(mlOptBasic)
	advancedMemory := estimateMemoryUsage(mlOptAdvanced)

	memoryRatio := float64(advancedMemory) / float64(basicMemory)
	t.Logf("Basic memory: %d bytes, Advanced memory: %d bytes, Ratio: %.2f",
		basicMemory, advancedMemory, memoryRatio)
@@ -371,7 +379,7 @@ func TestPhase4_PerformanceImpact(t *testing.T) {
// Helper function to estimate memory usage (simplified)
func estimateMemoryUsage(mlOpt *MLOptimization) int64 {
	baseSize := int64(1024 * 1024) // 1MB base

	if mlOpt.WorkloadCoordinator != nil {
		baseSize += 512 * 1024 // 512KB
	}
@@ -387,7 +395,7 @@ func estimateMemoryUsage(mlOpt *MLOptimization) int64 {
	if mlOpt.TensorOptimizer != nil {
		baseSize += 256 * 1024 // 256KB
	}

	return baseSize
}
@@ -433,9 +441,9 @@ func TestPhase4_ShutdownSequence(t *testing.T) {
	mlOpt := NewMLOptimization(config, mockChunkCache, MockLookupFileId)

	// Verify all components are running
	if mlOpt.WorkloadCoordinator == nil || mlOpt.GPUCoordinator == nil ||
		mlOpt.DistributedCoordinator == nil || mlOpt.ServingOptimizer == nil ||
		mlOpt.TensorOptimizer == nil {
		t.Fatal("Not all Phase 4 components initialized")
	}