You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

138 lines
3.9 KiB

package balance
import (
"sync"
"time"
)
// BalanceMetrics contains balance-specific monitoring data.
//
// The exported fields are serialized under the JSON keys given in their tags.
// The methods on *BalanceMetrics take the embedded mutex before reading or
// writing any field; code that touches the fields directly bypasses that
// locking.
type BalanceMetrics struct {
// Execution metrics
// VolumesBalanced is incremented once per RecordVolumeBalanced call.
VolumesBalanced int64 `json:"volumes_balanced"`
// TotalDataTransferred is the running sum of the volumeSize arguments
// (presumably bytes: the speed calculation divides by 1024*1024 to get MB).
TotalDataTransferred int64 `json:"total_data_transferred"`
// AverageImbalance is an exponential moving average of the scores passed to
// UpdateImbalanceScore (weight 0.9 on the old value, 0.1 on the new score).
AverageImbalance float64 `json:"average_imbalance"`
// LastBalanceTime is set to time.Now() on construction, on every
// RecordVolumeBalanced call, and on Reset.
LastBalanceTime time.Time `json:"last_balance_time"`
// Performance metrics
// AverageTransferSpeed is an exponential moving average (0.8 old, 0.2 new)
// computed as volumeSize / (1024*1024) / seconds, i.e. MB per second, even
// though the JSON key is suffixed "mbps".
AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"`
// TotalExecutionTime accumulates each transfer duration truncated to whole
// seconds, so a sub-second transfer adds 0.
TotalExecutionTime int64 `json:"total_execution_time_seconds"`
// SuccessfulOperations is incremented by RecordVolumeBalanced.
SuccessfulOperations int64 `json:"successful_operations"`
// FailedOperations is incremented by RecordFailure.
FailedOperations int64 `json:"failed_operations"`
// Current task metrics
// CurrentImbalanceScore is the most recent score given to UpdateImbalanceScore.
CurrentImbalanceScore float64 `json:"current_imbalance_score"`
// PlannedDestinations is the last count given to SetPlannedDestinations.
PlannedDestinations int `json:"planned_destinations"`
// mutex is locked by the methods of this type around every field access.
// It is unexported, so encoding/json ignores it.
mutex sync.RWMutex
}
// NewBalanceMetrics allocates a BalanceMetrics whose counters and averages
// are all zero and whose LastBalanceTime is stamped with the current time.
func NewBalanceMetrics() *BalanceMetrics {
	metrics := new(BalanceMetrics)
	metrics.LastBalanceTime = time.Now()
	return metrics
}
// RecordVolumeBalanced registers one successfully balanced volume.
//
// volumeSize is added to TotalDataTransferred and transferTime, truncated to
// whole seconds, is added to TotalExecutionTime. The success and volume
// counters are incremented and LastBalanceTime is set to the current time.
// When transferTime is positive, the observed speed in MB/s is folded into
// AverageTransferSpeed.
func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) {
	const bytesPerMB = 1024 * 1024

	elapsedSeconds := transferTime.Seconds()

	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.LastBalanceTime = time.Now()
	m.SuccessfulOperations++
	m.VolumesBalanced++
	m.TotalDataTransferred += volumeSize
	m.TotalExecutionTime += int64(elapsedSeconds)

	// Without a positive duration there is no meaningful speed sample.
	if transferTime <= 0 {
		return
	}

	sample := float64(volumeSize) / bytesPerMB / elapsedSeconds

	// A stored average of zero is treated as "no sample yet" and is seeded
	// directly; otherwise blend with an exponential moving average.
	if previous := m.AverageTransferSpeed; previous != 0 {
		m.AverageTransferSpeed = 0.8*previous + 0.2*sample
		return
	}
	m.AverageTransferSpeed = sample
}
// RecordFailure adds one to FailedOperations while holding the write lock.
func (m *BalanceMetrics) RecordFailure() {
	m.mutex.Lock()
	m.FailedOperations += 1
	m.mutex.Unlock()
}
// UpdateImbalanceScore stores score as CurrentImbalanceScore and folds it
// into AverageImbalance.
//
// A stored average of zero is treated as "not yet seeded" and is replaced by
// score outright; otherwise the average becomes 0.9*old + 0.1*score.
func (m *BalanceMetrics) UpdateImbalanceScore(score float64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.CurrentImbalanceScore = score

	if running := m.AverageImbalance; running != 0 {
		m.AverageImbalance = 0.9*running + 0.1*score
		return
	}
	m.AverageImbalance = score
}
// SetPlannedDestinations overwrites PlannedDestinations with count while
// holding the write lock.
func (m *BalanceMetrics) SetPlannedDestinations(count int) {
	m.mutex.Lock()
	m.PlannedDestinations = count
	m.mutex.Unlock()
}
// GetMetrics returns a snapshot of every metric field, taken under the read
// lock. The returned value is independent of m: later updates to m are not
// reflected in it.
func (m *BalanceMetrics) GetMetrics() BalanceMetrics {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	// Build the result field by field so its mutex stays at the zero value
	// instead of being copied from m.
	return BalanceMetrics{
		// Current task metrics.
		PlannedDestinations:   m.PlannedDestinations,
		CurrentImbalanceScore: m.CurrentImbalanceScore,

		// Performance metrics.
		FailedOperations:     m.FailedOperations,
		SuccessfulOperations: m.SuccessfulOperations,
		TotalExecutionTime:   m.TotalExecutionTime,
		AverageTransferSpeed: m.AverageTransferSpeed,

		// Execution metrics.
		LastBalanceTime:      m.LastBalanceTime,
		AverageImbalance:     m.AverageImbalance,
		TotalDataTransferred: m.TotalDataTransferred,
		VolumesBalanced:      m.VolumesBalanced,
	}
}
// GetSuccessRate reports successful operations as a percentage of all
// recorded operations (successful plus failed). It returns 100.0 when nothing
// has been recorded yet.
func (m *BalanceMetrics) GetSuccessRate() float64 {
	m.mutex.RLock()
	succeeded, failed := m.SuccessfulOperations, m.FailedOperations
	m.mutex.RUnlock()

	attempts := succeeded + failed
	if attempts == 0 {
		return 100.0
	}
	return float64(succeeded) / float64(attempts) * 100.0
}
// Reset clears every counter and average back to zero and stamps
// LastBalanceTime with the current time, leaving the metrics in the same
// state a freshly constructed instance would have.
//
// Fields are cleared one by one instead of assigning a new struct value to
// *m: a whole-struct assignment would also overwrite the mutex while it is
// held, so the deferred Unlock would run against a zeroed RWMutex and abort
// the program with "sync: Unlock of unlocked RWMutex".
func (m *BalanceMetrics) Reset() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Execution metrics.
	m.VolumesBalanced = 0
	m.TotalDataTransferred = 0
	m.AverageImbalance = 0
	m.LastBalanceTime = time.Now()

	// Performance metrics.
	m.AverageTransferSpeed = 0
	m.TotalExecutionTime = 0
	m.SuccessfulOperations = 0
	m.FailedOperations = 0

	// Current task metrics.
	m.CurrentImbalanceScore = 0
	m.PlannedDestinations = 0
}
// globalBalanceMetrics is the package-wide metrics instance for balance
// tasks. It is created once, when the package is initialized, by calling
// NewBalanceMetrics, and is handed out by GetGlobalBalanceMetrics.
var globalBalanceMetrics = NewBalanceMetrics()
// GetGlobalBalanceMetrics returns the global balance metrics instance.
//
// Every call returns the same pointer, so updates made through it are visible
// to all other holders of that pointer.
func GetGlobalBalanceMetrics() *BalanceMetrics {
return globalBalanceMetrics
}