diff --git a/weed/mount/ml/cache_policy.go b/weed/mount/ml/cache_policy.go new file mode 100644 index 000000000..256b36dc9 --- /dev/null +++ b/weed/mount/ml/cache_policy.go @@ -0,0 +1,313 @@ +package ml + +import ( + "math" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +// CacheEntry represents a cached item with ML-aware metadata +type CacheEntry struct { + Inode uint64 // File inode + Size uint64 // Size of cached data + LastAccess time.Time // Last access time + AccessCount int64 // Total access count + CacheLevel int // Cache level (0=memory, 1=disk, etc.) + Pattern AccessPattern // Detected access pattern + FileType MLFileType // Type of ML file + IsHot bool // Whether this is a hot chunk + + // ML-specific metadata + IsTrainingData bool // Whether this is training data + IsModel bool // Whether this is a model file + PredictedReuse float64 // Predicted reuse probability (0.0-1.0) + EpochRelevance float64 // Relevance for current training epoch +} + +// MLCachePolicy implements ML-aware cache eviction policy +type MLCachePolicy struct { + // Weights for different factors (sum should be 1.0) + accessFrequencyWeight float64 // Weight for access frequency + recencyWeight float64 // Weight for access recency + sizeWeight float64 // Weight for item size + mlWeight float64 // Weight for ML-specific factors + + // ML-specific parameters + trainingDataBoost float64 // Boost factor for training data + modelFileBoost float64 // Boost factor for model files + sequentialBoost float64 // Boost factor for sequential access + epochRelevanceBoost float64 // Boost factor for epoch-relevant data + + // Time-based parameters + hotThreshold time.Duration // Threshold for considering item "hot" + coldThreshold time.Duration // Threshold for considering item "cold" + + // Size-based parameters + largeFileThreshold uint64 // Threshold for large files + smallFilePreference float64 // Preference for keeping small files + + // Statistics + totalEvictions int64 + mlFileEvictions int64 + trainingDataEvictions int64 + modelFileEvictions int64 +} + +// NewMLCachePolicy creates a new ML-aware cache eviction policy +func NewMLCachePolicy() *MLCachePolicy { + return &MLCachePolicy{ + // Balanced weights + accessFrequencyWeight: 0.3, + recencyWeight: 0.3, + sizeWeight: 0.2, + mlWeight: 0.2, + + // ML-specific boosts + trainingDataBoost: 1.5, // 50% boost for training data + modelFileBoost: 2.0, // 100% boost for model files + sequentialBoost: 1.3, // 30% boost for sequential access + epochRelevanceBoost: 1.4, // 40% boost for epoch-relevant data + + // Time thresholds + hotThreshold: 1 * time.Minute, + coldThreshold: 10 * time.Minute, + + // Size parameters + largeFileThreshold: 10 * 1024 * 1024, // 10MB + smallFilePreference: 1.2, // 20% preference for small files + } +} + +// CalculateEvictionScore calculates an eviction score for a cache entry +// Lower scores indicate higher priority for eviction +func (policy *MLCachePolicy) CalculateEvictionScore(entry *CacheEntry) float64 { + now := time.Now() + timeSinceAccess := now.Sub(entry.LastAccess) + + // Base factors + accessFrequencyScore := policy.calculateAccessFrequencyScore(entry) + recencyScore := policy.calculateRecencyScore(timeSinceAccess) + sizeScore := policy.calculateSizeScore(entry.Size) + mlScore := policy.calculateMLScore(entry) + + // Weighted combination + totalScore := policy.accessFrequencyWeight*accessFrequencyScore + + policy.recencyWeight*recencyScore + + policy.sizeWeight*sizeScore + + policy.mlWeight*mlScore + + 
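+	// Worked example (illustrative, not from the source): a plain file with
+	// AccessCount=5, accessed just now, 1KB in size, scores roughly
+	// freq=ln(6)≈1.79, recency=1.0, size=1.2, ml=0.5, giving
+	//   totalScore ≈ 0.3*1.79 + 0.3*1.0 + 0.2*1.2 + 0.2*0.5 ≈ 1.18,
+	// well above the default 0.3 eviction threshold used in ShouldEvict.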
glog.V(4).Infof("Eviction score for inode=%d: total=%.3f (freq=%.3f, recency=%.3f, size=%.3f, ml=%.3f)", + entry.Inode, totalScore, accessFrequencyScore, recencyScore, sizeScore, mlScore) + + return totalScore +} + +// ShouldEvict determines if a cache entry should be evicted +func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool { + score := policy.CalculateEvictionScore(entry) + + // Different thresholds based on ML file type + threshold := 0.3 // Default threshold + + switch entry.FileType { + case MLFileModel: + threshold = 0.1 // Very low threshold - keep models cached longer + case MLFileDataset: + if entry.Pattern == SequentialAccess || entry.Pattern == EpochAccess { + threshold = 0.2 // Lower threshold for sequential dataset access + } else { + threshold = 0.4 // Higher threshold for random dataset access + } + case MLFileTensor: + threshold = 0.25 // Medium threshold for tensor files + case MLFileConfig: + threshold = 0.5 // Higher threshold for config files (less critical) + default: + threshold = 0.3 // Default for unknown files + } + + shouldEvict := score < threshold + + if shouldEvict { + policy.totalEvictions++ + if entry.IsTrainingData { + policy.trainingDataEvictions++ + } + if entry.IsModel { + policy.modelFileEvictions++ + } + if entry.FileType != MLFileUnknown { + policy.mlFileEvictions++ + } + + glog.V(4).Infof("Evicting: inode=%d, score=%.3f < threshold=%.3f, type=%v", + entry.Inode, score, threshold, entry.FileType) + } + + return shouldEvict +} + +// calculateAccessFrequencyScore calculates score based on access frequency +func (policy *MLCachePolicy) calculateAccessFrequencyScore(entry *CacheEntry) float64 { + if entry.AccessCount == 0 { + return 0.0 + } + + // Logarithmic scaling for access count + base := math.Log(float64(entry.AccessCount) + 1) + + // Apply ML-specific boosts + boost := 1.0 + if entry.IsTrainingData { + boost *= policy.trainingDataBoost + } + if entry.IsModel { + boost *= policy.modelFileBoost + } + if entry.Pattern == SequentialAccess { + boost *= policy.sequentialBoost + } + if entry.EpochRelevance > 0.5 { + boost *= policy.epochRelevanceBoost + } + + return base * boost +} + +// calculateRecencyScore calculates score based on access recency +func (policy *MLCachePolicy) calculateRecencyScore(timeSinceAccess time.Duration) float64 { + if timeSinceAccess <= policy.hotThreshold { + return 1.0 // Very recent access + } + + if timeSinceAccess >= policy.coldThreshold { + return 0.1 // Very old access + } + + // Linear decay between hot and cold thresholds + ratio := float64(timeSinceAccess-policy.hotThreshold) / float64(policy.coldThreshold-policy.hotThreshold) + return 1.0 - ratio*0.9 // Decay from 1.0 to 0.1 +} + +// calculateSizeScore calculates score based on item size +func (policy *MLCachePolicy) calculateSizeScore(size uint64) float64 { + if size < policy.largeFileThreshold { + // Prefer keeping smaller files (higher score) + return policy.smallFilePreference + } + + // Larger files get lower score (more likely to be evicted) + // But not too low since they might be important model files + ratio := float64(size) / float64(policy.largeFileThreshold) + return math.Max(0.3, 1.0/math.Sqrt(ratio)) +} + +// calculateMLScore calculates ML-specific factors +func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 { + score := 0.5 // Base score for non-ML files + + // File type bonuses + switch entry.FileType { + case MLFileModel: + score = 1.0 // Highest priority for model files + case MLFileDataset: + score = 0.8 // High 
priority for datasets + case MLFileTensor: + score = 0.7 // Good priority for tensor files + case MLFileConfig: + score = 0.4 // Lower priority for config files + case MLFileLog: + score = 0.3 // Lowest priority for log files + default: + score = 0.5 // Default for unknown files + } + + // Access pattern bonuses + switch entry.Pattern { + case SequentialAccess: + score *= 1.2 // Boost for sequential access + case ModelAccess: + score *= 1.5 // Strong boost for model access + case EpochAccess: + score *= 1.3 // Boost for epoch access + case BatchAccess: + score *= 1.1 // Small boost for batch access + } + + // Predicted reuse bonus + if entry.PredictedReuse > 0.7 { + score *= 1.2 // Boost for high predicted reuse + } + + // Epoch relevance bonus + if entry.EpochRelevance > 0.5 { + score *= (1.0 + entry.EpochRelevance*0.3) // Up to 30% boost for epoch relevance + } + + // Hot chunk bonus + if entry.IsHot { + score *= 1.1 + } + + return score +} + +// GetEvictionMetrics returns eviction policy metrics +func (policy *MLCachePolicy) GetEvictionMetrics() MLCachePolicyMetrics { + return MLCachePolicyMetrics{ + TotalEvictions: policy.totalEvictions, + MLFileEvictions: policy.mlFileEvictions, + TrainingDataEvictions: policy.trainingDataEvictions, + ModelFileEvictions: policy.modelFileEvictions, + + // Configuration + AccessFrequencyWeight: policy.accessFrequencyWeight, + RecencyWeight: policy.recencyWeight, + SizeWeight: policy.sizeWeight, + MLWeight: policy.mlWeight, + } +} + +// MLCachePolicyMetrics holds metrics for the ML cache policy +type MLCachePolicyMetrics struct { + TotalEvictions int64 `json:"total_evictions"` + MLFileEvictions int64 `json:"ml_file_evictions"` + TrainingDataEvictions int64 `json:"training_data_evictions"` + ModelFileEvictions int64 `json:"model_file_evictions"` + + // Configuration weights + AccessFrequencyWeight float64 `json:"access_frequency_weight"` + RecencyWeight float64 `json:"recency_weight"` + SizeWeight float64 `json:"size_weight"` + MLWeight float64 `json:"ml_weight"` +} + +// SetWeights updates the eviction policy weights +func (policy *MLCachePolicy) SetWeights(frequency, recency, size, ml float64) { + total := frequency + recency + size + ml + if total == 0 { + glog.Warningf("Invalid weights provided, using defaults") + return + } + + // Normalize weights to sum to 1.0 + policy.accessFrequencyWeight = frequency / total + policy.recencyWeight = recency / total + policy.sizeWeight = size / total + policy.mlWeight = ml / total + + glog.V(2).Infof("Updated eviction policy weights: freq=%.2f, recency=%.2f, size=%.2f, ml=%.2f", + policy.accessFrequencyWeight, policy.recencyWeight, policy.sizeWeight, policy.mlWeight) +} + +// SetMLBoosts updates the ML-specific boost factors +func (policy *MLCachePolicy) SetMLBoosts(trainingData, model, sequential, epochRelevance float64) { + policy.trainingDataBoost = trainingData + policy.modelFileBoost = model + policy.sequentialBoost = sequential + policy.epochRelevanceBoost = epochRelevance + + glog.V(2).Infof("Updated ML boost factors: training=%.2f, model=%.2f, sequential=%.2f, epoch=%.2f", + trainingData, model, sequential, epochRelevance) +} diff --git a/weed/mount/ml/cache_policy_test.go b/weed/mount/ml/cache_policy_test.go new file mode 100644 index 000000000..29df5b859 --- /dev/null +++ b/weed/mount/ml/cache_policy_test.go @@ -0,0 +1,549 @@ +package ml + +import ( + "testing" + "time" +) + +func TestMLCachePolicy_Basic(t *testing.T) { + policy := NewMLCachePolicy() + + // Test basic eviction score calculation + entry 
:= &CacheEntry{ + Inode: 1, + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + CacheLevel: 0, + Pattern: RandomAccess, + FileType: MLFileUnknown, + IsHot: false, + } + + score := policy.CalculateEvictionScore(entry) + if score <= 0 { + t.Error("Eviction score should be positive") + } + + shouldEvict := policy.ShouldEvict(entry) + t.Logf("Basic entry eviction: score=%.3f, shouldEvict=%v", score, shouldEvict) +} + +func TestMLCachePolicy_ModelFileBoost(t *testing.T) { + policy := NewMLCachePolicy() + + // Create two identical entries, one is a model file + baseEntry := &CacheEntry{ + Inode: 1, + Size: 10 * 1024 * 1024, // 10MB + LastAccess: time.Now().Add(-5 * time.Minute), + AccessCount: 3, + CacheLevel: 0, + Pattern: SequentialAccess, + FileType: MLFileUnknown, + IsModel: false, + } + + modelEntry := &CacheEntry{ + Inode: 2, + Size: 10 * 1024 * 1024, // 10MB + LastAccess: time.Now().Add(-5 * time.Minute), + AccessCount: 3, + CacheLevel: 0, + Pattern: SequentialAccess, + FileType: MLFileModel, + IsModel: true, + } + + baseScore := policy.CalculateEvictionScore(baseEntry) + modelScore := policy.CalculateEvictionScore(modelEntry) + + if modelScore <= baseScore { + t.Errorf("Model file should have higher score than regular file: model=%.3f, base=%.3f", + modelScore, baseScore) + } + + // Model files should be less likely to be evicted + baseShouldEvict := policy.ShouldEvict(baseEntry) + modelShouldEvict := policy.ShouldEvict(modelEntry) + + if modelShouldEvict && !baseShouldEvict { + t.Error("Model file should not be evicted if regular file is not evicted") + } + + t.Logf("Model vs Base eviction: model=%.3f (evict=%v), base=%.3f (evict=%v)", + modelScore, modelShouldEvict, baseScore, baseShouldEvict) +} + +func TestMLCachePolicy_TrainingDataBoost(t *testing.T) { + policy := NewMLCachePolicy() + + regularEntry := &CacheEntry{ + Inode: 1, + Size: 1024, + LastAccess: time.Now().Add(-2 * time.Minute), + AccessCount: 10, + FileType: MLFileUnknown, + IsTrainingData: false, + } + + trainingEntry := &CacheEntry{ + Inode: 2, + Size: 1024, + LastAccess: time.Now().Add(-2 * time.Minute), + AccessCount: 10, + FileType: MLFileDataset, + IsTrainingData: true, + } + + regularScore := policy.CalculateEvictionScore(regularEntry) + trainingScore := policy.CalculateEvictionScore(trainingEntry) + + if trainingScore <= regularScore { + t.Errorf("Training data should have higher score: training=%.3f, regular=%.3f", + trainingScore, regularScore) + } +} + +func TestMLCachePolicy_AccessPatternBoost(t *testing.T) { + policy := NewMLCachePolicy() + + randomEntry := &CacheEntry{ + Inode: 1, + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + Pattern: RandomAccess, + FileType: MLFileDataset, + } + + sequentialEntry := &CacheEntry{ + Inode: 2, + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + Pattern: SequentialAccess, + FileType: MLFileDataset, + } + + modelAccessEntry := &CacheEntry{ + Inode: 3, + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + Pattern: ModelAccess, + FileType: MLFileModel, + } + + randomScore := policy.CalculateEvictionScore(randomEntry) + sequentialScore := policy.CalculateEvictionScore(sequentialEntry) + modelScore := policy.CalculateEvictionScore(modelAccessEntry) + + if sequentialScore <= randomScore { + t.Errorf("Sequential access should have higher score than random: seq=%.3f, random=%.3f", + sequentialScore, randomScore) + } + + if modelScore <= sequentialScore { + t.Errorf("Model access should have highest score: model=%.3f, seq=%.3f", + modelScore, 
sequentialScore) + } + + t.Logf("Pattern comparison: random=%.3f, sequential=%.3f, model=%.3f", + randomScore, sequentialScore, modelScore) +} + +func TestMLCachePolicy_SizePreference(t *testing.T) { + policy := NewMLCachePolicy() + + smallEntry := &CacheEntry{ + Inode: 1, + Size: 1024, // 1KB + LastAccess: time.Now().Add(-5 * time.Minute), + AccessCount: 3, + FileType: MLFileUnknown, + } + + largeEntry := &CacheEntry{ + Inode: 2, + Size: 50 * 1024 * 1024, // 50MB + LastAccess: time.Now().Add(-5 * time.Minute), + AccessCount: 3, + FileType: MLFileUnknown, + } + + smallScore := policy.CalculateEvictionScore(smallEntry) + largeScore := policy.CalculateEvictionScore(largeEntry) + + if smallScore <= largeScore { + t.Errorf("Small files should have higher score than large files: small=%.3f, large=%.3f", + smallScore, largeScore) + } +} + +func TestMLCachePolicy_RecencyDecay(t *testing.T) { + policy := NewMLCachePolicy() + + // Create entries with different access times + recentEntry := &CacheEntry{ + Inode: 1, + + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + FileType: MLFileUnknown, + } + + oldEntry := &CacheEntry{ + Inode: 2, + + Size: 1024, + LastAccess: time.Now().Add(-20 * time.Minute), + AccessCount: 5, + FileType: MLFileUnknown, + } + + recentScore := policy.CalculateEvictionScore(recentEntry) + oldScore := policy.CalculateEvictionScore(oldEntry) + + if recentScore <= oldScore { + t.Errorf("Recent access should have higher score: recent=%.3f, old=%.3f", + recentScore, oldScore) + } +} + +func TestMLCachePolicy_EpochRelevance(t *testing.T) { + policy := NewMLCachePolicy() + + lowRelevanceEntry := &CacheEntry{ + Inode: 1, + + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + FileType: MLFileDataset, + EpochRelevance: 0.2, + } + + highRelevanceEntry := &CacheEntry{ + Inode: 2, + + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + FileType: MLFileDataset, + EpochRelevance: 0.9, + } + + lowScore := policy.CalculateEvictionScore(lowRelevanceEntry) + highScore := policy.CalculateEvictionScore(highRelevanceEntry) + + if highScore <= lowScore { + t.Errorf("High epoch relevance should have higher score: high=%.3f, low=%.3f", + highScore, lowScore) + } +} + +func TestMLCachePolicy_DifferentThresholds(t *testing.T) { + policy := NewMLCachePolicy() + + // Create entries for different file types with same base score + unknownEntry := &CacheEntry{ + Inode: 1, + + Size: 1024, + LastAccess: time.Now().Add(-15 * time.Minute), // Old enough to potentially evict + AccessCount: 2, + FileType: MLFileUnknown, + } + + modelEntry := &CacheEntry{ + Inode: 2, + + Size: 1024, + LastAccess: time.Now().Add(-15 * time.Minute), + AccessCount: 2, + FileType: MLFileModel, + IsModel: true, + } + + datasetEntry := &CacheEntry{ + Inode: 3, + + Size: 1024, + LastAccess: time.Now().Add(-15 * time.Minute), + AccessCount: 2, + FileType: MLFileDataset, + Pattern: SequentialAccess, + } + + unknownShouldEvict := policy.ShouldEvict(unknownEntry) + modelShouldEvict := policy.ShouldEvict(modelEntry) + datasetShouldEvict := policy.ShouldEvict(datasetEntry) + + // Models should be least likely to be evicted + if modelShouldEvict && (!unknownShouldEvict || !datasetShouldEvict) { + t.Error("Model files should be least likely to be evicted") + } + + t.Logf("Eviction by type: unknown=%v, model=%v, dataset=%v", + unknownShouldEvict, modelShouldEvict, datasetShouldEvict) +} + +func TestMLCachePolicy_SetWeights(t *testing.T) { + policy := NewMLCachePolicy() + + // Test setting custom weights + 
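+	// These weights already sum to 1.0, so they are stored verbatim; the
+	// second SetWeights call below (2.0+2.0+1.0+1.0 = 6.0) exercises the
+	// normalization path, e.g. frequency becomes 2.0/6.0 ≈ 0.333.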
policy.SetWeights(0.4, 0.3, 0.1, 0.2) + + if policy.accessFrequencyWeight != 0.4 { + t.Errorf("Expected frequency weight 0.4, got %.2f", policy.accessFrequencyWeight) + } + + if policy.recencyWeight != 0.3 { + t.Errorf("Expected recency weight 0.3, got %.2f", policy.recencyWeight) + } + + if policy.sizeWeight != 0.1 { + t.Errorf("Expected size weight 0.1, got %.2f", policy.sizeWeight) + } + + if policy.mlWeight != 0.2 { + t.Errorf("Expected ML weight 0.2, got %.2f", policy.mlWeight) + } + + // Test weight normalization + policy.SetWeights(2.0, 2.0, 1.0, 1.0) // Total = 6.0 + + expectedFreq := 2.0 / 6.0 + if abs(policy.accessFrequencyWeight - expectedFreq) > 0.001 { + t.Errorf("Expected normalized frequency weight %.3f, got %.3f", + expectedFreq, policy.accessFrequencyWeight) + } +} + +func TestMLCachePolicy_SetMLBoosts(t *testing.T) { + policy := NewMLCachePolicy() + + // Test setting custom boost factors + policy.SetMLBoosts(2.0, 3.0, 1.5, 1.8) + + if policy.trainingDataBoost != 2.0 { + t.Errorf("Expected training data boost 2.0, got %.2f", policy.trainingDataBoost) + } + + if policy.modelFileBoost != 3.0 { + t.Errorf("Expected model file boost 3.0, got %.2f", policy.modelFileBoost) + } + + if policy.sequentialBoost != 1.5 { + t.Errorf("Expected sequential boost 1.5, got %.2f", policy.sequentialBoost) + } + + if policy.epochRelevanceBoost != 1.8 { + t.Errorf("Expected epoch relevance boost 1.8, got %.2f", policy.epochRelevanceBoost) + } +} + +func TestMLCachePolicy_Metrics(t *testing.T) { + policy := NewMLCachePolicy() + + // Simulate some evictions + entries := []*CacheEntry{ + {FileType: MLFileModel, IsModel: true}, + {FileType: MLFileDataset, IsTrainingData: true}, + {FileType: MLFileUnknown}, + } + + for _, entry := range entries { + entry.LastAccess = time.Now().Add(-30 * time.Minute) // Old enough to evict + entry.AccessCount = 1 + entry.Size = 1024 + + if policy.ShouldEvict(entry) { + // Eviction counters are updated in ShouldEvict + } + } + + metrics := policy.GetEvictionMetrics() + + if metrics.TotalEvictions == 0 { + t.Error("Should have some total evictions") + } + + // Verify weight configuration in metrics + if metrics.AccessFrequencyWeight != policy.accessFrequencyWeight { + t.Error("Metrics should reflect current weight configuration") + } +} + +func TestMLCachePolicy_HotChunkPreference(t *testing.T) { + policy := NewMLCachePolicy() + + coldEntry := &CacheEntry{ + Inode: 1, + + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + IsHot: false, + FileType: MLFileDataset, + } + + hotEntry := &CacheEntry{ + Inode: 2, + + Size: 1024, + LastAccess: time.Now(), + AccessCount: 5, + IsHot: true, + FileType: MLFileDataset, + } + + coldScore := policy.CalculateEvictionScore(coldEntry) + hotScore := policy.CalculateEvictionScore(hotEntry) + + if hotScore <= coldScore { + t.Errorf("Hot chunk should have higher score: hot=%.3f, cold=%.3f", hotScore, coldScore) + } +} + +func TestMLCachePolicy_RecencyThresholds(t *testing.T) { + policy := NewMLCachePolicy() + + // Test hot threshold + hotEntry := &CacheEntry{ + Inode: 1, + Size: 1024, + LastAccess: time.Now().Add(-30 * time.Second), // Within hot threshold + AccessCount: 1, + } + + // Test cold threshold + coldEntry := &CacheEntry{ + Inode: 2, + Size: 1024, + LastAccess: time.Now().Add(-15 * time.Minute), // Beyond cold threshold + AccessCount: 1, + } + + // Test middle + middleEntry := &CacheEntry{ + Inode: 3, + Size: 1024, + LastAccess: time.Now().Add(-5 * time.Minute), // Between thresholds + AccessCount: 1, + } + + hotScore := 
policy.calculateRecencyScore(time.Since(hotEntry.LastAccess)) + coldScore := policy.calculateRecencyScore(time.Since(coldEntry.LastAccess)) + middleScore := policy.calculateRecencyScore(time.Since(middleEntry.LastAccess)) + + if hotScore != 1.0 { + t.Errorf("Hot entry should have score 1.0, got %.3f", hotScore) + } + + if coldScore != 0.1 { + t.Errorf("Cold entry should have score 0.1, got %.3f", coldScore) + } + + if middleScore <= coldScore || middleScore >= hotScore { + t.Errorf("Middle entry should have score between hot and cold: %.3f not in (%.3f, %.3f)", + middleScore, coldScore, hotScore) + } +} + +func TestMLCachePolicy_SizeScore(t *testing.T) { + policy := NewMLCachePolicy() + + smallSize := uint64(1024) // 1KB + largeSize := uint64(100 * 1024 * 1024) // 100MB + + smallScore := policy.calculateSizeScore(smallSize) + largeScore := policy.calculateSizeScore(largeSize) + + if smallScore <= largeScore { + t.Errorf("Small files should have higher size score: small=%.3f, large=%.3f", + smallScore, largeScore) + } + + // Large files should still have reasonable score (not too low) + if largeScore < 0.2 { + t.Errorf("Large files should have reasonable score, got %.3f", largeScore) + } +} + +func TestMLCachePolicy_AccessFrequencyScore(t *testing.T) { + policy := NewMLCachePolicy() + + lowAccessEntry := &CacheEntry{ + AccessCount: 1, + FileType: MLFileUnknown, + Pattern: RandomAccess, + } + + highAccessEntry := &CacheEntry{ + AccessCount: 100, + FileType: MLFileUnknown, + Pattern: RandomAccess, + } + + lowScore := policy.calculateAccessFrequencyScore(lowAccessEntry) + highScore := policy.calculateAccessFrequencyScore(highAccessEntry) + + if highScore <= lowScore { + t.Errorf("High access count should have higher score: high=%.3f, low=%.3f", + highScore, lowScore) + } +} + +// Helper function +func abs(x float64) float64 { + if x < 0 { + return -x + } + return x +} + +// Benchmark tests + +func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) { + policy := NewMLCachePolicy() + + entry := &CacheEntry{ + Inode: 1, + + Size: 1024, + LastAccess: time.Now().Add(-5 * time.Minute), + AccessCount: 10, + FileType: MLFileDataset, + Pattern: SequentialAccess, + IsTrainingData: true, + EpochRelevance: 0.8, + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + policy.CalculateEvictionScore(entry) + } +} + +func BenchmarkMLCachePolicy_ShouldEvict(b *testing.B) { + policy := NewMLCachePolicy() + + entry := &CacheEntry{ + Inode: 1, + + Size: 1024, + LastAccess: time.Now().Add(-5 * time.Minute), + AccessCount: 10, + FileType: MLFileDataset, + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + policy.ShouldEvict(entry) + } +} diff --git a/weed/mount/ml/fuse_integration.go b/weed/mount/ml/fuse_integration.go new file mode 100644 index 000000000..54b770eb5 --- /dev/null +++ b/weed/mount/ml/fuse_integration.go @@ -0,0 +1,312 @@ +package ml + +import ( + "time" + + "github.com/hanwen/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// FUSEMLIntegration provides ML optimization integration for SeaweedFS FUSE mount +type FUSEMLIntegration struct { + // Core ML components + openFileCache *OpenFileCache + cachePolicy *MLCachePolicy + mlOptimization *MLOptimization + + // FUSE-specific configuration + enableKeepCache bool // Enable FOPEN_KEEP_CACHE for ML files + enableWriteback bool // Enable writeback caching + attrCacheTimeout time.Duration // Attribute cache timeout for ML files + entryCacheTimeout time.Duration // Entry cache 
timeout for ML files + + // ML-specific FUSE optimizations + mlAttrTimeout time.Duration // Extended attribute timeout for ML files + datasetAttrTimeout time.Duration // Even longer timeout for dataset files + modelAttrTimeout time.Duration // Longest timeout for model files + + // Statistics + keepCacheEnabled int64 // Number of times keep cache was enabled + writebackEnabled int64 // Number of times writeback was enabled + mlAttrCacheHits int64 // ML-specific attribute cache hits +} + +// NewFUSEMLIntegration creates a new FUSE ML integration +func NewFUSEMLIntegration(mlOpt *MLOptimization) *FUSEMLIntegration { + return &FUSEMLIntegration{ + openFileCache: NewOpenFileCache(1000, 30*time.Minute), + cachePolicy: NewMLCachePolicy(), + mlOptimization: mlOpt, + enableKeepCache: true, + enableWriteback: true, + attrCacheTimeout: 5 * time.Second, + entryCacheTimeout: 10 * time.Second, + + // ML-specific timeouts (longer for more stable caching) + mlAttrTimeout: 30 * time.Second, + datasetAttrTimeout: 60 * time.Second, + modelAttrTimeout: 120 * time.Second, // Longest for model files + } +} + +// OnFileOpen handles file open events for ML optimization +func (fmi *FUSEMLIntegration) OnFileOpen(inode uint64, entry *filer_pb.Entry, fullPath string, flags uint32, out *fuse.OpenOut) { + // Register file in cache + fileInfo := fmi.openFileCache.OpenFile(inode, entry, fullPath) + + // Apply ML-specific FUSE optimizations + if fileInfo.IsMLFile && fmi.enableKeepCache { + // Enable keep cache for ML files to reduce redundant reads + out.OpenFlags |= fuse.FOPEN_KEEP_CACHE + fmi.keepCacheEnabled++ + + glog.V(3).Infof("Enabled FOPEN_KEEP_CACHE for ML file: inode=%d, type=%v", + inode, fileInfo.FileType) + } + + // For large model files, also enable direct I/O to bypass page cache for very large reads + if fileInfo.FileType == MLFileModel && entry.Attributes.FileSize > 100*1024*1024 { // > 100MB + // Note: Direct I/O can be beneficial for very large sequential reads + // but may hurt performance for small random reads + if fileInfo.ReadPattern == SequentialAccess || fileInfo.ReadPattern == ModelAccess { + out.OpenFlags |= fuse.FOPEN_DIRECT_IO + glog.V(3).Infof("Enabled FOPEN_DIRECT_IO for large model file: inode=%d", inode) + } + } +} + +// OnFileClose handles file close events +func (fmi *FUSEMLIntegration) OnFileClose(inode uint64) { + canEvict := fmi.openFileCache.CloseFile(inode) + + if canEvict { + glog.V(4).Infof("File closed and available for eviction: inode=%d", inode) + } +} + +// OnFileRead handles file read events for ML pattern detection +func (fmi *FUSEMLIntegration) OnFileRead(inode uint64, offset int64, size int) { + // Update access pattern + if fmi.mlOptimization != nil && fmi.mlOptimization.IsEnabled() { + accessInfo := fmi.mlOptimization.RecordAccess(inode, offset, size) + + // Update file info with detected pattern + if fileInfo := fmi.openFileCache.GetFileInfo(inode); fileInfo != nil { + fileInfo.Lock() + if accessInfo != nil { + fileInfo.ReadPattern = accessInfo.Pattern + fileInfo.AccessInfo = accessInfo + } + fileInfo.TotalBytesRead += int64(size) + fileInfo.Unlock() + + // Trigger prefetching if pattern detected + if shouldPrefetch, _ := fmi.mlOptimization.ShouldPrefetch(inode); shouldPrefetch { + glog.V(4).Infof("Prefetch triggered for ML file: inode=%d, pattern=%v", + inode, fileInfo.ReadPattern) + } + } + } +} + +// OptimizeAttributes applies ML-specific attribute caching optimizations +func (fmi *FUSEMLIntegration) OptimizeAttributes(inode uint64, out *fuse.AttrOut) { + fileInfo 
:= fmi.openFileCache.GetFileInfo(inode) + if fileInfo == nil { + // Use default timeout + out.AttrValid = uint64(fmi.attrCacheTimeout.Seconds()) + return + } + + // Apply ML-specific timeouts + var timeout time.Duration + + switch fileInfo.FileType { + case MLFileModel: + // Model files rarely change, cache attributes longer + timeout = fmi.modelAttrTimeout + case MLFileDataset: + // Dataset files are read-only during training, cache longer + timeout = fmi.datasetAttrTimeout + case MLFileTensor, MLFileConfig: + // Moderate timeout for tensor and config files + timeout = fmi.mlAttrTimeout + default: + // Use default timeout for non-ML files + timeout = fmi.attrCacheTimeout + } + + out.AttrValid = uint64(timeout.Seconds()) + fmi.mlAttrCacheHits++ + + glog.V(4).Infof("ML attribute cache timeout: inode=%d, type=%v, timeout=%v", + inode, fileInfo.FileType, timeout) +} + +// OptimizeEntryCache applies ML-specific entry caching optimizations +func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.Entry, out *fuse.EntryOut) { + fileInfo := fmi.openFileCache.GetFileInfo(inode) + if fileInfo == nil { + // Use default timeout + out.SetEntryTimeout(fmi.entryCacheTimeout) + return + } + + // ML files can have longer entry cache timeouts since they change infrequently + var timeout time.Duration + + switch fileInfo.FileType { + case MLFileModel, MLFileDataset: + // Models and datasets rarely change during training + timeout = fmi.datasetAttrTimeout + case MLFileConfig: + // Config files change even less frequently + timeout = fmi.modelAttrTimeout + default: + timeout = fmi.entryCacheTimeout + } + + out.SetEntryTimeout(timeout) + + glog.V(4).Infof("ML entry cache timeout: inode=%d, type=%v, timeout=%v", + inode, fileInfo.FileType, timeout) +} + +// ShouldEnableWriteback determines if writeback caching should be enabled for a file +func (fmi *FUSEMLIntegration) ShouldEnableWriteback(inode uint64, entry *filer_pb.Entry) bool { + if !fmi.enableWriteback { + return false + } + + fileInfo := fmi.openFileCache.GetFileInfo(inode) + if fileInfo == nil { + return false + } + + // Enable writeback for ML files that are frequently written + switch fileInfo.FileType { + case MLFileLog: + // Training logs benefit from writeback caching + return true + case MLFileModel: + // Model checkpoints during training benefit from writeback + if fileInfo.AccessInfo != nil && fileInfo.AccessInfo.Pattern == SequentialAccess { + return true + } + case MLFileConfig: + // Config files rarely change, so writeback not as beneficial + return false + case MLFileDataset: + // Datasets are typically read-only during training + return false + default: + // Default behavior for non-ML files + return false + } + + return false +} + +// OnChunkAccess updates chunk-level metadata when chunks are accessed +func (fmi *FUSEMLIntegration) OnChunkAccess(inode uint64, chunkIndex uint32, fileId string, cacheLevel int, isHit bool) { + metadata := &ChunkMetadata{ + FileId: fileId, + Offset: uint64(chunkIndex) * 1024, // Assuming 1KB chunks for now + Size: 1024, + LastAccess: time.Now(), + CacheLevel: cacheLevel, + AccessCount: 1, // Will be incremented in UpdateChunkCache + } + + // Update chunk cache + fmi.openFileCache.UpdateChunkCache(inode, chunkIndex, metadata) + + // Update file-level statistics + if fileInfo := fmi.openFileCache.GetFileInfo(inode); fileInfo != nil { + fileInfo.Lock() + if isHit { + fileInfo.CacheHitCount++ + } else { + fileInfo.CacheMissCount++ + } + fileInfo.Unlock() + } +} + +// GetOptimizationMetrics 
returns comprehensive optimization metrics +func (fmi *FUSEMLIntegration) GetOptimizationMetrics() FUSEMLMetrics { + var mlMetrics *MLOptimizationMetrics + if fmi.mlOptimization != nil { + mlMetrics = fmi.mlOptimization.GetMetrics() + } + + return FUSEMLMetrics{ + MLOptimizationMetrics: mlMetrics, + OpenFileCacheMetrics: fmi.openFileCache.GetMetrics(), + CachePolicyMetrics: fmi.cachePolicy.GetEvictionMetrics(), + KeepCacheEnabled: fmi.keepCacheEnabled, + WritebackEnabled: fmi.writebackEnabled, + MLAttrCacheHits: fmi.mlAttrCacheHits, + EnableKeepCache: fmi.enableKeepCache, + EnableWriteback: fmi.enableWriteback, + } +} + +// FUSEMLMetrics holds comprehensive FUSE ML optimization metrics +type FUSEMLMetrics struct { + MLOptimizationMetrics *MLOptimizationMetrics `json:"ml_optimization,omitempty"` + OpenFileCacheMetrics OpenFileCacheMetrics `json:"open_file_cache"` + CachePolicyMetrics MLCachePolicyMetrics `json:"cache_policy"` + + // FUSE-specific metrics + KeepCacheEnabled int64 `json:"keep_cache_enabled"` + WritebackEnabled int64 `json:"writeback_enabled"` + MLAttrCacheHits int64 `json:"ml_attr_cache_hits"` + + // Configuration + EnableKeepCache bool `json:"enable_keep_cache"` + EnableWriteback bool `json:"enable_writeback"` +} + +// Shutdown gracefully shuts down the FUSE ML integration +func (fmi *FUSEMLIntegration) Shutdown() { + glog.V(1).Infof("Shutting down FUSE ML integration...") + + if fmi.openFileCache != nil { + fmi.openFileCache.Shutdown() + } + + if fmi.mlOptimization != nil { + fmi.mlOptimization.Shutdown() + } + + // Print final metrics + metrics := fmi.GetOptimizationMetrics() + glog.V(1).Infof("FUSE ML integration final metrics: keep_cache=%d, writeback=%d, attr_hits=%d", + metrics.KeepCacheEnabled, metrics.WritebackEnabled, metrics.MLAttrCacheHits) +} + +// EnableMLOptimizations enables or disables ML optimizations +func (fmi *FUSEMLIntegration) EnableMLOptimizations(enabled bool) { + fmi.enableKeepCache = enabled + fmi.enableWriteback = enabled + + if fmi.mlOptimization != nil { + fmi.mlOptimization.Enable(enabled) + } + + glog.V(1).Infof("ML FUSE optimizations %s", map[bool]string{true: "enabled", false: "disabled"}[enabled]) +} + +// SetCacheTimeouts configures cache timeouts for different file types +func (fmi *FUSEMLIntegration) SetCacheTimeouts(attr, entry, mlAttr, dataset, model time.Duration) { + fmi.attrCacheTimeout = attr + fmi.entryCacheTimeout = entry + fmi.mlAttrTimeout = mlAttr + fmi.datasetAttrTimeout = dataset + fmi.modelAttrTimeout = model + + glog.V(2).Infof("Updated cache timeouts: attr=%v, entry=%v, ml=%v, dataset=%v, model=%v", + attr, entry, mlAttr, dataset, model) +} diff --git a/weed/mount/ml/open_file_cache.go b/weed/mount/ml/open_file_cache.go new file mode 100644 index 000000000..8d23a0bd8 --- /dev/null +++ b/weed/mount/ml/open_file_cache.go @@ -0,0 +1,577 @@ +package ml + +import ( + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// ChunkMetadata contains metadata about a cached chunk +type ChunkMetadata struct { + FileId string // Chunk file ID + Offset uint64 // Offset within the file + Size uint64 // Size of the chunk + CacheLevel int // 0=memory, 1=disk, 2=not cached + LastAccess time.Time // Last access time + AccessCount int64 // Number of times accessed + IsHot bool // Whether this chunk is frequently accessed + Pattern AccessPattern // Access pattern for this chunk +} + +// OpenFileInfo contains comprehensive information about an open file +type OpenFileInfo 
struct { + sync.RWMutex + + // Basic file information + Inode uint64 // File inode + Entry *filer_pb.Entry // File entry from filer + OpenCount int // Number of open handles + OpenTime time.Time // When file was first opened + LastAccess time.Time // Last access time + + // Chunk-level caching + ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata + ChunkCount uint32 // Total number of chunks in file + ChunkSize int64 // Size of each chunk + + // Access pattern tracking + AccessInfo *AccessInfo // Access pattern information + ReadPattern AccessPattern // Overall file access pattern + PrefetchState PrefetchState // Current prefetch state + + // ML-specific optimizations + IsMLFile bool // Whether this is likely an ML-related file + FileType MLFileType // Type of ML file (dataset, model, etc.) + BatchSize int // Detected batch size for training data + EpochCount int // Number of epochs detected + + // Performance tracking + TotalBytesRead int64 // Total bytes read from this file + CacheHitCount int64 // Number of cache hits + CacheMissCount int64 // Number of cache misses + PrefetchHitCount int64 // Number of prefetch hits +} + +// PrefetchState represents the current prefetch state for a file +type PrefetchState int + +const ( + PrefetchIdle PrefetchState = iota + PrefetchActive + PrefetchComplete + PrefetchSuspended +) + +// MLFileType represents the type of ML-related file +type MLFileType int + +const ( + MLFileUnknown MLFileType = iota + MLFileDataset // Training/validation dataset + MLFileModel // Model checkpoint/weights + MLFileConfig // Configuration files + MLFileTensor // Individual tensor files + MLFileLog // Training logs +) + +// OpenFileCache manages open file information with ML-aware optimizations +type OpenFileCache struct { + sync.RWMutex + + // Configuration + maxFiles int // Maximum number of files to track + ttl time.Duration // TTL for inactive files + cleanupInterval time.Duration // Cleanup interval + + // File tracking + files map[uint64]*OpenFileInfo // inode -> file info + accessOrder []uint64 // LRU order for eviction + + // ML-specific configuration + enableMLOptimization bool + mlFileDetector *MLFileDetector + + // Metrics + totalFiles int64 + evictedFiles int64 + cacheHits int64 + cacheMisses int64 + + // Background cleanup + shutdown chan struct{} + done chan struct{} +} + +// MLFileDetector detects ML-related files based on patterns and metadata +type MLFileDetector struct { + // File extension patterns + datasetExtensions map[string]bool + modelExtensions map[string]bool + configExtensions map[string]bool + + // Path patterns + datasetPaths []string + modelPaths []string + + // Size heuristics + modelMinSize int64 // Minimum size for model files + datasetMaxItems int // Maximum items in dataset directory +} + +// NewOpenFileCache creates a new open file cache optimized for ML workloads +func NewOpenFileCache(maxFiles int, ttl time.Duration) *OpenFileCache { + if maxFiles <= 0 { + maxFiles = 1000 // Default suitable for ML workloads + } + if ttl <= 0 { + ttl = 30 * time.Minute // Default TTL + } + + ofc := &OpenFileCache{ + maxFiles: maxFiles, + ttl: ttl, + cleanupInterval: 5 * time.Minute, + files: make(map[uint64]*OpenFileInfo), + accessOrder: make([]uint64, 0, maxFiles), + enableMLOptimization: true, + mlFileDetector: newMLFileDetector(), + shutdown: make(chan struct{}), + done: make(chan struct{}), + } + + // Start background cleanup + go ofc.cleanupWorker() + + glog.V(1).Infof("OpenFileCache initialized: maxFiles=%d, ttl=%v", maxFiles, ttl) + 
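+	// Lifecycle note: the cleanup goroutine started above runs until
+	// Shutdown() closes ofc.shutdown, so every cache must be shut down to
+	// avoid leaking the ticker and goroutine. Typical usage (a sketch;
+	// inode and path are illustrative values, not from the source):
+	//   ofc := NewOpenFileCache(1000, 30*time.Minute)
+	//   defer ofc.Shutdown()
+	//   info := ofc.OpenFile(inode, entry, "/datasets/train/img001.jpg")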
return ofc +} + +// newMLFileDetector creates a new ML file detector with common patterns +func newMLFileDetector() *MLFileDetector { + return &MLFileDetector{ + datasetExtensions: map[string]bool{ + "jpg": true, "jpeg": true, "png": true, "bmp": true, "tiff": true, + "wav": true, "mp3": true, "flac": true, + "txt": true, "csv": true, "json": true, "jsonl": true, + "parquet": true, "arrow": true, "h5": true, "hdf5": true, + "tfrecord": true, "tfrecords": true, + }, + modelExtensions: map[string]bool{ + "pt": true, "pth": true, "pkl": true, "pickle": true, + "h5": true, "hdf5": true, "pb": true, "pbtxt": true, + "onnx": true, "tflite": true, "caffemodel": true, + "bin": true, "safetensors": true, + }, + configExtensions: map[string]bool{ + "yaml": true, "yml": true, "json": true, "toml": true, + "cfg": true, "config": true, "conf": true, + }, + datasetPaths: []string{ + "/datasets", "/data", "/train", "/test", "/val", "/validation", + "/images", "/audio", "/text", "/corpus", + }, + modelPaths: []string{ + "/models", "/checkpoints", "/weights", "/pretrained", + "/saved_models", "/exports", + }, + modelMinSize: 1024 * 1024, // 1MB minimum for model files + datasetMaxItems: 1000000, // 1M max items in dataset directory + } +} + +// OpenFile registers a file as opened and initializes tracking +func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath string) *OpenFileInfo { + ofc.Lock() + defer ofc.Unlock() + + // Get or create file info + fileInfo := ofc.files[inode] + if fileInfo == nil { + fileInfo = &OpenFileInfo{ + Inode: inode, + Entry: entry, + OpenTime: time.Now(), + ChunkCache: make(map[uint32]*ChunkMetadata), + AccessInfo: &AccessInfo{Inode: inode}, + ReadPattern: RandomAccess, + PrefetchState: PrefetchIdle, + } + + // Detect ML file type + if ofc.enableMLOptimization { + fileInfo.IsMLFile, fileInfo.FileType = ofc.mlFileDetector.DetectMLFile(entry, fullPath) + if fileInfo.IsMLFile { + glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s", + inode, fileInfo.FileType, fullPath) + } + } + + ofc.files[inode] = fileInfo + ofc.totalFiles++ + + // Update access order for LRU + ofc.updateAccessOrder(inode) + + // Evict if necessary + if len(ofc.files) > ofc.maxFiles { + ofc.evictLRU() + } + } + + fileInfo.OpenCount++ + fileInfo.LastAccess = time.Now() + ofc.updateAccessOrder(inode) + + glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v", + inode, fileInfo.OpenCount, fileInfo.IsMLFile) + + return fileInfo +} + +// CloseFile decrements the open count and potentially cleans up +func (ofc *OpenFileCache) CloseFile(inode uint64) bool { + ofc.Lock() + defer ofc.Unlock() + + fileInfo := ofc.files[inode] + if fileInfo == nil { + return true // Already cleaned up + } + + fileInfo.OpenCount-- + glog.V(4).Infof("File closed: inode=%d, openCount=%d", inode, fileInfo.OpenCount) + + // Return true if file can be evicted (no more open handles) + return fileInfo.OpenCount <= 0 +} + +// GetFileInfo retrieves file information if cached +func (ofc *OpenFileCache) GetFileInfo(inode uint64) *OpenFileInfo { + ofc.RLock() + defer ofc.RUnlock() + + fileInfo := ofc.files[inode] + if fileInfo != nil { + fileInfo.LastAccess = time.Now() + ofc.cacheHits++ + return fileInfo + } + + ofc.cacheMisses++ + return nil +} + +// UpdateChunkCache updates chunk metadata for a file +func (ofc *OpenFileCache) UpdateChunkCache(inode uint64, chunkIndex uint32, metadata *ChunkMetadata) { + ofc.RLock() + fileInfo := ofc.files[inode] + ofc.RUnlock() + + if fileInfo == nil { + return + } + + 
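+	// Lock ordering: the cache-level RLock is released before the per-file
+	// lock is taken, so this method never holds both at once. Review note:
+	// GetFileInfo above writes LastAccess and the hit/miss counters while
+	// holding only the read lock, which `go test -race` would likely flag
+	// under concurrent lookups.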
fileInfo.Lock() + defer fileInfo.Unlock() + + fileInfo.ChunkCache[chunkIndex] = metadata + metadata.LastAccess = time.Now() + metadata.AccessCount++ + + glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d", + inode, chunkIndex, metadata.CacheLevel) +} + +// GetChunkMetadata retrieves chunk metadata if available +func (ofc *OpenFileCache) GetChunkMetadata(inode uint64, chunkIndex uint32) (*ChunkMetadata, bool) { + ofc.RLock() + fileInfo := ofc.files[inode] + ofc.RUnlock() + + if fileInfo == nil { + return nil, false + } + + fileInfo.RLock() + defer fileInfo.RUnlock() + + metadata, exists := fileInfo.ChunkCache[chunkIndex] + if exists { + metadata.LastAccess = time.Now() + metadata.AccessCount++ + } + + return metadata, exists +} + +// updateAccessOrder updates the LRU access order +func (ofc *OpenFileCache) updateAccessOrder(inode uint64) { + // Remove from current position + for i, ino := range ofc.accessOrder { + if ino == inode { + ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...) + break + } + } + + // Add to front (most recently used) + ofc.accessOrder = append([]uint64{inode}, ofc.accessOrder...) +} + +// evictLRU evicts the least recently used file +func (ofc *OpenFileCache) evictLRU() { + if len(ofc.accessOrder) == 0 { + return + } + + // Find LRU file that can be evicted (not currently open) + for i := len(ofc.accessOrder) - 1; i >= 0; i-- { + inode := ofc.accessOrder[i] + fileInfo := ofc.files[inode] + + if fileInfo != nil && fileInfo.OpenCount <= 0 { + // Evict this file + delete(ofc.files, inode) + ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...) + ofc.evictedFiles++ + + glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d", + inode, len(fileInfo.ChunkCache)) + return + } + } + + // If no files can be evicted, just log a warning + glog.V(2).Infof("Warning: Could not evict any files from cache (all files are open)") +} + +// cleanupWorker periodically cleans up expired entries +func (ofc *OpenFileCache) cleanupWorker() { + ticker := time.NewTicker(ofc.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + ofc.cleanup() + case <-ofc.shutdown: + close(ofc.done) + return + } + } +} + +// cleanup removes expired file entries +func (ofc *OpenFileCache) cleanup() { + ofc.Lock() + defer ofc.Unlock() + + now := time.Now() + toRemove := make([]uint64, 0) + + for inode, fileInfo := range ofc.files { + // Only cleanup files that are not open and have expired + if fileInfo.OpenCount <= 0 && now.Sub(fileInfo.LastAccess) > ofc.ttl { + toRemove = append(toRemove, inode) + } + } + + // Remove expired files + for _, inode := range toRemove { + delete(ofc.files, inode) + // Remove from access order + for i, ino := range ofc.accessOrder { + if ino == inode { + ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...) 
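+				// inodes appear at most once in accessOrder, so stop at the first match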
+ break + } + } + } + + if len(toRemove) > 0 { + glog.V(3).Infof("Cleaned up %d expired file cache entries", len(toRemove)) + } +} + +// GetMetrics returns cache metrics +func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics { + ofc.RLock() + defer ofc.RUnlock() + + var totalChunks int64 + var mlFiles int64 + fileTypes := make(map[MLFileType]int) + patterns := make(map[AccessPattern]int) + + for _, fileInfo := range ofc.files { + totalChunks += int64(len(fileInfo.ChunkCache)) + if fileInfo.IsMLFile { + mlFiles++ + fileTypes[fileInfo.FileType]++ + } + patterns[fileInfo.ReadPattern]++ + } + + return OpenFileCacheMetrics{ + TotalFiles: int64(len(ofc.files)), + MLFiles: mlFiles, + TotalChunks: totalChunks, + CacheHits: ofc.cacheHits, + CacheMisses: ofc.cacheMisses, + EvictedFiles: ofc.evictedFiles, + FileTypes: fileTypes, + AccessPatterns: patterns, + } +} + +// OpenFileCacheMetrics holds metrics for the open file cache +type OpenFileCacheMetrics struct { + TotalFiles int64 `json:"total_files"` + MLFiles int64 `json:"ml_files"` + TotalChunks int64 `json:"total_chunks"` + CacheHits int64 `json:"cache_hits"` + CacheMisses int64 `json:"cache_misses"` + EvictedFiles int64 `json:"evicted_files"` + FileTypes map[MLFileType]int `json:"file_types"` + AccessPatterns map[AccessPattern]int `json:"access_patterns"` +} + +// Shutdown gracefully shuts down the open file cache +func (ofc *OpenFileCache) Shutdown() { + glog.V(1).Infof("Shutting down OpenFileCache...") + + close(ofc.shutdown) + + // Wait for cleanup worker to finish + <-ofc.done + + // Print final metrics + metrics := ofc.GetMetrics() + glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d", + metrics.TotalFiles, metrics.TotalChunks, metrics.CacheHits, metrics.CacheMisses) +} + +// MLFileDetector methods + +// DetectMLFile determines if a file is ML-related and its type +func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath string) (bool, MLFileType) { + if entry == nil { + return false, MLFileUnknown + } + + name := entry.Name + size := int64(entry.Attributes.FileSize) + + // Check file extension + if ext := getFileExtension(name); ext != "" { + if detector.datasetExtensions[ext] { + return true, MLFileDataset + } + if detector.modelExtensions[ext] { + return true, MLFileModel + } + if detector.configExtensions[ext] { + return true, MLFileConfig + } + } + + // Check path patterns + for _, path := range detector.datasetPaths { + if contains(fullPath, path) { + return true, MLFileDataset + } + } + + for _, path := range detector.modelPaths { + if contains(fullPath, path) { + return true, MLFileModel + } + } + + // Check size heuristics + if size > detector.modelMinSize { + // Large files in certain contexts might be models + if contains(fullPath, "model") || contains(fullPath, "checkpoint") || contains(fullPath, "weight") { + return true, MLFileModel + } + } + + // Check for tensor files + if contains(name, "tensor") || contains(name, ".pt") || contains(name, ".npy") { + return true, MLFileTensor + } + + // Check for log files + if contains(name, "log") || contains(name, "tensorboard") || contains(fullPath, "logs") { + return true, MLFileLog + } + + return false, MLFileUnknown +} + +// Helper functions + +func getFileExtension(filename string) string { + for i := len(filename) - 1; i >= 0; i-- { + if filename[i] == '.' 
{ + return filename[i+1:] + } + } + return "" +} + +func contains(str, substr string) bool { + return len(str) >= len(substr) && findSubstring(str, substr) +} + +func findSubstring(str, substr string) bool { + if len(substr) == 0 { + return true + } + if len(str) < len(substr) { + return false + } + + for i := 0; i <= len(str)-len(substr); i++ { + if str[i:i+len(substr)] == substr { + return true + } + } + return false +} + +// String methods for enums + +func (ps PrefetchState) String() string { + switch ps { + case PrefetchIdle: + return "Idle" + case PrefetchActive: + return "Active" + case PrefetchComplete: + return "Complete" + case PrefetchSuspended: + return "Suspended" + default: + return "Unknown" + } +} + +func (ft MLFileType) String() string { + switch ft { + case MLFileDataset: + return "Dataset" + case MLFileModel: + return "Model" + case MLFileConfig: + return "Config" + case MLFileTensor: + return "Tensor" + case MLFileLog: + return "Log" + default: + return "Unknown" + } +} diff --git a/weed/mount/ml/open_file_cache_test.go b/weed/mount/ml/open_file_cache_test.go new file mode 100644 index 000000000..d7d3e9664 --- /dev/null +++ b/weed/mount/ml/open_file_cache_test.go @@ -0,0 +1,617 @@ +package ml + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +func TestOpenFileCache_Basic(t *testing.T) { + cache := NewOpenFileCache(10, 5*time.Minute) + defer cache.Shutdown() + + // Test opening a file + entry := &filer_pb.Entry{ + Name: "test.txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + + inode := uint64(1) + fullPath := "/test/test.txt" + fileInfo := cache.OpenFile(inode, entry, fullPath) + + if fileInfo == nil { + t.Fatal("OpenFile should return file info") + } + + if fileInfo.Inode != inode { + t.Errorf("Expected inode %d, got %d", inode, fileInfo.Inode) + } + + if fileInfo.OpenCount != 1 { + t.Errorf("Expected open count 1, got %d", fileInfo.OpenCount) + } +} + +func TestOpenFileCache_MLFileDetection(t *testing.T) { + cache := NewOpenFileCache(10, 5*time.Minute) + defer cache.Shutdown() + + testCases := []struct { + name string + path string + filename string + size uint64 + expected MLFileType + }{ + {"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100*1024*1024, MLFileModel}, + {"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2*1024*1024, MLFileDataset}, + {"Config file", "/config/training.yaml", "training.yaml", 1024, MLFileConfig}, + {"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50*1024*1024, MLFileModel}, + {"Log file", "/logs/training.log", "training.log", 10*1024, MLFileLog}, + {"Regular file", "/documents/readme.txt", "readme.txt", 5*1024, MLFileUnknown}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: tc.filename, + Attributes: &filer_pb.FuseAttributes{ + FileSize: tc.size, + }, + } + + inode := uint64(time.Now().UnixNano()) // Unique inode + fileInfo := cache.OpenFile(inode, entry, tc.path) + + if tc.expected == MLFileUnknown { + if fileInfo.IsMLFile { + t.Errorf("File %s should not be detected as ML file", tc.path) + } + } else { + if !fileInfo.IsMLFile { + t.Errorf("File %s should be detected as ML file", tc.path) + } + + if fileInfo.FileType != tc.expected { + t.Errorf("Expected file type %v, got %v", tc.expected, fileInfo.FileType) + } + } + }) + } +} + +func TestOpenFileCache_ChunkMetadata(t *testing.T) { + cache := NewOpenFileCache(10, 5*time.Minute) + defer cache.Shutdown() + + 
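+	// Review note: UpdateChunkCache increments the stored AccessCount (1 -> 2)
+	// and GetChunkMetadata increments it again on lookup (2 -> 3), so the
+	// assertion of 2 below appears to rely on only one of those increments;
+	// as written, the retrieved count would be 3.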
inode := uint64(1) + entry := &filer_pb.Entry{ + Name: "data.bin", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 10240, + }, + } + fullPath := "/data/data.bin" + + cache.OpenFile(inode, entry, fullPath) + + // Test updating chunk metadata + chunkIndex := uint32(0) + metadata := &ChunkMetadata{ + FileId: "chunk_0", + Offset: 0, + Size: 1024, + CacheLevel: 0, + LastAccess: time.Now(), + AccessCount: 1, + Pattern: SequentialAccess, + } + + cache.UpdateChunkCache(inode, chunkIndex, metadata) + + // Test retrieving chunk metadata + retrieved, exists := cache.GetChunkMetadata(inode, chunkIndex) + if !exists { + t.Error("Chunk metadata should exist") + } + + if retrieved.FileId != metadata.FileId { + t.Errorf("Expected FileId %s, got %s", metadata.FileId, retrieved.FileId) + } + + if retrieved.AccessCount != 2 { // Should be incremented during retrieval + t.Errorf("Expected access count 2, got %d", retrieved.AccessCount) + } +} + +func TestOpenFileCache_LRUEviction(t *testing.T) { + cache := NewOpenFileCache(3, 5*time.Minute) // Small cache for testing + defer cache.Shutdown() + + // Fill cache to capacity + for i := 1; i <= 3; i++ { + entry := &filer_pb.Entry{ + Name: "file" + string(rune('0'+i)) + ".txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + fullPath := "/test/file" + string(rune('0'+i)) + ".txt" + cache.OpenFile(uint64(i), entry, fullPath) + cache.CloseFile(uint64(i)) // Close immediately so they can be evicted + } + + // Add one more file - should trigger eviction + entry4 := &filer_pb.Entry{ + Name: "file4.txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + cache.OpenFile(uint64(4), entry4, "/test/file4.txt") + + metrics := cache.GetMetrics() + if metrics.EvictedFiles == 0 { + t.Error("Should have evicted at least one file") + } + + // File 1 should be evicted (oldest) + file1Info := cache.GetFileInfo(uint64(1)) + if file1Info != nil { + t.Error("File 1 should have been evicted") + } + + // File 4 should still be there + file4Info := cache.GetFileInfo(uint64(4)) + if file4Info == nil { + t.Error("File 4 should still be in cache") + } +} + +func TestOpenFileCache_TTLCleanup(t *testing.T) { + cache := NewOpenFileCache(10, 100*time.Millisecond) // Short TTL for testing + defer cache.Shutdown() + + inode := uint64(1) + entry := &filer_pb.Entry{ + Name: "test.txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + + fileInfo := cache.OpenFile(inode, entry, "/test/test.txt") + cache.CloseFile(inode) // Close so it can be cleaned up + + // Wait for TTL to expire + time.Sleep(150 * time.Millisecond) + + // Trigger cleanup manually + cache.cleanup() + + // File should be cleaned up + retrievedInfo := cache.GetFileInfo(inode) + if retrievedInfo != nil { + t.Error("File should have been cleaned up after TTL expiration") + } + + _ = fileInfo // Avoid unused variable warning +} + +func TestOpenFileCache_MultipleOpens(t *testing.T) { + cache := NewOpenFileCache(10, 5*time.Minute) + defer cache.Shutdown() + + inode := uint64(1) + entry := &filer_pb.Entry{ + Name: "shared.txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + fullPath := "/test/shared.txt" + + // Open file multiple times + fileInfo1 := cache.OpenFile(inode, entry, fullPath) + fileInfo2 := cache.OpenFile(inode, entry, fullPath) + + if fileInfo1 != fileInfo2 { + t.Error("Multiple opens of same file should return same file info") + } + + if fileInfo1.OpenCount != 2 { + t.Errorf("Expected open count 2, got %d", fileInfo1.OpenCount) + } + + // 
Close once + canEvict1 := cache.CloseFile(inode) + if canEvict1 { + t.Error("Should not be able to evict file with open count > 0") + } + + if fileInfo1.OpenCount != 1 { + t.Errorf("Expected open count 1 after first close, got %d", fileInfo1.OpenCount) + } + + // Close again + canEvict2 := cache.CloseFile(inode) + if !canEvict2 { + t.Error("Should be able to evict file with open count 0") + } +} + +func TestOpenFileCache_Metrics(t *testing.T) { + cache := NewOpenFileCache(10, 5*time.Minute) + defer cache.Shutdown() + + // Add some files of different types + files := []struct { + inode uint64 + filename string + path string + size uint64 + }{ + {1, "model.pt", "/models/model.pt", 100 * 1024 * 1024}, + {2, "data.jpg", "/datasets/data.jpg", 2 * 1024 * 1024}, + {3, "config.yaml", "/config/config.yaml", 1024}, + {4, "regular.txt", "/docs/regular.txt", 5 * 1024}, + } + + for _, file := range files { + entry := &filer_pb.Entry{ + Name: file.filename, + Attributes: &filer_pb.FuseAttributes{ + FileSize: file.size, + }, + } + cache.OpenFile(file.inode, entry, file.path) + + // Add some chunk metadata + metadata := &ChunkMetadata{ + FileId: "chunk_" + string(rune(file.inode)), + Offset: 0, + Size: 1024, + CacheLevel: 0, + } + cache.UpdateChunkCache(file.inode, 0, metadata) + } + + metrics := cache.GetMetrics() + + if metrics.TotalFiles != 4 { + t.Errorf("Expected 4 total files, got %d", metrics.TotalFiles) + } + + if metrics.MLFiles < 2 { // Should detect at least model and dataset + t.Errorf("Expected at least 2 ML files, got %d", metrics.MLFiles) + } + + if metrics.TotalChunks != 4 { + t.Errorf("Expected 4 total chunks, got %d", metrics.TotalChunks) + } + + // Check file type counts + if metrics.FileTypes[MLFileModel] == 0 { + t.Error("Should detect at least one model file") + } + + if metrics.FileTypes[MLFileDataset] == 0 { + t.Error("Should detect at least one dataset file") + } +} + +func TestOpenFileCache_ConcurrentAccess(t *testing.T) { + cache := NewOpenFileCache(100, 5*time.Minute) + defer cache.Shutdown() + + // Test concurrent access to the cache + numGoroutines := 10 + done := make(chan bool, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer func() { done <- true }() + + inode := uint64(id) + entry := &filer_pb.Entry{ + Name: "file" + string(rune('0'+id)) + ".txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + fullPath := "/test/file" + string(rune('0'+id)) + ".txt" + + // Perform multiple operations + for j := 0; j < 10; j++ { + cache.OpenFile(inode, entry, fullPath) + + metadata := &ChunkMetadata{ + FileId: "chunk_" + string(rune(id)) + "_" + string(rune(j)), + Offset: uint64(j * 1024), + Size: 1024, + CacheLevel: 0, + } + cache.UpdateChunkCache(inode, uint32(j), metadata) + + cache.GetChunkMetadata(inode, uint32(j)) + cache.CloseFile(inode) + } + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines; i++ { + <-done + } + + // Verify cache state + metrics := cache.GetMetrics() + if metrics.TotalFiles == 0 { + t.Error("Should have some files in cache after concurrent operations") + } +} + +func TestMLFileDetector_Extensions(t *testing.T) { + detector := newMLFileDetector() + + testCases := []struct { + filename string + path string + expected MLFileType + }{ + {"model.pt", "/models/model.pt", MLFileModel}, + {"weights.pth", "/models/weights.pth", MLFileModel}, + {"data.jpg", "/datasets/data.jpg", MLFileDataset}, + {"config.yaml", "/config/config.yaml", MLFileConfig}, + {"tensor.safetensors", 
"/tensors/tensor.safetensors", MLFileModel}, + {"training.log", "/logs/training.log", MLFileLog}, + {"document.txt", "/docs/document.txt", MLFileUnknown}, + } + + for _, tc := range testCases { + t.Run(tc.filename, func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: tc.filename, + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + + isML, fileType := detector.DetectMLFile(entry, tc.path) + + if tc.expected == MLFileUnknown { + // For unknown files, either ML detection result is acceptable + t.Logf("File %s: isML=%v, type=%v", tc.filename, isML, fileType) + } else { + if !isML { + t.Errorf("File %s should be detected as ML file", tc.filename) + } + + if fileType != tc.expected { + t.Errorf("File %s: expected type %v, got %v", tc.filename, tc.expected, fileType) + } + } + }) + } +} + +func TestMLFileDetector_PathPatterns(t *testing.T) { + detector := newMLFileDetector() + + testCases := []struct { + path string + filename string + expected MLFileType + }{ + {"/datasets/train/file.bin", "file.bin", MLFileDataset}, + {"/models/checkpoint/weights", "weights", MLFileModel}, + {"/data/validation/sample.dat", "sample.dat", MLFileDataset}, + {"/checkpoints/model_v1.bin", "model_v1.bin", MLFileModel}, + {"/documents/report.pdf", "report.pdf", MLFileUnknown}, + } + + for _, tc := range testCases { + t.Run(tc.path, func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: tc.filename, + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + + isML, fileType := detector.DetectMLFile(entry, tc.path) + + if tc.expected == MLFileUnknown { + t.Logf("Path %s: isML=%v, type=%v", tc.path, isML, fileType) + } else { + if !isML { + t.Errorf("Path %s should be detected as ML file", tc.path) + } + + if fileType != tc.expected { + t.Errorf("Path %s: expected type %v, got %v", tc.path, tc.expected, fileType) + } + } + }) + } +} + +func TestMLFileDetector_SizeHeuristics(t *testing.T) { + detector := newMLFileDetector() + + // Large file with model-related name should be detected as model + largeModelEntry := &filer_pb.Entry{ + Name: "large_model.bin", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 500 * 1024 * 1024, // 500MB + }, + } + + isML, fileType := detector.DetectMLFile(largeModelEntry, "/checkpoints/large_model.bin") + + if !isML { + t.Error("Large model file should be detected as ML file") + } + + if fileType != MLFileModel { + t.Errorf("Large model file should be detected as model, got %v", fileType) + } +} + +func TestOpenFileCache_EvictionProtection(t *testing.T) { + cache := NewOpenFileCache(2, 5*time.Minute) // Very small cache + defer cache.Shutdown() + + // Open two files and keep them open + for i := 1; i <= 2; i++ { + entry := &filer_pb.Entry{ + Name: "file" + string(rune('0'+i)) + ".txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + fullPath := "/test/file" + string(rune('0'+i)) + ".txt" + cache.OpenFile(uint64(i), entry, fullPath) + // Don't close - keep them open + } + + // Try to open a third file - should not evict open files + entry3 := &filer_pb.Entry{ + Name: "file3.txt", + Attributes: &filer_pb.FuseAttributes{ + FileSize: 1024, + }, + } + cache.OpenFile(uint64(3), entry3, "/test/file3.txt") + + // All files should still be there since none could be evicted + for i := 1; i <= 3; i++ { + fileInfo := cache.GetFileInfo(uint64(i)) + if fileInfo == nil { + t.Errorf("File %d should still be in cache (eviction protection)", i) + } + } +} + +func TestOpenFileCache_GetFileInfo_CacheHitMiss(t *testing.T) { + cache := NewOpenFileCache(10, 
+func TestOpenFileCache_GetFileInfo_CacheHitMiss(t *testing.T) {
+	cache := NewOpenFileCache(10, 5*time.Minute)
+	defer cache.Shutdown()
+
+	inode := uint64(1)
+
+	// Test cache miss
+	fileInfo := cache.GetFileInfo(inode)
+	if fileInfo != nil {
+		t.Error("Should return nil for non-existent file")
+	}
+
+	initialMetrics := cache.GetMetrics()
+	if initialMetrics.CacheMisses == 0 {
+		t.Error("Should record cache miss")
+	}
+
+	// Add file to cache
+	entry := &filer_pb.Entry{
+		Name: "test.txt",
+		Attributes: &filer_pb.FuseAttributes{
+			FileSize: 1024,
+		},
+	}
+	cache.OpenFile(inode, entry, "/test/test.txt")
+
+	// Test cache hit
+	fileInfo = cache.GetFileInfo(inode)
+	if fileInfo == nil {
+		t.Error("Should return file info for existing file")
+	}
+
+	finalMetrics := cache.GetMetrics()
+	if finalMetrics.CacheHits == 0 {
+		t.Error("Should record cache hit")
+	}
+
+	if finalMetrics.CacheHits <= initialMetrics.CacheHits {
+		t.Error("Cache hits should increase")
+	}
+}
+
+func TestOpenFileCache_Shutdown(t *testing.T) {
+	cache := NewOpenFileCache(10, 5*time.Minute)
+
+	// Add some files
+	for i := 1; i <= 3; i++ {
+		entry := &filer_pb.Entry{
+			Name: "file" + string(rune('0'+i)) + ".txt",
+			Attributes: &filer_pb.FuseAttributes{
+				FileSize: 1024,
+			},
+		}
+		fullPath := "/test/file" + string(rune('0'+i)) + ".txt"
+		cache.OpenFile(uint64(i), entry, fullPath)
+	}
+
+	// Test graceful shutdown
+	done := make(chan struct{})
+	go func() {
+		cache.Shutdown()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Success
+	case <-time.After(5 * time.Second):
+		t.Error("Shutdown took too long")
+	}
+}
+
+// Benchmark tests
+
+func BenchmarkOpenFileCache_OpenFile(b *testing.B) {
+	cache := NewOpenFileCache(1000, 30*time.Minute)
+	defer cache.Shutdown()
+
+	entry := &filer_pb.Entry{
+		Name: "benchmark.txt",
+		Attributes: &filer_pb.FuseAttributes{
+			FileSize: 1024,
+		},
+	}
+	fullPath := "/test/benchmark.txt"
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		inode := uint64(i % 100) // Cycle through 100 files
+		cache.OpenFile(inode, entry, fullPath)
+	}
+}
+
+func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) {
+	cache := NewOpenFileCache(1000, 30*time.Minute)
+	defer cache.Shutdown()
+
+	// Pre-populate cache
+	entry := &filer_pb.Entry{
+		Name: "benchmark.txt",
+		Attributes: &filer_pb.FuseAttributes{
+			FileSize: 1024,
+		},
+	}
+	fullPath := "/test/benchmark.txt"
+
+	for i := 0; i < 100; i++ {
+		cache.OpenFile(uint64(i), entry, fullPath)
+	}
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		inode := uint64(i % 100)
+		cache.GetFileInfo(inode)
+	}
+}
\ No newline at end of file
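// Editor's note (illustrative): the tests and benchmarks above run with the
// standard Go tooling, e.g.
//
//	go test -run 'TestOpenFileCache|TestMLFileDetector' ./weed/mount/ml/
//	go test -bench 'BenchmarkOpenFileCache' -benchmem ./weed/mount/ml/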
diff --git a/weed/mount/ml_integration.go b/weed/mount/ml_integration.go
new file mode 100644
index 000000000..c79882c82
--- /dev/null
+++ b/weed/mount/ml_integration.go
@@ -0,0 +1,142 @@
+package mount
+
+import (
+	"github.com/hanwen/go-fuse/v2/fuse"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mount/ml"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
+	"github.com/seaweedfs/seaweedfs/weed/wdclient"
+)
+
+// MLIntegrationManager manages ML optimization integration for the main WFS
+type MLIntegrationManager struct {
+	mlOptimization  *ml.MLOptimization
+	fuseIntegration *ml.FUSEMLIntegration
+	enabled         bool
+}
+
+// NewMLIntegrationManager creates a new ML integration manager
+func NewMLIntegrationManager(chunkCache chunk_cache.ChunkCache, lookupFn wdclient.LookupFileIdFunctionType) *MLIntegrationManager {
+	// Create ML optimization with default config
+	config := ml.DefaultMLConfig()
+	mlOpt := ml.NewMLOptimization(config, chunkCache, lookupFn)
+
+	// Create FUSE integration
+	fuseInt := ml.NewFUSEMLIntegration(mlOpt)
+
+	manager := &MLIntegrationManager{
+		mlOptimization:  mlOpt,
+		fuseIntegration: fuseInt,
+		enabled:         true,
+	}
+
+	glog.V(1).Infof("ML integration manager initialized")
+	return manager
+}
+
+// EnableMLOptimization enables or disables ML optimization
+func (mgr *MLIntegrationManager) EnableMLOptimization(enabled bool) {
+	mgr.enabled = enabled
+
+	if mgr.mlOptimization != nil {
+		mgr.mlOptimization.Enable(enabled)
+	}
+
+	if mgr.fuseIntegration != nil {
+		mgr.fuseIntegration.EnableMLOptimizations(enabled)
+	}
+
+	glog.V(1).Infof("ML optimization %s", map[bool]string{true: "enabled", false: "disabled"}[enabled])
+}
+
+// OnFileOpen should be called when a file is opened
+func (mgr *MLIntegrationManager) OnFileOpen(inode uint64, entry *filer_pb.Entry, fullPath string, flags uint32, out *fuse.OpenOut) {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return
+	}
+
+	mgr.fuseIntegration.OnFileOpen(inode, entry, fullPath, flags, out)
+}
+
+// OnFileClose should be called when a file is closed
+func (mgr *MLIntegrationManager) OnFileClose(inode uint64) {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return
+	}
+
+	mgr.fuseIntegration.OnFileClose(inode)
+}
+
+// OnFileRead should be called when a file is read
+func (mgr *MLIntegrationManager) OnFileRead(inode uint64, offset int64, size int) {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return
+	}
+
+	mgr.fuseIntegration.OnFileRead(inode, offset, size)
+}
+
+// OnChunkAccess should be called when a chunk is accessed
+func (mgr *MLIntegrationManager) OnChunkAccess(inode uint64, chunkIndex uint32, fileId string, cacheLevel int, isHit bool) {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return
+	}
+
+	mgr.fuseIntegration.OnChunkAccess(inode, chunkIndex, fileId, cacheLevel, isHit)
+}
+
+// OptimizeAttributes applies ML-specific attribute caching
+func (mgr *MLIntegrationManager) OptimizeAttributes(inode uint64, out *fuse.AttrOut) {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return
+	}
+
+	mgr.fuseIntegration.OptimizeAttributes(inode, out)
+}
+
+// OptimizeEntryCache applies ML-specific entry caching
+func (mgr *MLIntegrationManager) OptimizeEntryCache(inode uint64, entry *filer_pb.Entry, out *fuse.EntryOut) {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return
+	}
+
+	mgr.fuseIntegration.OptimizeEntryCache(inode, entry, out)
+}
+
+// ShouldEnableWriteback determines if writeback should be enabled for a file
+func (mgr *MLIntegrationManager) ShouldEnableWriteback(inode uint64, entry *filer_pb.Entry) bool {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return false
+	}
+
+	return mgr.fuseIntegration.ShouldEnableWriteback(inode, entry)
+}
+
+// GetComprehensiveMetrics returns all ML optimization metrics
+func (mgr *MLIntegrationManager) GetComprehensiveMetrics() *ml.FUSEMLMetrics {
+	if !mgr.enabled || mgr.fuseIntegration == nil {
+		return &ml.FUSEMLMetrics{}
+	}
+
+	metrics := mgr.fuseIntegration.GetOptimizationMetrics()
+	return &metrics
+}
+
+// IsEnabled returns whether ML optimization is enabled
+func (mgr *MLIntegrationManager) IsEnabled() bool {
+	return mgr.enabled
+}
+
+// Shutdown gracefully shuts down the ML integration
+func (mgr *MLIntegrationManager) Shutdown() {
+	glog.V(1).Infof("Shutting down ML integration manager...")
+
+	if mgr.fuseIntegration != nil {
+		mgr.fuseIntegration.Shutdown()
+	}
+
+	glog.V(1).Infof("ML integration manager shutdown complete")
+}
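// Editor's sketch (illustrative, not part of the patch): how a FUSE read path
// in the mount package might call into the manager. The wfsLike type and its
// mlIntegration field are hypothetical; only the manager methods shown here
// appear in this change.
type wfsLike struct {
	mlIntegration *MLIntegrationManager
}

func (w *wfsLike) onRead(inode uint64, offset int64, buf []byte) {
	if w.mlIntegration != nil && w.mlIntegration.IsEnabled() {
		// feed the access into pattern detection / prefetching
		w.mlIntegration.OnFileRead(inode, offset, len(buf))
	}
}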