You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

229 lines
6.9 KiB

package erasure_coding
import (
"sync"
"time"
)
// ErasureCodingMetrics contains erasure coding-specific monitoring data
type ErasureCodingMetrics struct {
// Execution metrics
VolumesEncoded int64 `json:"volumes_encoded"`
TotalShardsCreated int64 `json:"total_shards_created"`
TotalDataProcessed int64 `json:"total_data_processed"`
TotalSourcesRemoved int64 `json:"total_sources_removed"`
LastEncodingTime time.Time `json:"last_encoding_time"`
// Performance metrics
AverageEncodingTime int64 `json:"average_encoding_time_seconds"`
AverageShardSize int64 `json:"average_shard_size"`
AverageDataShards int `json:"average_data_shards"`
AverageParityShards int `json:"average_parity_shards"`
SuccessfulOperations int64 `json:"successful_operations"`
FailedOperations int64 `json:"failed_operations"`
// Distribution metrics
ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"`
ShardsPerRack map[string]int64 `json:"shards_per_rack"`
PlacementSuccessRate float64 `json:"placement_success_rate"`
// Current task metrics
CurrentVolumeSize int64 `json:"current_volume_size"`
CurrentShardCount int `json:"current_shard_count"`
VolumesPendingEncoding int `json:"volumes_pending_encoding"`
mutex sync.RWMutex
}
// NewErasureCodingMetrics creates a new erasure coding metrics instance
func NewErasureCodingMetrics() *ErasureCodingMetrics {
return &ErasureCodingMetrics{
LastEncodingTime: time.Now(),
ShardsPerDataCenter: make(map[string]int64),
ShardsPerRack: make(map[string]int64),
}
}
// RecordVolumeEncoded records a successful volume encoding operation
func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.VolumesEncoded++
m.TotalShardsCreated += int64(shardsCreated)
m.TotalDataProcessed += volumeSize
m.SuccessfulOperations++
m.LastEncodingTime = time.Now()
if sourceRemoved {
m.TotalSourcesRemoved++
}
// Update average encoding time
if m.AverageEncodingTime == 0 {
m.AverageEncodingTime = int64(encodingTime.Seconds())
} else {
// Exponential moving average
newTime := int64(encodingTime.Seconds())
m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5
}
// Update average shard size
if shardsCreated > 0 {
avgShardSize := volumeSize / int64(shardsCreated)
if m.AverageShardSize == 0 {
m.AverageShardSize = avgShardSize
} else {
m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5
}
}
// Update average data/parity shards
if m.AverageDataShards == 0 {
m.AverageDataShards = dataShards
m.AverageParityShards = parityShards
} else {
m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
}
}
// RecordFailure records a failed erasure coding operation
func (m *ErasureCodingMetrics) RecordFailure() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.FailedOperations++
}
// RecordShardPlacement records shard placement for distribution tracking
func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.ShardsPerDataCenter[dataCenter]++
rackKey := dataCenter + ":" + rack
m.ShardsPerRack[rackKey]++
}
// UpdateCurrentVolumeInfo updates current volume processing information
func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.CurrentVolumeSize = volumeSize
m.CurrentShardCount = shardCount
}
// SetVolumesPendingEncoding sets the number of volumes pending encoding
func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.VolumesPendingEncoding = count
}
// UpdatePlacementSuccessRate updates the placement success rate
func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.PlacementSuccessRate == 0 {
m.PlacementSuccessRate = rate
} else {
// Exponential moving average
m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
}
}
// GetMetrics returns a copy of the current metrics (without the mutex)
func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
m.mutex.RLock()
defer m.mutex.RUnlock()
// Create deep copy of maps
shardsPerDC := make(map[string]int64)
for k, v := range m.ShardsPerDataCenter {
shardsPerDC[k] = v
}
shardsPerRack := make(map[string]int64)
for k, v := range m.ShardsPerRack {
shardsPerRack[k] = v
}
// Create a copy without the mutex to avoid copying lock value
return ErasureCodingMetrics{
VolumesEncoded: m.VolumesEncoded,
TotalShardsCreated: m.TotalShardsCreated,
TotalDataProcessed: m.TotalDataProcessed,
TotalSourcesRemoved: m.TotalSourcesRemoved,
LastEncodingTime: m.LastEncodingTime,
AverageEncodingTime: m.AverageEncodingTime,
AverageShardSize: m.AverageShardSize,
AverageDataShards: m.AverageDataShards,
AverageParityShards: m.AverageParityShards,
SuccessfulOperations: m.SuccessfulOperations,
FailedOperations: m.FailedOperations,
ShardsPerDataCenter: shardsPerDC,
ShardsPerRack: shardsPerRack,
PlacementSuccessRate: m.PlacementSuccessRate,
CurrentVolumeSize: m.CurrentVolumeSize,
CurrentShardCount: m.CurrentShardCount,
VolumesPendingEncoding: m.VolumesPendingEncoding,
}
}
// GetSuccessRate returns the success rate as a percentage
func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
total := m.SuccessfulOperations + m.FailedOperations
if total == 0 {
return 100.0
}
return float64(m.SuccessfulOperations) / float64(total) * 100.0
}
// GetAverageDataProcessed returns the average data processed per volume
func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
if m.VolumesEncoded == 0 {
return 0
}
return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
}
// GetSourceRemovalRate returns the percentage of sources removed after encoding
func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
if m.VolumesEncoded == 0 {
return 0
}
return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
}
// Reset resets all metrics to zero
func (m *ErasureCodingMetrics) Reset() {
m.mutex.Lock()
defer m.mutex.Unlock()
*m = ErasureCodingMetrics{
LastEncodingTime: time.Now(),
ShardsPerDataCenter: make(map[string]int64),
ShardsPerRack: make(map[string]int64),
}
}
// Global metrics instance for erasure coding tasks
var globalErasureCodingMetrics = NewErasureCodingMetrics()
// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance
func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
return globalErasureCodingMetrics
}