From 63b94321ec015ca6565364fc3b97f9a849f7e0ee Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 30 Aug 2025 15:32:00 -0700 Subject: [PATCH] fmt --- weed/mount/ml/cache_policy.go | 124 ++++++------- weed/mount/ml/cache_policy_test.go | 254 ++++++++++++------------- weed/mount/ml/fuse_integration.go | 116 ++++++------ weed/mount/ml/open_file_cache.go | 256 +++++++++++++------------- weed/mount/ml/open_file_cache_test.go | 184 +++++++++--------- weed/mount/ml_integration.go | 38 ++-- 6 files changed, 485 insertions(+), 487 deletions(-) diff --git a/weed/mount/ml/cache_policy.go b/weed/mount/ml/cache_policy.go index 256b36dc9..44650a44d 100644 --- a/weed/mount/ml/cache_policy.go +++ b/weed/mount/ml/cache_policy.go @@ -17,36 +17,36 @@ type CacheEntry struct { Pattern AccessPattern // Detected access pattern FileType MLFileType // Type of ML file IsHot bool // Whether this is a hot chunk - + // ML-specific metadata - IsTrainingData bool // Whether this is training data - IsModel bool // Whether this is a model file - PredictedReuse float64 // Predicted reuse probability (0.0-1.0) - EpochRelevance float64 // Relevance for current training epoch + IsTrainingData bool // Whether this is training data + IsModel bool // Whether this is a model file + PredictedReuse float64 // Predicted reuse probability (0.0-1.0) + EpochRelevance float64 // Relevance for current training epoch } // MLCachePolicy implements ML-aware cache eviction policy type MLCachePolicy struct { // Weights for different factors (sum should be 1.0) accessFrequencyWeight float64 // Weight for access frequency - recencyWeight float64 // Weight for access recency + recencyWeight float64 // Weight for access recency sizeWeight float64 // Weight for item size mlWeight float64 // Weight for ML-specific factors - + // ML-specific parameters - trainingDataBoost float64 // Boost factor for training data - modelFileBoost float64 // Boost factor for model files - sequentialBoost float64 // Boost factor for sequential access - epochRelevanceBoost float64 // Boost factor for epoch-relevant data - + trainingDataBoost float64 // Boost factor for training data + modelFileBoost float64 // Boost factor for model files + sequentialBoost float64 // Boost factor for sequential access + epochRelevanceBoost float64 // Boost factor for epoch-relevant data + // Time-based parameters - hotThreshold time.Duration // Threshold for considering item "hot" - coldThreshold time.Duration // Threshold for considering item "cold" - + hotThreshold time.Duration // Threshold for considering item "hot" + coldThreshold time.Duration // Threshold for considering item "cold" + // Size-based parameters - largeFileThreshold uint64 // Threshold for large files - smallFilePreference float64 // Preference for keeping small files - + largeFileThreshold uint64 // Threshold for large files + smallFilePreference float64 // Preference for keeping small files + // Statistics totalEvictions int64 mlFileEvictions int64 @@ -60,19 +60,19 @@ func NewMLCachePolicy() *MLCachePolicy { // Balanced weights accessFrequencyWeight: 0.3, recencyWeight: 0.3, - sizeWeight: 0.2, - mlWeight: 0.2, - + sizeWeight: 0.2, + mlWeight: 0.2, + // ML-specific boosts trainingDataBoost: 1.5, // 50% boost for training data modelFileBoost: 2.0, // 100% boost for model files sequentialBoost: 1.3, // 30% boost for sequential access epochRelevanceBoost: 1.4, // 40% boost for epoch-relevant data - + // Time thresholds hotThreshold: 1 * time.Minute, coldThreshold: 10 * time.Minute, - + // Size parameters largeFileThreshold: 10 * 1024 * 1024, // 10MB smallFilePreference: 1.2, // 20% preference for small files @@ -84,32 +84,32 @@ func NewMLCachePolicy() *MLCachePolicy { func (policy *MLCachePolicy) CalculateEvictionScore(entry *CacheEntry) float64 { now := time.Now() timeSinceAccess := now.Sub(entry.LastAccess) - + // Base factors accessFrequencyScore := policy.calculateAccessFrequencyScore(entry) recencyScore := policy.calculateRecencyScore(timeSinceAccess) sizeScore := policy.calculateSizeScore(entry.Size) mlScore := policy.calculateMLScore(entry) - + // Weighted combination totalScore := policy.accessFrequencyWeight*accessFrequencyScore + policy.recencyWeight*recencyScore + policy.sizeWeight*sizeScore + policy.mlWeight*mlScore - - glog.V(4).Infof("Eviction score for inode=%d: total=%.3f (freq=%.3f, recency=%.3f, size=%.3f, ml=%.3f)", + + glog.V(4).Infof("Eviction score for inode=%d: total=%.3f (freq=%.3f, recency=%.3f, size=%.3f, ml=%.3f)", entry.Inode, totalScore, accessFrequencyScore, recencyScore, sizeScore, mlScore) - + return totalScore } // ShouldEvict determines if a cache entry should be evicted func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool { score := policy.CalculateEvictionScore(entry) - + // Different thresholds based on ML file type threshold := 0.3 // Default threshold - + switch entry.FileType { case MLFileModel: threshold = 0.1 // Very low threshold - keep models cached longer @@ -126,9 +126,9 @@ func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool { default: threshold = 0.3 // Default for unknown files } - + shouldEvict := score < threshold - + if shouldEvict { policy.totalEvictions++ if entry.IsTrainingData { @@ -140,11 +140,11 @@ func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool { if entry.FileType != MLFileUnknown { policy.mlFileEvictions++ } - - glog.V(4).Infof("Evicting: inode=%d, score=%.3f < threshold=%.3f, type=%v", + + glog.V(4).Infof("Evicting: inode=%d, score=%.3f < threshold=%.3f, type=%v", entry.Inode, score, threshold, entry.FileType) } - + return shouldEvict } @@ -153,10 +153,10 @@ func (policy *MLCachePolicy) calculateAccessFrequencyScore(entry *CacheEntry) fl if entry.AccessCount == 0 { return 0.0 } - + // Logarithmic scaling for access count base := math.Log(float64(entry.AccessCount) + 1) - + // Apply ML-specific boosts boost := 1.0 if entry.IsTrainingData { @@ -171,7 +171,7 @@ func (policy *MLCachePolicy) calculateAccessFrequencyScore(entry *CacheEntry) fl if entry.EpochRelevance > 0.5 { boost *= policy.epochRelevanceBoost } - + return base * boost } @@ -180,11 +180,11 @@ func (policy *MLCachePolicy) calculateRecencyScore(timeSinceAccess time.Duration if timeSinceAccess <= policy.hotThreshold { return 1.0 // Very recent access } - + if timeSinceAccess >= policy.coldThreshold { return 0.1 // Very old access } - + // Linear decay between hot and cold thresholds ratio := float64(timeSinceAccess-policy.hotThreshold) / float64(policy.coldThreshold-policy.hotThreshold) return 1.0 - ratio*0.9 // Decay from 1.0 to 0.1 @@ -196,7 +196,7 @@ func (policy *MLCachePolicy) calculateSizeScore(size uint64) float64 { // Prefer keeping smaller files (higher score) return policy.smallFilePreference } - + // Larger files get lower score (more likely to be evicted) // But not too low since they might be important model files ratio := float64(size) / float64(policy.largeFileThreshold) @@ -206,7 +206,7 @@ func (policy *MLCachePolicy) calculateSizeScore(size uint64) float64 { // calculateMLScore calculates ML-specific factors func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 { score := 0.5 // Base score for non-ML files - + // File type bonuses switch entry.FileType { case MLFileModel: @@ -222,7 +222,7 @@ func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 { default: score = 0.5 // Default for unknown files } - + // Access pattern bonuses switch entry.Pattern { case SequentialAccess: @@ -234,22 +234,22 @@ func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 { case BatchAccess: score *= 1.1 // Small boost for batch access } - + // Predicted reuse bonus if entry.PredictedReuse > 0.7 { score *= 1.2 // Boost for high predicted reuse } - + // Epoch relevance bonus if entry.EpochRelevance > 0.5 { score *= (1.0 + entry.EpochRelevance*0.3) // Up to 30% boost for epoch relevance } - + // Hot chunk bonus if entry.IsHot { score *= 1.1 } - + return score } @@ -260,27 +260,27 @@ func (policy *MLCachePolicy) GetEvictionMetrics() MLCachePolicyMetrics { MLFileEvictions: policy.mlFileEvictions, TrainingDataEvictions: policy.trainingDataEvictions, ModelFileEvictions: policy.modelFileEvictions, - + // Configuration AccessFrequencyWeight: policy.accessFrequencyWeight, RecencyWeight: policy.recencyWeight, - SizeWeight: policy.sizeWeight, - MLWeight: policy.mlWeight, + SizeWeight: policy.sizeWeight, + MLWeight: policy.mlWeight, } } // MLCachePolicyMetrics holds metrics for the ML cache policy type MLCachePolicyMetrics struct { - TotalEvictions int64 `json:"total_evictions"` - MLFileEvictions int64 `json:"ml_file_evictions"` - TrainingDataEvictions int64 `json:"training_data_evictions"` - ModelFileEvictions int64 `json:"model_file_evictions"` - + TotalEvictions int64 `json:"total_evictions"` + MLFileEvictions int64 `json:"ml_file_evictions"` + TrainingDataEvictions int64 `json:"training_data_evictions"` + ModelFileEvictions int64 `json:"model_file_evictions"` + // Configuration weights AccessFrequencyWeight float64 `json:"access_frequency_weight"` RecencyWeight float64 `json:"recency_weight"` - SizeWeight float64 `json:"size_weight"` - MLWeight float64 `json:"ml_weight"` + SizeWeight float64 `json:"size_weight"` + MLWeight float64 `json:"ml_weight"` } // SetWeights updates the eviction policy weights @@ -290,14 +290,14 @@ func (policy *MLCachePolicy) SetWeights(frequency, recency, size, ml float64) { glog.Warningf("Invalid weights provided, using defaults") return } - + // Normalize weights to sum to 1.0 policy.accessFrequencyWeight = frequency / total policy.recencyWeight = recency / total policy.sizeWeight = size / total policy.mlWeight = ml / total - - glog.V(2).Infof("Updated eviction policy weights: freq=%.2f, recency=%.2f, size=%.2f, ml=%.2f", + + glog.V(2).Infof("Updated eviction policy weights: freq=%.2f, recency=%.2f, size=%.2f, ml=%.2f", policy.accessFrequencyWeight, policy.recencyWeight, policy.sizeWeight, policy.mlWeight) } @@ -307,7 +307,7 @@ func (policy *MLCachePolicy) SetMLBoosts(trainingData, model, sequential, epochR policy.modelFileBoost = model policy.sequentialBoost = sequential policy.epochRelevanceBoost = epochRelevance - - glog.V(2).Infof("Updated ML boost factors: training=%.2f, model=%.2f, sequential=%.2f, epoch=%.2f", + + glog.V(2).Infof("Updated ML boost factors: training=%.2f, model=%.2f, sequential=%.2f, epoch=%.2f", trainingData, model, sequential, epochRelevance) } diff --git a/weed/mount/ml/cache_policy_test.go b/weed/mount/ml/cache_policy_test.go index 29df5b859..00ccff218 100644 --- a/weed/mount/ml/cache_policy_test.go +++ b/weed/mount/ml/cache_policy_test.go @@ -7,7 +7,7 @@ import ( func TestMLCachePolicy_Basic(t *testing.T) { policy := NewMLCachePolicy() - + // Test basic eviction score calculation entry := &CacheEntry{ Inode: 1, @@ -19,19 +19,19 @@ func TestMLCachePolicy_Basic(t *testing.T) { FileType: MLFileUnknown, IsHot: false, } - + score := policy.CalculateEvictionScore(entry) if score <= 0 { t.Error("Eviction score should be positive") } - + shouldEvict := policy.ShouldEvict(entry) t.Logf("Basic entry eviction: score=%.3f, shouldEvict=%v", score, shouldEvict) } func TestMLCachePolicy_ModelFileBoost(t *testing.T) { policy := NewMLCachePolicy() - + // Create two identical entries, one is a model file baseEntry := &CacheEntry{ Inode: 1, @@ -43,7 +43,7 @@ func TestMLCachePolicy_ModelFileBoost(t *testing.T) { FileType: MLFileUnknown, IsModel: false, } - + modelEntry := &CacheEntry{ Inode: 2, Size: 10 * 1024 * 1024, // 10MB @@ -54,60 +54,60 @@ func TestMLCachePolicy_ModelFileBoost(t *testing.T) { FileType: MLFileModel, IsModel: true, } - + baseScore := policy.CalculateEvictionScore(baseEntry) modelScore := policy.CalculateEvictionScore(modelEntry) - + if modelScore <= baseScore { - t.Errorf("Model file should have higher score than regular file: model=%.3f, base=%.3f", + t.Errorf("Model file should have higher score than regular file: model=%.3f, base=%.3f", modelScore, baseScore) } - + // Model files should be less likely to be evicted baseShouldEvict := policy.ShouldEvict(baseEntry) modelShouldEvict := policy.ShouldEvict(modelEntry) - + if modelShouldEvict && !baseShouldEvict { t.Error("Model file should not be evicted if regular file is not evicted") } - - t.Logf("Model vs Base eviction: model=%.3f (evict=%v), base=%.3f (evict=%v)", + + t.Logf("Model vs Base eviction: model=%.3f (evict=%v), base=%.3f (evict=%v)", modelScore, modelShouldEvict, baseScore, baseShouldEvict) } func TestMLCachePolicy_TrainingDataBoost(t *testing.T) { policy := NewMLCachePolicy() - + regularEntry := &CacheEntry{ - Inode: 1, - Size: 1024, - LastAccess: time.Now().Add(-2 * time.Minute), - AccessCount: 10, - FileType: MLFileUnknown, + Inode: 1, + Size: 1024, + LastAccess: time.Now().Add(-2 * time.Minute), + AccessCount: 10, + FileType: MLFileUnknown, IsTrainingData: false, } - + trainingEntry := &CacheEntry{ - Inode: 2, - Size: 1024, - LastAccess: time.Now().Add(-2 * time.Minute), - AccessCount: 10, - FileType: MLFileDataset, + Inode: 2, + Size: 1024, + LastAccess: time.Now().Add(-2 * time.Minute), + AccessCount: 10, + FileType: MLFileDataset, IsTrainingData: true, } - + regularScore := policy.CalculateEvictionScore(regularEntry) trainingScore := policy.CalculateEvictionScore(trainingEntry) - + if trainingScore <= regularScore { - t.Errorf("Training data should have higher score: training=%.3f, regular=%.3f", + t.Errorf("Training data should have higher score: training=%.3f, regular=%.3f", trainingScore, regularScore) } } func TestMLCachePolicy_AccessPatternBoost(t *testing.T) { policy := NewMLCachePolicy() - + randomEntry := &CacheEntry{ Inode: 1, Size: 1024, @@ -116,7 +116,7 @@ func TestMLCachePolicy_AccessPatternBoost(t *testing.T) { Pattern: RandomAccess, FileType: MLFileDataset, } - + sequentialEntry := &CacheEntry{ Inode: 2, Size: 1024, @@ -125,7 +125,7 @@ func TestMLCachePolicy_AccessPatternBoost(t *testing.T) { Pattern: SequentialAccess, FileType: MLFileDataset, } - + modelAccessEntry := &CacheEntry{ Inode: 3, Size: 1024, @@ -134,28 +134,28 @@ func TestMLCachePolicy_AccessPatternBoost(t *testing.T) { Pattern: ModelAccess, FileType: MLFileModel, } - + randomScore := policy.CalculateEvictionScore(randomEntry) sequentialScore := policy.CalculateEvictionScore(sequentialEntry) modelScore := policy.CalculateEvictionScore(modelAccessEntry) - + if sequentialScore <= randomScore { - t.Errorf("Sequential access should have higher score than random: seq=%.3f, random=%.3f", + t.Errorf("Sequential access should have higher score than random: seq=%.3f, random=%.3f", sequentialScore, randomScore) } - + if modelScore <= sequentialScore { - t.Errorf("Model access should have highest score: model=%.3f, seq=%.3f", + t.Errorf("Model access should have highest score: model=%.3f, seq=%.3f", modelScore, sequentialScore) } - - t.Logf("Pattern comparison: random=%.3f, sequential=%.3f, model=%.3f", + + t.Logf("Pattern comparison: random=%.3f, sequential=%.3f, model=%.3f", randomScore, sequentialScore, modelScore) } func TestMLCachePolicy_SizePreference(t *testing.T) { policy := NewMLCachePolicy() - + smallEntry := &CacheEntry{ Inode: 1, Size: 1024, // 1KB @@ -163,7 +163,7 @@ func TestMLCachePolicy_SizePreference(t *testing.T) { AccessCount: 3, FileType: MLFileUnknown, } - + largeEntry := &CacheEntry{ Inode: 2, Size: 50 * 1024 * 1024, // 50MB @@ -171,52 +171,52 @@ func TestMLCachePolicy_SizePreference(t *testing.T) { AccessCount: 3, FileType: MLFileUnknown, } - + smallScore := policy.CalculateEvictionScore(smallEntry) largeScore := policy.CalculateEvictionScore(largeEntry) - + if smallScore <= largeScore { - t.Errorf("Small files should have higher score than large files: small=%.3f, large=%.3f", + t.Errorf("Small files should have higher score than large files: small=%.3f, large=%.3f", smallScore, largeScore) } } func TestMLCachePolicy_RecencyDecay(t *testing.T) { policy := NewMLCachePolicy() - + // Create entries with different access times recentEntry := &CacheEntry{ - Inode: 1, - + Inode: 1, + Size: 1024, LastAccess: time.Now(), AccessCount: 5, FileType: MLFileUnknown, } - + oldEntry := &CacheEntry{ - Inode: 2, - + Inode: 2, + Size: 1024, LastAccess: time.Now().Add(-20 * time.Minute), AccessCount: 5, FileType: MLFileUnknown, } - + recentScore := policy.CalculateEvictionScore(recentEntry) oldScore := policy.CalculateEvictionScore(oldEntry) - + if recentScore <= oldScore { - t.Errorf("Recent access should have higher score: recent=%.3f, old=%.3f", + t.Errorf("Recent access should have higher score: recent=%.3f, old=%.3f", recentScore, oldScore) } } func TestMLCachePolicy_EpochRelevance(t *testing.T) { policy := NewMLCachePolicy() - + lowRelevanceEntry := &CacheEntry{ - Inode: 1, + Inode: 1, Size: 1024, LastAccess: time.Now(), @@ -224,9 +224,9 @@ func TestMLCachePolicy_EpochRelevance(t *testing.T) { FileType: MLFileDataset, EpochRelevance: 0.2, } - + highRelevanceEntry := &CacheEntry{ - Inode: 2, + Inode: 2, Size: 1024, LastAccess: time.Now(), @@ -234,112 +234,112 @@ func TestMLCachePolicy_EpochRelevance(t *testing.T) { FileType: MLFileDataset, EpochRelevance: 0.9, } - + lowScore := policy.CalculateEvictionScore(lowRelevanceEntry) highScore := policy.CalculateEvictionScore(highRelevanceEntry) - + if highScore <= lowScore { - t.Errorf("High epoch relevance should have higher score: high=%.3f, low=%.3f", + t.Errorf("High epoch relevance should have higher score: high=%.3f, low=%.3f", highScore, lowScore) } } func TestMLCachePolicy_DifferentThresholds(t *testing.T) { policy := NewMLCachePolicy() - + // Create entries for different file types with same base score unknownEntry := &CacheEntry{ - Inode: 1, - + Inode: 1, + Size: 1024, LastAccess: time.Now().Add(-15 * time.Minute), // Old enough to potentially evict AccessCount: 2, FileType: MLFileUnknown, } - + modelEntry := &CacheEntry{ - Inode: 2, - + Inode: 2, + Size: 1024, LastAccess: time.Now().Add(-15 * time.Minute), AccessCount: 2, FileType: MLFileModel, IsModel: true, } - + datasetEntry := &CacheEntry{ - Inode: 3, - + Inode: 3, + Size: 1024, LastAccess: time.Now().Add(-15 * time.Minute), AccessCount: 2, FileType: MLFileDataset, Pattern: SequentialAccess, } - + unknownShouldEvict := policy.ShouldEvict(unknownEntry) modelShouldEvict := policy.ShouldEvict(modelEntry) datasetShouldEvict := policy.ShouldEvict(datasetEntry) - + // Models should be least likely to be evicted if modelShouldEvict && (!unknownShouldEvict || !datasetShouldEvict) { t.Error("Model files should be least likely to be evicted") } - - t.Logf("Eviction by type: unknown=%v, model=%v, dataset=%v", + + t.Logf("Eviction by type: unknown=%v, model=%v, dataset=%v", unknownShouldEvict, modelShouldEvict, datasetShouldEvict) } func TestMLCachePolicy_SetWeights(t *testing.T) { policy := NewMLCachePolicy() - + // Test setting custom weights policy.SetWeights(0.4, 0.3, 0.1, 0.2) - + if policy.accessFrequencyWeight != 0.4 { t.Errorf("Expected frequency weight 0.4, got %.2f", policy.accessFrequencyWeight) } - + if policy.recencyWeight != 0.3 { t.Errorf("Expected recency weight 0.3, got %.2f", policy.recencyWeight) } - + if policy.sizeWeight != 0.1 { t.Errorf("Expected size weight 0.1, got %.2f", policy.sizeWeight) } - + if policy.mlWeight != 0.2 { t.Errorf("Expected ML weight 0.2, got %.2f", policy.mlWeight) } - + // Test weight normalization policy.SetWeights(2.0, 2.0, 1.0, 1.0) // Total = 6.0 - + expectedFreq := 2.0 / 6.0 - if abs(policy.accessFrequencyWeight - expectedFreq) > 0.001 { - t.Errorf("Expected normalized frequency weight %.3f, got %.3f", + if abs(policy.accessFrequencyWeight-expectedFreq) > 0.001 { + t.Errorf("Expected normalized frequency weight %.3f, got %.3f", expectedFreq, policy.accessFrequencyWeight) } } func TestMLCachePolicy_SetMLBoosts(t *testing.T) { policy := NewMLCachePolicy() - + // Test setting custom boost factors policy.SetMLBoosts(2.0, 3.0, 1.5, 1.8) - + if policy.trainingDataBoost != 2.0 { t.Errorf("Expected training data boost 2.0, got %.2f", policy.trainingDataBoost) } - + if policy.modelFileBoost != 3.0 { t.Errorf("Expected model file boost 3.0, got %.2f", policy.modelFileBoost) } - + if policy.sequentialBoost != 1.5 { t.Errorf("Expected sequential boost 1.5, got %.2f", policy.sequentialBoost) } - + if policy.epochRelevanceBoost != 1.8 { t.Errorf("Expected epoch relevance boost 1.8, got %.2f", policy.epochRelevanceBoost) } @@ -347,30 +347,30 @@ func TestMLCachePolicy_SetMLBoosts(t *testing.T) { func TestMLCachePolicy_Metrics(t *testing.T) { policy := NewMLCachePolicy() - + // Simulate some evictions entries := []*CacheEntry{ {FileType: MLFileModel, IsModel: true}, {FileType: MLFileDataset, IsTrainingData: true}, {FileType: MLFileUnknown}, } - + for _, entry := range entries { entry.LastAccess = time.Now().Add(-30 * time.Minute) // Old enough to evict entry.AccessCount = 1 entry.Size = 1024 - + if policy.ShouldEvict(entry) { // Eviction counters are updated in ShouldEvict } } - + metrics := policy.GetEvictionMetrics() - + if metrics.TotalEvictions == 0 { t.Error("Should have some total evictions") } - + // Verify weight configuration in metrics if metrics.AccessFrequencyWeight != policy.accessFrequencyWeight { t.Error("Metrics should reflect current weight configuration") @@ -379,30 +379,30 @@ func TestMLCachePolicy_Metrics(t *testing.T) { func TestMLCachePolicy_HotChunkPreference(t *testing.T) { policy := NewMLCachePolicy() - + coldEntry := &CacheEntry{ - Inode: 1, - + Inode: 1, + Size: 1024, LastAccess: time.Now(), AccessCount: 5, IsHot: false, FileType: MLFileDataset, } - + hotEntry := &CacheEntry{ - Inode: 2, - + Inode: 2, + Size: 1024, LastAccess: time.Now(), AccessCount: 5, IsHot: true, FileType: MLFileDataset, } - + coldScore := policy.CalculateEvictionScore(coldEntry) hotScore := policy.CalculateEvictionScore(hotEntry) - + if hotScore <= coldScore { t.Errorf("Hot chunk should have higher score: hot=%.3f, cold=%.3f", hotScore, coldScore) } @@ -410,7 +410,7 @@ func TestMLCachePolicy_HotChunkPreference(t *testing.T) { func TestMLCachePolicy_RecencyThresholds(t *testing.T) { policy := NewMLCachePolicy() - + // Test hot threshold hotEntry := &CacheEntry{ Inode: 1, @@ -418,7 +418,7 @@ func TestMLCachePolicy_RecencyThresholds(t *testing.T) { LastAccess: time.Now().Add(-30 * time.Second), // Within hot threshold AccessCount: 1, } - + // Test cold threshold coldEntry := &CacheEntry{ Inode: 2, @@ -426,7 +426,7 @@ func TestMLCachePolicy_RecencyThresholds(t *testing.T) { LastAccess: time.Now().Add(-15 * time.Minute), // Beyond cold threshold AccessCount: 1, } - + // Test middle middleEntry := &CacheEntry{ Inode: 3, @@ -434,39 +434,39 @@ func TestMLCachePolicy_RecencyThresholds(t *testing.T) { LastAccess: time.Now().Add(-5 * time.Minute), // Between thresholds AccessCount: 1, } - + hotScore := policy.calculateRecencyScore(time.Since(hotEntry.LastAccess)) coldScore := policy.calculateRecencyScore(time.Since(coldEntry.LastAccess)) middleScore := policy.calculateRecencyScore(time.Since(middleEntry.LastAccess)) - + if hotScore != 1.0 { t.Errorf("Hot entry should have score 1.0, got %.3f", hotScore) } - + if coldScore != 0.1 { t.Errorf("Cold entry should have score 0.1, got %.3f", coldScore) } - + if middleScore <= coldScore || middleScore >= hotScore { - t.Errorf("Middle entry should have score between hot and cold: %.3f not in (%.3f, %.3f)", + t.Errorf("Middle entry should have score between hot and cold: %.3f not in (%.3f, %.3f)", middleScore, coldScore, hotScore) } } func TestMLCachePolicy_SizeScore(t *testing.T) { policy := NewMLCachePolicy() - - smallSize := uint64(1024) // 1KB + + smallSize := uint64(1024) // 1KB largeSize := uint64(100 * 1024 * 1024) // 100MB - + smallScore := policy.calculateSizeScore(smallSize) largeScore := policy.calculateSizeScore(largeSize) - + if smallScore <= largeScore { - t.Errorf("Small files should have higher size score: small=%.3f, large=%.3f", + t.Errorf("Small files should have higher size score: small=%.3f, large=%.3f", smallScore, largeScore) } - + // Large files should still have reasonable score (not too low) if largeScore < 0.2 { t.Errorf("Large files should have reasonable score, got %.3f", largeScore) @@ -475,24 +475,24 @@ func TestMLCachePolicy_SizeScore(t *testing.T) { func TestMLCachePolicy_AccessFrequencyScore(t *testing.T) { policy := NewMLCachePolicy() - + lowAccessEntry := &CacheEntry{ AccessCount: 1, FileType: MLFileUnknown, Pattern: RandomAccess, } - + highAccessEntry := &CacheEntry{ AccessCount: 100, FileType: MLFileUnknown, Pattern: RandomAccess, } - + lowScore := policy.calculateAccessFrequencyScore(lowAccessEntry) highScore := policy.calculateAccessFrequencyScore(highAccessEntry) - + if highScore <= lowScore { - t.Errorf("High access count should have higher score: high=%.3f, low=%.3f", + t.Errorf("High access count should have higher score: high=%.3f, low=%.3f", highScore, lowScore) } } @@ -509,9 +509,9 @@ func abs(x float64) float64 { func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) { policy := NewMLCachePolicy() - + entry := &CacheEntry{ - Inode: 1, + Inode: 1, Size: 1024, LastAccess: time.Now().Add(-5 * time.Minute), @@ -521,9 +521,9 @@ func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) { IsTrainingData: true, EpochRelevance: 0.8, } - + b.ResetTimer() - + for i := 0; i < b.N; i++ { policy.CalculateEvictionScore(entry) } @@ -531,18 +531,18 @@ func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) { func BenchmarkMLCachePolicy_ShouldEvict(b *testing.B) { policy := NewMLCachePolicy() - + entry := &CacheEntry{ - Inode: 1, - + Inode: 1, + Size: 1024, LastAccess: time.Now().Add(-5 * time.Minute), AccessCount: 10, FileType: MLFileDataset, } - + b.ResetTimer() - + for i := 0; i < b.N; i++ { policy.ShouldEvict(entry) } diff --git a/weed/mount/ml/fuse_integration.go b/weed/mount/ml/fuse_integration.go index 54b770eb5..71597f5d4 100644 --- a/weed/mount/ml/fuse_integration.go +++ b/weed/mount/ml/fuse_integration.go @@ -14,22 +14,22 @@ type FUSEMLIntegration struct { openFileCache *OpenFileCache cachePolicy *MLCachePolicy mlOptimization *MLOptimization - + // FUSE-specific configuration - enableKeepCache bool // Enable FOPEN_KEEP_CACHE for ML files - enableWriteback bool // Enable writeback caching - attrCacheTimeout time.Duration // Attribute cache timeout for ML files - entryCacheTimeout time.Duration // Entry cache timeout for ML files - + enableKeepCache bool // Enable FOPEN_KEEP_CACHE for ML files + enableWriteback bool // Enable writeback caching + attrCacheTimeout time.Duration // Attribute cache timeout for ML files + entryCacheTimeout time.Duration // Entry cache timeout for ML files + // ML-specific FUSE optimizations mlAttrTimeout time.Duration // Extended attribute timeout for ML files datasetAttrTimeout time.Duration // Even longer timeout for dataset files modelAttrTimeout time.Duration // Longest timeout for model files - + // Statistics - keepCacheEnabled int64 // Number of times keep cache was enabled - writebackEnabled int64 // Number of times writeback was enabled - mlAttrCacheHits int64 // ML-specific attribute cache hits + keepCacheEnabled int64 // Number of times keep cache was enabled + writebackEnabled int64 // Number of times writeback was enabled + mlAttrCacheHits int64 // ML-specific attribute cache hits } // NewFUSEMLIntegration creates a new FUSE ML integration @@ -42,7 +42,7 @@ func NewFUSEMLIntegration(mlOpt *MLOptimization) *FUSEMLIntegration { enableWriteback: true, attrCacheTimeout: 5 * time.Second, entryCacheTimeout: 10 * time.Second, - + // ML-specific timeouts (longer for more stable caching) mlAttrTimeout: 30 * time.Second, datasetAttrTimeout: 60 * time.Second, @@ -54,17 +54,17 @@ func NewFUSEMLIntegration(mlOpt *MLOptimization) *FUSEMLIntegration { func (fmi *FUSEMLIntegration) OnFileOpen(inode uint64, entry *filer_pb.Entry, fullPath string, flags uint32, out *fuse.OpenOut) { // Register file in cache fileInfo := fmi.openFileCache.OpenFile(inode, entry, fullPath) - + // Apply ML-specific FUSE optimizations if fileInfo.IsMLFile && fmi.enableKeepCache { // Enable keep cache for ML files to reduce redundant reads out.OpenFlags |= fuse.FOPEN_KEEP_CACHE fmi.keepCacheEnabled++ - - glog.V(3).Infof("Enabled FOPEN_KEEP_CACHE for ML file: inode=%d, type=%v", + + glog.V(3).Infof("Enabled FOPEN_KEEP_CACHE for ML file: inode=%d, type=%v", inode, fileInfo.FileType) } - + // For large model files, also enable direct I/O to bypass page cache for very large reads if fileInfo.FileType == MLFileModel && entry.Attributes.FileSize > 100*1024*1024 { // > 100MB // Note: Direct I/O can be beneficial for very large sequential reads @@ -79,7 +79,7 @@ func (fmi *FUSEMLIntegration) OnFileOpen(inode uint64, entry *filer_pb.Entry, fu // OnFileClose handles file close events func (fmi *FUSEMLIntegration) OnFileClose(inode uint64) { canEvict := fmi.openFileCache.CloseFile(inode) - + if canEvict { glog.V(4).Infof("File closed and available for eviction: inode=%d", inode) } @@ -90,7 +90,7 @@ func (fmi *FUSEMLIntegration) OnFileRead(inode uint64, offset int64, size int) { // Update access pattern if fmi.mlOptimization != nil && fmi.mlOptimization.IsEnabled() { accessInfo := fmi.mlOptimization.RecordAccess(inode, offset, size) - + // Update file info with detected pattern if fileInfo := fmi.openFileCache.GetFileInfo(inode); fileInfo != nil { fileInfo.Lock() @@ -100,10 +100,10 @@ func (fmi *FUSEMLIntegration) OnFileRead(inode uint64, offset int64, size int) { } fileInfo.TotalBytesRead += int64(size) fileInfo.Unlock() - + // Trigger prefetching if pattern detected if shouldPrefetch, _ := fmi.mlOptimization.ShouldPrefetch(inode); shouldPrefetch { - glog.V(4).Infof("Prefetch triggered for ML file: inode=%d, pattern=%v", + glog.V(4).Infof("Prefetch triggered for ML file: inode=%d, pattern=%v", inode, fileInfo.ReadPattern) } } @@ -118,10 +118,10 @@ func (fmi *FUSEMLIntegration) OptimizeAttributes(inode uint64, out *fuse.AttrOut out.AttrValid = uint64(fmi.attrCacheTimeout.Seconds()) return } - + // Apply ML-specific timeouts var timeout time.Duration - + switch fileInfo.FileType { case MLFileModel: // Model files rarely change, cache attributes longer @@ -136,15 +136,15 @@ func (fmi *FUSEMLIntegration) OptimizeAttributes(inode uint64, out *fuse.AttrOut // Use default timeout for non-ML files timeout = fmi.attrCacheTimeout } - + out.AttrValid = uint64(timeout.Seconds()) fmi.mlAttrCacheHits++ - - glog.V(4).Infof("ML attribute cache timeout: inode=%d, type=%v, timeout=%v", + + glog.V(4).Infof("ML attribute cache timeout: inode=%d, type=%v, timeout=%v", inode, fileInfo.FileType, timeout) } -// OptimizeEntryCache applies ML-specific entry caching optimizations +// OptimizeEntryCache applies ML-specific entry caching optimizations func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.Entry, out *fuse.EntryOut) { fileInfo := fmi.openFileCache.GetFileInfo(inode) if fileInfo == nil { @@ -152,10 +152,10 @@ func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.E out.SetEntryTimeout(fmi.entryCacheTimeout) return } - + // ML files can have longer entry cache timeouts since they change infrequently var timeout time.Duration - + switch fileInfo.FileType { case MLFileModel, MLFileDataset: // Models and datasets rarely change during training @@ -166,10 +166,10 @@ func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.E default: timeout = fmi.entryCacheTimeout } - + out.SetEntryTimeout(timeout) - - glog.V(4).Infof("ML entry cache timeout: inode=%d, type=%v, timeout=%v", + + glog.V(4).Infof("ML entry cache timeout: inode=%d, type=%v, timeout=%v", inode, fileInfo.FileType, timeout) } @@ -178,12 +178,12 @@ func (fmi *FUSEMLIntegration) ShouldEnableWriteback(inode uint64, entry *filer_p if !fmi.enableWriteback { return false } - + fileInfo := fmi.openFileCache.GetFileInfo(inode) if fileInfo == nil { return false } - + // Enable writeback for ML files that are frequently written switch fileInfo.FileType { case MLFileLog: @@ -204,7 +204,7 @@ func (fmi *FUSEMLIntegration) ShouldEnableWriteback(inode uint64, entry *filer_p // Default behavior for non-ML files return false } - + return false } @@ -213,15 +213,15 @@ func (fmi *FUSEMLIntegration) OnChunkAccess(inode uint64, chunkIndex uint32, fil metadata := &ChunkMetadata{ FileId: fileId, Offset: uint64(chunkIndex) * 1024, // Assuming 1KB chunks for now - Size: 1024, + Size: 1024, LastAccess: time.Now(), CacheLevel: cacheLevel, AccessCount: 1, // Will be incremented in UpdateChunkCache } - + // Update chunk cache fmi.openFileCache.UpdateChunkCache(inode, chunkIndex, metadata) - + // Update file-level statistics if fileInfo := fmi.openFileCache.GetFileInfo(inode); fileInfo != nil { fileInfo.Lock() @@ -240,16 +240,16 @@ func (fmi *FUSEMLIntegration) GetOptimizationMetrics() FUSEMLMetrics { if fmi.mlOptimization != nil { mlMetrics = fmi.mlOptimization.GetMetrics() } - + return FUSEMLMetrics{ - MLOptimizationMetrics: mlMetrics, - OpenFileCacheMetrics: fmi.openFileCache.GetMetrics(), - CachePolicyMetrics: fmi.cachePolicy.GetEvictionMetrics(), - KeepCacheEnabled: fmi.keepCacheEnabled, - WritebackEnabled: fmi.writebackEnabled, - MLAttrCacheHits: fmi.mlAttrCacheHits, - EnableKeepCache: fmi.enableKeepCache, - EnableWriteback: fmi.enableWriteback, + MLOptimizationMetrics: mlMetrics, + OpenFileCacheMetrics: fmi.openFileCache.GetMetrics(), + CachePolicyMetrics: fmi.cachePolicy.GetEvictionMetrics(), + KeepCacheEnabled: fmi.keepCacheEnabled, + WritebackEnabled: fmi.writebackEnabled, + MLAttrCacheHits: fmi.mlAttrCacheHits, + EnableKeepCache: fmi.enableKeepCache, + EnableWriteback: fmi.enableWriteback, } } @@ -258,12 +258,12 @@ type FUSEMLMetrics struct { MLOptimizationMetrics *MLOptimizationMetrics `json:"ml_optimization,omitempty"` OpenFileCacheMetrics OpenFileCacheMetrics `json:"open_file_cache"` CachePolicyMetrics MLCachePolicyMetrics `json:"cache_policy"` - + // FUSE-specific metrics - KeepCacheEnabled int64 `json:"keep_cache_enabled"` - WritebackEnabled int64 `json:"writeback_enabled"` - MLAttrCacheHits int64 `json:"ml_attr_cache_hits"` - + KeepCacheEnabled int64 `json:"keep_cache_enabled"` + WritebackEnabled int64 `json:"writeback_enabled"` + MLAttrCacheHits int64 `json:"ml_attr_cache_hits"` + // Configuration EnableKeepCache bool `json:"enable_keep_cache"` EnableWriteback bool `json:"enable_writeback"` @@ -272,18 +272,18 @@ type FUSEMLMetrics struct { // Shutdown gracefully shuts down the FUSE ML integration func (fmi *FUSEMLIntegration) Shutdown() { glog.V(1).Infof("Shutting down FUSE ML integration...") - + if fmi.openFileCache != nil { fmi.openFileCache.Shutdown() } - + if fmi.mlOptimization != nil { fmi.mlOptimization.Shutdown() } - + // Print final metrics metrics := fmi.GetOptimizationMetrics() - glog.V(1).Infof("FUSE ML integration final metrics: keep_cache=%d, writeback=%d, attr_hits=%d", + glog.V(1).Infof("FUSE ML integration final metrics: keep_cache=%d, writeback=%d, attr_hits=%d", metrics.KeepCacheEnabled, metrics.WritebackEnabled, metrics.MLAttrCacheHits) } @@ -291,11 +291,11 @@ func (fmi *FUSEMLIntegration) Shutdown() { func (fmi *FUSEMLIntegration) EnableMLOptimizations(enabled bool) { fmi.enableKeepCache = enabled fmi.enableWriteback = enabled - + if fmi.mlOptimization != nil { fmi.mlOptimization.Enable(enabled) } - + glog.V(1).Infof("ML FUSE optimizations %s", map[bool]string{true: "enabled", false: "disabled"}[enabled]) } @@ -306,7 +306,7 @@ func (fmi *FUSEMLIntegration) SetCacheTimeouts(attr, entry, mlAttr, dataset, mod fmi.mlAttrTimeout = mlAttr fmi.datasetAttrTimeout = dataset fmi.modelAttrTimeout = model - - glog.V(2).Infof("Updated cache timeouts: attr=%v, entry=%v, ml=%v, dataset=%v, model=%v", + + glog.V(2).Infof("Updated cache timeouts: attr=%v, entry=%v, ml=%v, dataset=%v, model=%v", attr, entry, mlAttr, dataset, model) } diff --git a/weed/mount/ml/open_file_cache.go b/weed/mount/ml/open_file_cache.go index 8d23a0bd8..fe3e794a8 100644 --- a/weed/mount/ml/open_file_cache.go +++ b/weed/mount/ml/open_file_cache.go @@ -10,48 +10,48 @@ import ( // ChunkMetadata contains metadata about a cached chunk type ChunkMetadata struct { - FileId string // Chunk file ID - Offset uint64 // Offset within the file - Size uint64 // Size of the chunk - CacheLevel int // 0=memory, 1=disk, 2=not cached - LastAccess time.Time // Last access time - AccessCount int64 // Number of times accessed - IsHot bool // Whether this chunk is frequently accessed - Pattern AccessPattern // Access pattern for this chunk + FileId string // Chunk file ID + Offset uint64 // Offset within the file + Size uint64 // Size of the chunk + CacheLevel int // 0=memory, 1=disk, 2=not cached + LastAccess time.Time // Last access time + AccessCount int64 // Number of times accessed + IsHot bool // Whether this chunk is frequently accessed + Pattern AccessPattern // Access pattern for this chunk } // OpenFileInfo contains comprehensive information about an open file type OpenFileInfo struct { sync.RWMutex - + // Basic file information - Inode uint64 // File inode - Entry *filer_pb.Entry // File entry from filer - OpenCount int // Number of open handles - OpenTime time.Time // When file was first opened - LastAccess time.Time // Last access time - + Inode uint64 // File inode + Entry *filer_pb.Entry // File entry from filer + OpenCount int // Number of open handles + OpenTime time.Time // When file was first opened + LastAccess time.Time // Last access time + // Chunk-level caching - ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata - ChunkCount uint32 // Total number of chunks in file - ChunkSize int64 // Size of each chunk - + ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata + ChunkCount uint32 // Total number of chunks in file + ChunkSize int64 // Size of each chunk + // Access pattern tracking - AccessInfo *AccessInfo // Access pattern information - ReadPattern AccessPattern // Overall file access pattern - PrefetchState PrefetchState // Current prefetch state - + AccessInfo *AccessInfo // Access pattern information + ReadPattern AccessPattern // Overall file access pattern + PrefetchState PrefetchState // Current prefetch state + // ML-specific optimizations - IsMLFile bool // Whether this is likely an ML-related file - FileType MLFileType // Type of ML file (dataset, model, etc.) - BatchSize int // Detected batch size for training data - EpochCount int // Number of epochs detected - + IsMLFile bool // Whether this is likely an ML-related file + FileType MLFileType // Type of ML file (dataset, model, etc.) + BatchSize int // Detected batch size for training data + EpochCount int // Number of epochs detected + // Performance tracking - TotalBytesRead int64 // Total bytes read from this file - CacheHitCount int64 // Number of cache hits - CacheMissCount int64 // Number of cache misses - PrefetchHitCount int64 // Number of prefetch hits + TotalBytesRead int64 // Total bytes read from this file + CacheHitCount int64 // Number of cache hits + CacheMissCount int64 // Number of cache misses + PrefetchHitCount int64 // Number of prefetch hits } // PrefetchState represents the current prefetch state for a file @@ -69,36 +69,36 @@ type MLFileType int const ( MLFileUnknown MLFileType = iota - MLFileDataset // Training/validation dataset - MLFileModel // Model checkpoint/weights - MLFileConfig // Configuration files - MLFileTensor // Individual tensor files - MLFileLog // Training logs + MLFileDataset // Training/validation dataset + MLFileModel // Model checkpoint/weights + MLFileConfig // Configuration files + MLFileTensor // Individual tensor files + MLFileLog // Training logs ) // OpenFileCache manages open file information with ML-aware optimizations type OpenFileCache struct { sync.RWMutex - + // Configuration maxFiles int // Maximum number of files to track ttl time.Duration // TTL for inactive files cleanupInterval time.Duration // Cleanup interval - + // File tracking - files map[uint64]*OpenFileInfo // inode -> file info - accessOrder []uint64 // LRU order for eviction - + files map[uint64]*OpenFileInfo // inode -> file info + accessOrder []uint64 // LRU order for eviction + // ML-specific configuration enableMLOptimization bool - mlFileDetector *MLFileDetector - + mlFileDetector *MLFileDetector + // Metrics - totalFiles int64 - evictedFiles int64 - cacheHits int64 - cacheMisses int64 - + totalFiles int64 + evictedFiles int64 + cacheHits int64 + cacheMisses int64 + // Background cleanup shutdown chan struct{} done chan struct{} @@ -110,11 +110,11 @@ type MLFileDetector struct { datasetExtensions map[string]bool modelExtensions map[string]bool configExtensions map[string]bool - + // Path patterns datasetPaths []string modelPaths []string - + // Size heuristics modelMinSize int64 // Minimum size for model files datasetMaxItems int // Maximum items in dataset directory @@ -128,22 +128,22 @@ func NewOpenFileCache(maxFiles int, ttl time.Duration) *OpenFileCache { if ttl <= 0 { ttl = 30 * time.Minute // Default TTL } - + ofc := &OpenFileCache{ - maxFiles: maxFiles, - ttl: ttl, - cleanupInterval: 5 * time.Minute, - files: make(map[uint64]*OpenFileInfo), - accessOrder: make([]uint64, 0, maxFiles), + maxFiles: maxFiles, + ttl: ttl, + cleanupInterval: 5 * time.Minute, + files: make(map[uint64]*OpenFileInfo), + accessOrder: make([]uint64, 0, maxFiles), enableMLOptimization: true, - mlFileDetector: newMLFileDetector(), - shutdown: make(chan struct{}), - done: make(chan struct{}), + mlFileDetector: newMLFileDetector(), + shutdown: make(chan struct{}), + done: make(chan struct{}), } - + // Start background cleanup go ofc.cleanupWorker() - + glog.V(1).Infof("OpenFileCache initialized: maxFiles=%d, ttl=%v", maxFiles, ttl) return ofc } @@ -185,7 +185,7 @@ func newMLFileDetector() *MLFileDetector { func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath string) *OpenFileInfo { ofc.Lock() defer ofc.Unlock() - + // Get or create file info fileInfo := ofc.files[inode] if fileInfo == nil { @@ -198,35 +198,35 @@ func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath ReadPattern: RandomAccess, PrefetchState: PrefetchIdle, } - + // Detect ML file type if ofc.enableMLOptimization { fileInfo.IsMLFile, fileInfo.FileType = ofc.mlFileDetector.DetectMLFile(entry, fullPath) if fileInfo.IsMLFile { - glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s", + glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s", inode, fileInfo.FileType, fullPath) } } - + ofc.files[inode] = fileInfo ofc.totalFiles++ - + // Update access order for LRU ofc.updateAccessOrder(inode) - + // Evict if necessary if len(ofc.files) > ofc.maxFiles { ofc.evictLRU() } } - + fileInfo.OpenCount++ fileInfo.LastAccess = time.Now() ofc.updateAccessOrder(inode) - - glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v", + + glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v", inode, fileInfo.OpenCount, fileInfo.IsMLFile) - + return fileInfo } @@ -234,15 +234,15 @@ func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath func (ofc *OpenFileCache) CloseFile(inode uint64) bool { ofc.Lock() defer ofc.Unlock() - + fileInfo := ofc.files[inode] if fileInfo == nil { return true // Already cleaned up } - + fileInfo.OpenCount-- glog.V(4).Infof("File closed: inode=%d, openCount=%d", inode, fileInfo.OpenCount) - + // Return true if file can be evicted (no more open handles) return fileInfo.OpenCount <= 0 } @@ -251,14 +251,14 @@ func (ofc *OpenFileCache) CloseFile(inode uint64) bool { func (ofc *OpenFileCache) GetFileInfo(inode uint64) *OpenFileInfo { ofc.RLock() defer ofc.RUnlock() - + fileInfo := ofc.files[inode] if fileInfo != nil { fileInfo.LastAccess = time.Now() ofc.cacheHits++ return fileInfo } - + ofc.cacheMisses++ return nil } @@ -268,19 +268,19 @@ func (ofc *OpenFileCache) UpdateChunkCache(inode uint64, chunkIndex uint32, meta ofc.RLock() fileInfo := ofc.files[inode] ofc.RUnlock() - + if fileInfo == nil { return } - + fileInfo.Lock() defer fileInfo.Unlock() - + fileInfo.ChunkCache[chunkIndex] = metadata metadata.LastAccess = time.Now() metadata.AccessCount++ - - glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d", + + glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d", inode, chunkIndex, metadata.CacheLevel) } @@ -289,20 +289,20 @@ func (ofc *OpenFileCache) GetChunkMetadata(inode uint64, chunkIndex uint32) (*Ch ofc.RLock() fileInfo := ofc.files[inode] ofc.RUnlock() - + if fileInfo == nil { return nil, false } - + fileInfo.RLock() defer fileInfo.RUnlock() - + metadata, exists := fileInfo.ChunkCache[chunkIndex] if exists { metadata.LastAccess = time.Now() metadata.AccessCount++ } - + return metadata, exists } @@ -315,7 +315,7 @@ func (ofc *OpenFileCache) updateAccessOrder(inode uint64) { break } } - + // Add to front (most recently used) ofc.accessOrder = append([]uint64{inode}, ofc.accessOrder...) } @@ -325,24 +325,24 @@ func (ofc *OpenFileCache) evictLRU() { if len(ofc.accessOrder) == 0 { return } - + // Find LRU file that can be evicted (not currently open) for i := len(ofc.accessOrder) - 1; i >= 0; i-- { inode := ofc.accessOrder[i] fileInfo := ofc.files[inode] - + if fileInfo != nil && fileInfo.OpenCount <= 0 { // Evict this file delete(ofc.files, inode) ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...) ofc.evictedFiles++ - - glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d", + + glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d", inode, len(fileInfo.ChunkCache)) return } } - + // If no files can be evicted, just log a warning glog.V(2).Infof("Warning: Could not evict any files from cache (all files are open)") } @@ -351,7 +351,7 @@ func (ofc *OpenFileCache) evictLRU() { func (ofc *OpenFileCache) cleanupWorker() { ticker := time.NewTicker(ofc.cleanupInterval) defer ticker.Stop() - + for { select { case <-ticker.C: @@ -367,17 +367,17 @@ func (ofc *OpenFileCache) cleanupWorker() { func (ofc *OpenFileCache) cleanup() { ofc.Lock() defer ofc.Unlock() - + now := time.Now() toRemove := make([]uint64, 0) - + for inode, fileInfo := range ofc.files { // Only cleanup files that are not open and have expired if fileInfo.OpenCount <= 0 && now.Sub(fileInfo.LastAccess) > ofc.ttl { toRemove = append(toRemove, inode) } } - + // Remove expired files for _, inode := range toRemove { delete(ofc.files, inode) @@ -389,7 +389,7 @@ func (ofc *OpenFileCache) cleanup() { } } } - + if len(toRemove) > 0 { glog.V(3).Infof("Cleaned up %d expired file cache entries", len(toRemove)) } @@ -399,12 +399,12 @@ func (ofc *OpenFileCache) cleanup() { func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics { ofc.RLock() defer ofc.RUnlock() - + var totalChunks int64 var mlFiles int64 fileTypes := make(map[MLFileType]int) patterns := make(map[AccessPattern]int) - + for _, fileInfo := range ofc.files { totalChunks += int64(len(fileInfo.ChunkCache)) if fileInfo.IsMLFile { @@ -413,43 +413,43 @@ func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics { } patterns[fileInfo.ReadPattern]++ } - + return OpenFileCacheMetrics{ - TotalFiles: int64(len(ofc.files)), - MLFiles: mlFiles, - TotalChunks: totalChunks, - CacheHits: ofc.cacheHits, - CacheMisses: ofc.cacheMisses, - EvictedFiles: ofc.evictedFiles, - FileTypes: fileTypes, + TotalFiles: int64(len(ofc.files)), + MLFiles: mlFiles, + TotalChunks: totalChunks, + CacheHits: ofc.cacheHits, + CacheMisses: ofc.cacheMisses, + EvictedFiles: ofc.evictedFiles, + FileTypes: fileTypes, AccessPatterns: patterns, } } // OpenFileCacheMetrics holds metrics for the open file cache type OpenFileCacheMetrics struct { - TotalFiles int64 `json:"total_files"` - MLFiles int64 `json:"ml_files"` - TotalChunks int64 `json:"total_chunks"` - CacheHits int64 `json:"cache_hits"` - CacheMisses int64 `json:"cache_misses"` - EvictedFiles int64 `json:"evicted_files"` - FileTypes map[MLFileType]int `json:"file_types"` - AccessPatterns map[AccessPattern]int `json:"access_patterns"` + TotalFiles int64 `json:"total_files"` + MLFiles int64 `json:"ml_files"` + TotalChunks int64 `json:"total_chunks"` + CacheHits int64 `json:"cache_hits"` + CacheMisses int64 `json:"cache_misses"` + EvictedFiles int64 `json:"evicted_files"` + FileTypes map[MLFileType]int `json:"file_types"` + AccessPatterns map[AccessPattern]int `json:"access_patterns"` } // Shutdown gracefully shuts down the open file cache func (ofc *OpenFileCache) Shutdown() { glog.V(1).Infof("Shutting down OpenFileCache...") - + close(ofc.shutdown) - + // Wait for cleanup worker to finish <-ofc.done - + // Print final metrics metrics := ofc.GetMetrics() - glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d", + glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d", metrics.TotalFiles, metrics.TotalChunks, metrics.CacheHits, metrics.CacheMisses) } @@ -460,10 +460,10 @@ func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath str if entry == nil { return false, MLFileUnknown } - + name := entry.Name size := int64(entry.Attributes.FileSize) - + // Check file extension if ext := getFileExtension(name); ext != "" { if detector.datasetExtensions[ext] { @@ -476,20 +476,20 @@ func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath str return true, MLFileConfig } } - + // Check path patterns for _, path := range detector.datasetPaths { if contains(fullPath, path) { return true, MLFileDataset } } - + for _, path := range detector.modelPaths { if contains(fullPath, path) { return true, MLFileModel } } - + // Check size heuristics if size > detector.modelMinSize { // Large files in certain contexts might be models @@ -497,17 +497,17 @@ func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath str return true, MLFileModel } } - + // Check for tensor files if contains(name, "tensor") || contains(name, ".pt") || contains(name, ".npy") { return true, MLFileTensor } - + // Check for log files if contains(name, "log") || contains(name, "tensorboard") || contains(fullPath, "logs") { return true, MLFileLog } - + return false, MLFileUnknown } @@ -533,7 +533,7 @@ func findSubstring(str, substr string) bool { if len(str) < len(substr) { return false } - + for i := 0; i <= len(str)-len(substr); i++ { if str[i:i+len(substr)] == substr { return true diff --git a/weed/mount/ml/open_file_cache_test.go b/weed/mount/ml/open_file_cache_test.go index d7d3e9664..5353cefcd 100644 --- a/weed/mount/ml/open_file_cache_test.go +++ b/weed/mount/ml/open_file_cache_test.go @@ -13,24 +13,24 @@ func TestOpenFileCache_Basic(t *testing.T) { // Test opening a file entry := &filer_pb.Entry{ - Name: "test.txt", + Name: "test.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } - + inode := uint64(1) fullPath := "/test/test.txt" fileInfo := cache.OpenFile(inode, entry, fullPath) - + if fileInfo == nil { t.Fatal("OpenFile should return file info") } - + if fileInfo.Inode != inode { t.Errorf("Expected inode %d, got %d", inode, fileInfo.Inode) } - + if fileInfo.OpenCount != 1 { t.Errorf("Expected open count 1, got %d", fileInfo.OpenCount) } @@ -47,26 +47,26 @@ func TestOpenFileCache_MLFileDetection(t *testing.T) { size uint64 expected MLFileType }{ - {"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100*1024*1024, MLFileModel}, - {"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2*1024*1024, MLFileDataset}, + {"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100 * 1024 * 1024, MLFileModel}, + {"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2 * 1024 * 1024, MLFileDataset}, {"Config file", "/config/training.yaml", "training.yaml", 1024, MLFileConfig}, - {"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50*1024*1024, MLFileModel}, - {"Log file", "/logs/training.log", "training.log", 10*1024, MLFileLog}, - {"Regular file", "/documents/readme.txt", "readme.txt", 5*1024, MLFileUnknown}, + {"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50 * 1024 * 1024, MLFileModel}, + {"Log file", "/logs/training.log", "training.log", 10 * 1024, MLFileLog}, + {"Regular file", "/documents/readme.txt", "readme.txt", 5 * 1024, MLFileUnknown}, } - + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { entry := &filer_pb.Entry{ - Name: tc.filename, + Name: tc.filename, Attributes: &filer_pb.FuseAttributes{ FileSize: tc.size, }, } - + inode := uint64(time.Now().UnixNano()) // Unique inode fileInfo := cache.OpenFile(inode, entry, tc.path) - + if tc.expected == MLFileUnknown { if fileInfo.IsMLFile { t.Errorf("File %s should not be detected as ML file", tc.path) @@ -75,7 +75,7 @@ func TestOpenFileCache_MLFileDetection(t *testing.T) { if !fileInfo.IsMLFile { t.Errorf("File %s should be detected as ML file", tc.path) } - + if fileInfo.FileType != tc.expected { t.Errorf("Expected file type %v, got %v", tc.expected, fileInfo.FileType) } @@ -90,15 +90,15 @@ func TestOpenFileCache_ChunkMetadata(t *testing.T) { inode := uint64(1) entry := &filer_pb.Entry{ - Name: "data.bin", + Name: "data.bin", Attributes: &filer_pb.FuseAttributes{ FileSize: 10240, }, } fullPath := "/data/data.bin" - + cache.OpenFile(inode, entry, fullPath) - + // Test updating chunk metadata chunkIndex := uint32(0) metadata := &ChunkMetadata{ @@ -110,19 +110,19 @@ func TestOpenFileCache_ChunkMetadata(t *testing.T) { AccessCount: 1, Pattern: SequentialAccess, } - + cache.UpdateChunkCache(inode, chunkIndex, metadata) - + // Test retrieving chunk metadata retrieved, exists := cache.GetChunkMetadata(inode, chunkIndex) if !exists { t.Error("Chunk metadata should exist") } - + if retrieved.FileId != metadata.FileId { t.Errorf("Expected FileId %s, got %s", metadata.FileId, retrieved.FileId) } - + if retrieved.AccessCount != 2 { // Should be incremented during retrieval t.Errorf("Expected access count 2, got %d", retrieved.AccessCount) } @@ -135,7 +135,7 @@ func TestOpenFileCache_LRUEviction(t *testing.T) { // Fill cache to capacity for i := 1; i <= 3; i++ { entry := &filer_pb.Entry{ - Name: "file" + string(rune('0'+i)) + ".txt", + Name: "file" + string(rune('0'+i)) + ".txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, @@ -144,27 +144,27 @@ func TestOpenFileCache_LRUEviction(t *testing.T) { cache.OpenFile(uint64(i), entry, fullPath) cache.CloseFile(uint64(i)) // Close immediately so they can be evicted } - + // Add one more file - should trigger eviction entry4 := &filer_pb.Entry{ - Name: "file4.txt", + Name: "file4.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } cache.OpenFile(uint64(4), entry4, "/test/file4.txt") - + metrics := cache.GetMetrics() if metrics.EvictedFiles == 0 { t.Error("Should have evicted at least one file") } - + // File 1 should be evicted (oldest) file1Info := cache.GetFileInfo(uint64(1)) if file1Info != nil { t.Error("File 1 should have been evicted") } - + // File 4 should still be there file4Info := cache.GetFileInfo(uint64(4)) if file4Info == nil { @@ -178,27 +178,27 @@ func TestOpenFileCache_TTLCleanup(t *testing.T) { inode := uint64(1) entry := &filer_pb.Entry{ - Name: "test.txt", + Name: "test.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } - + fileInfo := cache.OpenFile(inode, entry, "/test/test.txt") cache.CloseFile(inode) // Close so it can be cleaned up - + // Wait for TTL to expire time.Sleep(150 * time.Millisecond) - + // Trigger cleanup manually cache.cleanup() - + // File should be cleaned up retrievedInfo := cache.GetFileInfo(inode) if retrievedInfo != nil { t.Error("File should have been cleaned up after TTL expiration") } - + _ = fileInfo // Avoid unused variable warning } @@ -208,35 +208,35 @@ func TestOpenFileCache_MultipleOpens(t *testing.T) { inode := uint64(1) entry := &filer_pb.Entry{ - Name: "shared.txt", + Name: "shared.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } fullPath := "/test/shared.txt" - + // Open file multiple times fileInfo1 := cache.OpenFile(inode, entry, fullPath) fileInfo2 := cache.OpenFile(inode, entry, fullPath) - + if fileInfo1 != fileInfo2 { t.Error("Multiple opens of same file should return same file info") } - + if fileInfo1.OpenCount != 2 { t.Errorf("Expected open count 2, got %d", fileInfo1.OpenCount) } - + // Close once canEvict1 := cache.CloseFile(inode) if canEvict1 { t.Error("Should not be able to evict file with open count > 0") } - + if fileInfo1.OpenCount != 1 { t.Errorf("Expected open count 1 after first close, got %d", fileInfo1.OpenCount) } - + // Close again canEvict2 := cache.CloseFile(inode) if !canEvict2 { @@ -260,16 +260,16 @@ func TestOpenFileCache_Metrics(t *testing.T) { {3, "config.yaml", "/config/config.yaml", 1024}, {4, "regular.txt", "/docs/regular.txt", 5 * 1024}, } - + for _, file := range files { entry := &filer_pb.Entry{ - Name: file.filename, + Name: file.filename, Attributes: &filer_pb.FuseAttributes{ FileSize: file.size, }, } cache.OpenFile(file.inode, entry, file.path) - + // Add some chunk metadata metadata := &ChunkMetadata{ FileId: "chunk_" + string(rune(file.inode)), @@ -279,26 +279,26 @@ func TestOpenFileCache_Metrics(t *testing.T) { } cache.UpdateChunkCache(file.inode, 0, metadata) } - + metrics := cache.GetMetrics() - + if metrics.TotalFiles != 4 { t.Errorf("Expected 4 total files, got %d", metrics.TotalFiles) } - + if metrics.MLFiles < 2 { // Should detect at least model and dataset t.Errorf("Expected at least 2 ML files, got %d", metrics.MLFiles) } - + if metrics.TotalChunks != 4 { t.Errorf("Expected 4 total chunks, got %d", metrics.TotalChunks) } - + // Check file type counts if metrics.FileTypes[MLFileModel] == 0 { t.Error("Should detect at least one model file") } - + if metrics.FileTypes[MLFileDataset] == 0 { t.Error("Should detect at least one dataset file") } @@ -311,24 +311,24 @@ func TestOpenFileCache_ConcurrentAccess(t *testing.T) { // Test concurrent access to the cache numGoroutines := 10 done := make(chan bool, numGoroutines) - + for i := 0; i < numGoroutines; i++ { go func(id int) { defer func() { done <- true }() - + inode := uint64(id) entry := &filer_pb.Entry{ - Name: "file" + string(rune('0'+id)) + ".txt", + Name: "file" + string(rune('0'+id)) + ".txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } fullPath := "/test/file" + string(rune('0'+id)) + ".txt" - + // Perform multiple operations for j := 0; j < 10; j++ { cache.OpenFile(inode, entry, fullPath) - + metadata := &ChunkMetadata{ FileId: "chunk_" + string(rune(id)) + "_" + string(rune(j)), Offset: uint64(j * 1024), @@ -336,18 +336,18 @@ func TestOpenFileCache_ConcurrentAccess(t *testing.T) { CacheLevel: 0, } cache.UpdateChunkCache(inode, uint32(j), metadata) - + cache.GetChunkMetadata(inode, uint32(j)) cache.CloseFile(inode) } }(i) } - + // Wait for all goroutines to complete for i := 0; i < numGoroutines; i++ { <-done } - + // Verify cache state metrics := cache.GetMetrics() if metrics.TotalFiles == 0 { @@ -357,7 +357,7 @@ func TestOpenFileCache_ConcurrentAccess(t *testing.T) { func TestMLFileDetector_Extensions(t *testing.T) { detector := newMLFileDetector() - + testCases := []struct { filename string path string @@ -369,20 +369,20 @@ func TestMLFileDetector_Extensions(t *testing.T) { {"config.yaml", "/config/config.yaml", MLFileConfig}, {"tensor.safetensors", "/tensors/tensor.safetensors", MLFileModel}, {"training.log", "/logs/training.log", MLFileLog}, - {"document.txt", "/docs/document.txt", MLFileUnknown}, + {"document.txt", "/docs/document.txt", MLFileUnknown}, } - + for _, tc := range testCases { t.Run(tc.filename, func(t *testing.T) { entry := &filer_pb.Entry{ - Name: tc.filename, + Name: tc.filename, Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } - + isML, fileType := detector.DetectMLFile(entry, tc.path) - + if tc.expected == MLFileUnknown { // For unknown files, either ML detection result is acceptable t.Logf("File %s: isML=%v, type=%v", tc.filename, isML, fileType) @@ -390,7 +390,7 @@ func TestMLFileDetector_Extensions(t *testing.T) { if !isML { t.Errorf("File %s should be detected as ML file", tc.filename) } - + if fileType != tc.expected { t.Errorf("File %s: expected type %v, got %v", tc.filename, tc.expected, fileType) } @@ -401,7 +401,7 @@ func TestMLFileDetector_Extensions(t *testing.T) { func TestMLFileDetector_PathPatterns(t *testing.T) { detector := newMLFileDetector() - + testCases := []struct { path string filename string @@ -413,25 +413,25 @@ func TestMLFileDetector_PathPatterns(t *testing.T) { {"/checkpoints/model_v1.bin", "model_v1.bin", MLFileModel}, {"/documents/report.pdf", "report.pdf", MLFileUnknown}, } - + for _, tc := range testCases { t.Run(tc.path, func(t *testing.T) { entry := &filer_pb.Entry{ - Name: tc.filename, + Name: tc.filename, Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } - + isML, fileType := detector.DetectMLFile(entry, tc.path) - + if tc.expected == MLFileUnknown { t.Logf("Path %s: isML=%v, type=%v", tc.path, isML, fileType) } else { if !isML { t.Errorf("Path %s should be detected as ML file", tc.path) } - + if fileType != tc.expected { t.Errorf("Path %s: expected type %v, got %v", tc.path, tc.expected, fileType) } @@ -442,21 +442,21 @@ func TestMLFileDetector_PathPatterns(t *testing.T) { func TestMLFileDetector_SizeHeuristics(t *testing.T) { detector := newMLFileDetector() - + // Large file with model-related name should be detected as model largeModelEntry := &filer_pb.Entry{ - Name: "large_model.bin", + Name: "large_model.bin", Attributes: &filer_pb.FuseAttributes{ FileSize: 500 * 1024 * 1024, // 500MB }, } - + isML, fileType := detector.DetectMLFile(largeModelEntry, "/checkpoints/large_model.bin") - + if !isML { t.Error("Large model file should be detected as ML file") } - + if fileType != MLFileModel { t.Errorf("Large model file should be detected as model, got %v", fileType) } @@ -469,7 +469,7 @@ func TestOpenFileCache_EvictionProtection(t *testing.T) { // Open two files and keep them open for i := 1; i <= 2; i++ { entry := &filer_pb.Entry{ - Name: "file" + string(rune('0'+i)) + ".txt", + Name: "file" + string(rune('0'+i)) + ".txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, @@ -478,16 +478,16 @@ func TestOpenFileCache_EvictionProtection(t *testing.T) { cache.OpenFile(uint64(i), entry, fullPath) // Don't close - keep them open } - + // Try to open a third file - should not evict open files entry3 := &filer_pb.Entry{ - Name: "file3.txt", + Name: "file3.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } cache.OpenFile(uint64(3), entry3, "/test/file3.txt") - + // All files should still be there since none could be evicted for i := 1; i <= 3; i++ { fileInfo := cache.GetFileInfo(uint64(i)) @@ -502,38 +502,38 @@ func TestOpenFileCache_GetFileInfo_CacheHitMiss(t *testing.T) { defer cache.Shutdown() inode := uint64(1) - + // Test cache miss fileInfo := cache.GetFileInfo(inode) if fileInfo != nil { t.Error("Should return nil for non-existent file") } - + initialMetrics := cache.GetMetrics() if initialMetrics.CacheMisses == 0 { t.Error("Should record cache miss") } - + // Add file to cache entry := &filer_pb.Entry{ - Name: "test.txt", + Name: "test.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } cache.OpenFile(inode, entry, "/test/test.txt") - + // Test cache hit fileInfo = cache.GetFileInfo(inode) if fileInfo == nil { t.Error("Should return file info for existing file") } - + finalMetrics := cache.GetMetrics() if finalMetrics.CacheHits == 0 { t.Error("Should record cache hit") } - + if finalMetrics.CacheHits <= initialMetrics.CacheHits { t.Error("Cache hits should increase") } @@ -545,7 +545,7 @@ func TestOpenFileCache_Shutdown(t *testing.T) { // Add some files for i := 1; i <= 3; i++ { entry := &filer_pb.Entry{ - Name: "file" + string(rune('0'+i)) + ".txt", + Name: "file" + string(rune('0'+i)) + ".txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, @@ -576,7 +576,7 @@ func BenchmarkOpenFileCache_OpenFile(b *testing.B) { defer cache.Shutdown() entry := &filer_pb.Entry{ - Name: "benchmark.txt", + Name: "benchmark.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, @@ -597,13 +597,13 @@ func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) { // Pre-populate cache entry := &filer_pb.Entry{ - Name: "benchmark.txt", + Name: "benchmark.txt", Attributes: &filer_pb.FuseAttributes{ FileSize: 1024, }, } fullPath := "/test/benchmark.txt" - + for i := 0; i < 100; i++ { cache.OpenFile(uint64(i), entry, fullPath) } @@ -614,4 +614,4 @@ func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) { inode := uint64(i % 100) cache.GetFileInfo(inode) } -} \ No newline at end of file +} diff --git a/weed/mount/ml_integration.go b/weed/mount/ml_integration.go index c79882c82..fefd23cd6 100644 --- a/weed/mount/ml_integration.go +++ b/weed/mount/ml_integration.go @@ -1,8 +1,6 @@ package mount import ( - "time" - "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mount/ml" @@ -13,9 +11,9 @@ import ( // MLIntegrationManager manages ML optimization integration for the main WFS type MLIntegrationManager struct { - mlOptimization *ml.MLOptimization + mlOptimization *ml.MLOptimization fuseIntegration *ml.FUSEMLIntegration - enabled bool + enabled bool } // NewMLIntegrationManager creates a new ML integration manager @@ -23,16 +21,16 @@ func NewMLIntegrationManager(chunkCache chunk_cache.ChunkCache, lookupFn wdclien // Create ML optimization with default config config := ml.DefaultMLConfig() mlOpt := ml.NewMLOptimization(config, chunkCache, lookupFn) - + // Create FUSE integration fuseInt := ml.NewFUSEMLIntegration(mlOpt) - + manager := &MLIntegrationManager{ mlOptimization: mlOpt, fuseIntegration: fuseInt, enabled: true, } - + glog.V(1).Infof("ML integration manager initialized") return manager } @@ -40,15 +38,15 @@ func NewMLIntegrationManager(chunkCache chunk_cache.ChunkCache, lookupFn wdclien // EnableMLOptimization enables or disables ML optimization func (mgr *MLIntegrationManager) EnableMLOptimization(enabled bool) { mgr.enabled = enabled - + if mgr.mlOptimization != nil { mgr.mlOptimization.Enable(enabled) } - + if mgr.fuseIntegration != nil { mgr.fuseIntegration.EnableMLOptimizations(enabled) } - + glog.V(1).Infof("ML optimization %s", map[bool]string{true: "enabled", false: "disabled"}[enabled]) } @@ -57,7 +55,7 @@ func (mgr *MLIntegrationManager) OnFileOpen(inode uint64, entry *filer_pb.Entry, if !mgr.enabled || mgr.fuseIntegration == nil { return } - + mgr.fuseIntegration.OnFileOpen(inode, entry, fullPath, flags, out) } @@ -66,7 +64,7 @@ func (mgr *MLIntegrationManager) OnFileClose(inode uint64) { if !mgr.enabled || mgr.fuseIntegration == nil { return } - + mgr.fuseIntegration.OnFileClose(inode) } @@ -75,7 +73,7 @@ func (mgr *MLIntegrationManager) OnFileRead(inode uint64, offset int64, size int if !mgr.enabled || mgr.fuseIntegration == nil { return } - + mgr.fuseIntegration.OnFileRead(inode, offset, size) } @@ -84,7 +82,7 @@ func (mgr *MLIntegrationManager) OnChunkAccess(inode uint64, chunkIndex uint32, if !mgr.enabled || mgr.fuseIntegration == nil { return } - + mgr.fuseIntegration.OnChunkAccess(inode, chunkIndex, fileId, cacheLevel, isHit) } @@ -93,7 +91,7 @@ func (mgr *MLIntegrationManager) OptimizeAttributes(inode uint64, out *fuse.Attr if !mgr.enabled || mgr.fuseIntegration == nil { return } - + mgr.fuseIntegration.OptimizeAttributes(inode, out) } @@ -102,7 +100,7 @@ func (mgr *MLIntegrationManager) OptimizeEntryCache(inode uint64, entry *filer_p if !mgr.enabled || mgr.fuseIntegration == nil { return } - + mgr.fuseIntegration.OptimizeEntryCache(inode, entry, out) } @@ -111,7 +109,7 @@ func (mgr *MLIntegrationManager) ShouldEnableWriteback(inode uint64, entry *file if !mgr.enabled || mgr.fuseIntegration == nil { return false } - + return mgr.fuseIntegration.ShouldEnableWriteback(inode, entry) } @@ -120,7 +118,7 @@ func (mgr *MLIntegrationManager) GetComprehensiveMetrics() *ml.FUSEMLMetrics { if !mgr.enabled || mgr.fuseIntegration == nil { return &ml.FUSEMLMetrics{} } - + metrics := mgr.fuseIntegration.GetOptimizationMetrics() return &metrics } @@ -133,10 +131,10 @@ func (mgr *MLIntegrationManager) IsEnabled() bool { // Shutdown gracefully shuts down the ML integration func (mgr *MLIntegrationManager) Shutdown() { glog.V(1).Infof("Shutting down ML integration manager...") - + if mgr.fuseIntegration != nil { mgr.fuseIntegration.Shutdown() } - + glog.V(1).Infof("ML integration manager shutdown complete") }