You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							229 lines
						
					
					
						
							6.9 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							229 lines
						
					
					
						
							6.9 KiB
						
					
					
				| package erasure_coding | |
| 
 | |
| import ( | |
| 	"sync" | |
| 	"time" | |
| ) | |
| 
 | |
| // ErasureCodingMetrics contains erasure coding-specific monitoring data | |
| type ErasureCodingMetrics struct { | |
| 	// Execution metrics | |
| 	VolumesEncoded      int64     `json:"volumes_encoded"` | |
| 	TotalShardsCreated  int64     `json:"total_shards_created"` | |
| 	TotalDataProcessed  int64     `json:"total_data_processed"` | |
| 	TotalSourcesRemoved int64     `json:"total_sources_removed"` | |
| 	LastEncodingTime    time.Time `json:"last_encoding_time"` | |
| 
 | |
| 	// Performance metrics | |
| 	AverageEncodingTime  int64 `json:"average_encoding_time_seconds"` | |
| 	AverageShardSize     int64 `json:"average_shard_size"` | |
| 	AverageDataShards    int   `json:"average_data_shards"` | |
| 	AverageParityShards  int   `json:"average_parity_shards"` | |
| 	SuccessfulOperations int64 `json:"successful_operations"` | |
| 	FailedOperations     int64 `json:"failed_operations"` | |
| 
 | |
| 	// Distribution metrics | |
| 	ShardsPerDataCenter  map[string]int64 `json:"shards_per_datacenter"` | |
| 	ShardsPerRack        map[string]int64 `json:"shards_per_rack"` | |
| 	PlacementSuccessRate float64          `json:"placement_success_rate"` | |
| 
 | |
| 	// Current task metrics | |
| 	CurrentVolumeSize      int64 `json:"current_volume_size"` | |
| 	CurrentShardCount      int   `json:"current_shard_count"` | |
| 	VolumesPendingEncoding int   `json:"volumes_pending_encoding"` | |
| 
 | |
| 	mutex sync.RWMutex | |
| } | |
| 
 | |
| // NewErasureCodingMetrics creates a new erasure coding metrics instance | |
| func NewErasureCodingMetrics() *ErasureCodingMetrics { | |
| 	return &ErasureCodingMetrics{ | |
| 		LastEncodingTime:    time.Now(), | |
| 		ShardsPerDataCenter: make(map[string]int64), | |
| 		ShardsPerRack:       make(map[string]int64), | |
| 	} | |
| } | |
| 
 | |
| // RecordVolumeEncoded records a successful volume encoding operation | |
| func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.VolumesEncoded++ | |
| 	m.TotalShardsCreated += int64(shardsCreated) | |
| 	m.TotalDataProcessed += volumeSize | |
| 	m.SuccessfulOperations++ | |
| 	m.LastEncodingTime = time.Now() | |
| 
 | |
| 	if sourceRemoved { | |
| 		m.TotalSourcesRemoved++ | |
| 	} | |
| 
 | |
| 	// Update average encoding time | |
| 	if m.AverageEncodingTime == 0 { | |
| 		m.AverageEncodingTime = int64(encodingTime.Seconds()) | |
| 	} else { | |
| 		// Exponential moving average | |
| 		newTime := int64(encodingTime.Seconds()) | |
| 		m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5 | |
| 	} | |
| 
 | |
| 	// Update average shard size | |
| 	if shardsCreated > 0 { | |
| 		avgShardSize := volumeSize / int64(shardsCreated) | |
| 		if m.AverageShardSize == 0 { | |
| 			m.AverageShardSize = avgShardSize | |
| 		} else { | |
| 			m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5 | |
| 		} | |
| 	} | |
| 
 | |
| 	// Update average data/parity shards | |
| 	if m.AverageDataShards == 0 { | |
| 		m.AverageDataShards = dataShards | |
| 		m.AverageParityShards = parityShards | |
| 	} else { | |
| 		m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5 | |
| 		m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5 | |
| 	} | |
| } | |
| 
 | |
| // RecordFailure records a failed erasure coding operation | |
| func (m *ErasureCodingMetrics) RecordFailure() { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.FailedOperations++ | |
| } | |
| 
 | |
| // RecordShardPlacement records shard placement for distribution tracking | |
| func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.ShardsPerDataCenter[dataCenter]++ | |
| 	rackKey := dataCenter + ":" + rack | |
| 	m.ShardsPerRack[rackKey]++ | |
| } | |
| 
 | |
| // UpdateCurrentVolumeInfo updates current volume processing information | |
| func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.CurrentVolumeSize = volumeSize | |
| 	m.CurrentShardCount = shardCount | |
| } | |
| 
 | |
| // SetVolumesPendingEncoding sets the number of volumes pending encoding | |
| func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.VolumesPendingEncoding = count | |
| } | |
| 
 | |
| // UpdatePlacementSuccessRate updates the placement success rate | |
| func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	if m.PlacementSuccessRate == 0 { | |
| 		m.PlacementSuccessRate = rate | |
| 	} else { | |
| 		// Exponential moving average | |
| 		m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate | |
| 	} | |
| } | |
| 
 | |
| // GetMetrics returns a copy of the current metrics (without the mutex) | |
| func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics { | |
| 	m.mutex.RLock() | |
| 	defer m.mutex.RUnlock() | |
| 
 | |
| 	// Create deep copy of maps | |
| 	shardsPerDC := make(map[string]int64) | |
| 	for k, v := range m.ShardsPerDataCenter { | |
| 		shardsPerDC[k] = v | |
| 	} | |
| 
 | |
| 	shardsPerRack := make(map[string]int64) | |
| 	for k, v := range m.ShardsPerRack { | |
| 		shardsPerRack[k] = v | |
| 	} | |
| 
 | |
| 	// Create a copy without the mutex to avoid copying lock value | |
| 	return ErasureCodingMetrics{ | |
| 		VolumesEncoded:         m.VolumesEncoded, | |
| 		TotalShardsCreated:     m.TotalShardsCreated, | |
| 		TotalDataProcessed:     m.TotalDataProcessed, | |
| 		TotalSourcesRemoved:    m.TotalSourcesRemoved, | |
| 		LastEncodingTime:       m.LastEncodingTime, | |
| 		AverageEncodingTime:    m.AverageEncodingTime, | |
| 		AverageShardSize:       m.AverageShardSize, | |
| 		AverageDataShards:      m.AverageDataShards, | |
| 		AverageParityShards:    m.AverageParityShards, | |
| 		SuccessfulOperations:   m.SuccessfulOperations, | |
| 		FailedOperations:       m.FailedOperations, | |
| 		ShardsPerDataCenter:    shardsPerDC, | |
| 		ShardsPerRack:          shardsPerRack, | |
| 		PlacementSuccessRate:   m.PlacementSuccessRate, | |
| 		CurrentVolumeSize:      m.CurrentVolumeSize, | |
| 		CurrentShardCount:      m.CurrentShardCount, | |
| 		VolumesPendingEncoding: m.VolumesPendingEncoding, | |
| 	} | |
| } | |
| 
 | |
| // GetSuccessRate returns the success rate as a percentage | |
| func (m *ErasureCodingMetrics) GetSuccessRate() float64 { | |
| 	m.mutex.RLock() | |
| 	defer m.mutex.RUnlock() | |
| 
 | |
| 	total := m.SuccessfulOperations + m.FailedOperations | |
| 	if total == 0 { | |
| 		return 100.0 | |
| 	} | |
| 	return float64(m.SuccessfulOperations) / float64(total) * 100.0 | |
| } | |
| 
 | |
| // GetAverageDataProcessed returns the average data processed per volume | |
| func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 { | |
| 	m.mutex.RLock() | |
| 	defer m.mutex.RUnlock() | |
| 
 | |
| 	if m.VolumesEncoded == 0 { | |
| 		return 0 | |
| 	} | |
| 	return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded) | |
| } | |
| 
 | |
| // GetSourceRemovalRate returns the percentage of sources removed after encoding | |
| func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 { | |
| 	m.mutex.RLock() | |
| 	defer m.mutex.RUnlock() | |
| 
 | |
| 	if m.VolumesEncoded == 0 { | |
| 		return 0 | |
| 	} | |
| 	return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0 | |
| } | |
| 
 | |
| // Reset resets all metrics to zero | |
| func (m *ErasureCodingMetrics) Reset() { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	*m = ErasureCodingMetrics{ | |
| 		LastEncodingTime:    time.Now(), | |
| 		ShardsPerDataCenter: make(map[string]int64), | |
| 		ShardsPerRack:       make(map[string]int64), | |
| 	} | |
| } | |
| 
 | |
| // Global metrics instance for erasure coding tasks | |
| var globalErasureCodingMetrics = NewErasureCodingMetrics() | |
| 
 | |
| // GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance | |
| func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics { | |
| 	return globalErasureCodingMetrics | |
| }
 |