You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
229 lines
6.9 KiB
229 lines
6.9 KiB
package erasure_coding
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ErasureCodingMetrics contains erasure coding-specific monitoring data
|
|
type ErasureCodingMetrics struct {
|
|
// Execution metrics
|
|
VolumesEncoded int64 `json:"volumes_encoded"`
|
|
TotalShardsCreated int64 `json:"total_shards_created"`
|
|
TotalDataProcessed int64 `json:"total_data_processed"`
|
|
TotalSourcesRemoved int64 `json:"total_sources_removed"`
|
|
LastEncodingTime time.Time `json:"last_encoding_time"`
|
|
|
|
// Performance metrics
|
|
AverageEncodingTime int64 `json:"average_encoding_time_seconds"`
|
|
AverageShardSize int64 `json:"average_shard_size"`
|
|
AverageDataShards int `json:"average_data_shards"`
|
|
AverageParityShards int `json:"average_parity_shards"`
|
|
SuccessfulOperations int64 `json:"successful_operations"`
|
|
FailedOperations int64 `json:"failed_operations"`
|
|
|
|
// Distribution metrics
|
|
ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"`
|
|
ShardsPerRack map[string]int64 `json:"shards_per_rack"`
|
|
PlacementSuccessRate float64 `json:"placement_success_rate"`
|
|
|
|
// Current task metrics
|
|
CurrentVolumeSize int64 `json:"current_volume_size"`
|
|
CurrentShardCount int `json:"current_shard_count"`
|
|
VolumesPendingEncoding int `json:"volumes_pending_encoding"`
|
|
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewErasureCodingMetrics creates a new erasure coding metrics instance
|
|
func NewErasureCodingMetrics() *ErasureCodingMetrics {
|
|
return &ErasureCodingMetrics{
|
|
LastEncodingTime: time.Now(),
|
|
ShardsPerDataCenter: make(map[string]int64),
|
|
ShardsPerRack: make(map[string]int64),
|
|
}
|
|
}
|
|
|
|
// RecordVolumeEncoded records a successful volume encoding operation
|
|
func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.VolumesEncoded++
|
|
m.TotalShardsCreated += int64(shardsCreated)
|
|
m.TotalDataProcessed += volumeSize
|
|
m.SuccessfulOperations++
|
|
m.LastEncodingTime = time.Now()
|
|
|
|
if sourceRemoved {
|
|
m.TotalSourcesRemoved++
|
|
}
|
|
|
|
// Update average encoding time
|
|
if m.AverageEncodingTime == 0 {
|
|
m.AverageEncodingTime = int64(encodingTime.Seconds())
|
|
} else {
|
|
// Exponential moving average
|
|
newTime := int64(encodingTime.Seconds())
|
|
m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5
|
|
}
|
|
|
|
// Update average shard size
|
|
if shardsCreated > 0 {
|
|
avgShardSize := volumeSize / int64(shardsCreated)
|
|
if m.AverageShardSize == 0 {
|
|
m.AverageShardSize = avgShardSize
|
|
} else {
|
|
m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5
|
|
}
|
|
}
|
|
|
|
// Update average data/parity shards
|
|
if m.AverageDataShards == 0 {
|
|
m.AverageDataShards = dataShards
|
|
m.AverageParityShards = parityShards
|
|
} else {
|
|
m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
|
|
m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
|
|
}
|
|
}
|
|
|
|
// RecordFailure records a failed erasure coding operation
|
|
func (m *ErasureCodingMetrics) RecordFailure() {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.FailedOperations++
|
|
}
|
|
|
|
// RecordShardPlacement records shard placement for distribution tracking
|
|
func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.ShardsPerDataCenter[dataCenter]++
|
|
rackKey := dataCenter + ":" + rack
|
|
m.ShardsPerRack[rackKey]++
|
|
}
|
|
|
|
// UpdateCurrentVolumeInfo updates current volume processing information
|
|
func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.CurrentVolumeSize = volumeSize
|
|
m.CurrentShardCount = shardCount
|
|
}
|
|
|
|
// SetVolumesPendingEncoding sets the number of volumes pending encoding
|
|
func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.VolumesPendingEncoding = count
|
|
}
|
|
|
|
// UpdatePlacementSuccessRate updates the placement success rate
|
|
func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if m.PlacementSuccessRate == 0 {
|
|
m.PlacementSuccessRate = rate
|
|
} else {
|
|
// Exponential moving average
|
|
m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
|
|
}
|
|
}
|
|
|
|
// GetMetrics returns a copy of the current metrics (without the mutex)
|
|
func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
// Create deep copy of maps
|
|
shardsPerDC := make(map[string]int64)
|
|
for k, v := range m.ShardsPerDataCenter {
|
|
shardsPerDC[k] = v
|
|
}
|
|
|
|
shardsPerRack := make(map[string]int64)
|
|
for k, v := range m.ShardsPerRack {
|
|
shardsPerRack[k] = v
|
|
}
|
|
|
|
// Create a copy without the mutex to avoid copying lock value
|
|
return ErasureCodingMetrics{
|
|
VolumesEncoded: m.VolumesEncoded,
|
|
TotalShardsCreated: m.TotalShardsCreated,
|
|
TotalDataProcessed: m.TotalDataProcessed,
|
|
TotalSourcesRemoved: m.TotalSourcesRemoved,
|
|
LastEncodingTime: m.LastEncodingTime,
|
|
AverageEncodingTime: m.AverageEncodingTime,
|
|
AverageShardSize: m.AverageShardSize,
|
|
AverageDataShards: m.AverageDataShards,
|
|
AverageParityShards: m.AverageParityShards,
|
|
SuccessfulOperations: m.SuccessfulOperations,
|
|
FailedOperations: m.FailedOperations,
|
|
ShardsPerDataCenter: shardsPerDC,
|
|
ShardsPerRack: shardsPerRack,
|
|
PlacementSuccessRate: m.PlacementSuccessRate,
|
|
CurrentVolumeSize: m.CurrentVolumeSize,
|
|
CurrentShardCount: m.CurrentShardCount,
|
|
VolumesPendingEncoding: m.VolumesPendingEncoding,
|
|
}
|
|
}
|
|
|
|
// GetSuccessRate returns the success rate as a percentage
|
|
func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
total := m.SuccessfulOperations + m.FailedOperations
|
|
if total == 0 {
|
|
return 100.0
|
|
}
|
|
return float64(m.SuccessfulOperations) / float64(total) * 100.0
|
|
}
|
|
|
|
// GetAverageDataProcessed returns the average data processed per volume
|
|
func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
if m.VolumesEncoded == 0 {
|
|
return 0
|
|
}
|
|
return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
|
|
}
|
|
|
|
// GetSourceRemovalRate returns the percentage of sources removed after encoding
|
|
func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
if m.VolumesEncoded == 0 {
|
|
return 0
|
|
}
|
|
return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
|
|
}
|
|
|
|
// Reset resets all metrics to zero
|
|
func (m *ErasureCodingMetrics) Reset() {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
*m = ErasureCodingMetrics{
|
|
LastEncodingTime: time.Now(),
|
|
ShardsPerDataCenter: make(map[string]int64),
|
|
ShardsPerRack: make(map[string]int64),
|
|
}
|
|
}
|
|
|
|
// Global metrics instance for erasure coding tasks
|
|
var globalErasureCodingMetrics = NewErasureCodingMetrics()
|
|
|
|
// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance
|
|
func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
|
|
return globalErasureCodingMetrics
|
|
}
|