You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							138 lines
						
					
					
						
							3.9 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							138 lines
						
					
					
						
							3.9 KiB
						
					
					
				| package balance | |
| 
 | |
| import ( | |
| 	"sync" | |
| 	"time" | |
| ) | |
| 
 | |
| // BalanceMetrics contains balance-specific monitoring data | |
| type BalanceMetrics struct { | |
| 	// Execution metrics | |
| 	VolumesBalanced      int64     `json:"volumes_balanced"` | |
| 	TotalDataTransferred int64     `json:"total_data_transferred"` | |
| 	AverageImbalance     float64   `json:"average_imbalance"` | |
| 	LastBalanceTime      time.Time `json:"last_balance_time"` | |
| 
 | |
| 	// Performance metrics | |
| 	AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"` | |
| 	TotalExecutionTime   int64   `json:"total_execution_time_seconds"` | |
| 	SuccessfulOperations int64   `json:"successful_operations"` | |
| 	FailedOperations     int64   `json:"failed_operations"` | |
| 
 | |
| 	// Current task metrics | |
| 	CurrentImbalanceScore float64 `json:"current_imbalance_score"` | |
| 	PlannedDestinations   int     `json:"planned_destinations"` | |
| 
 | |
| 	mutex sync.RWMutex | |
| } | |
| 
 | |
| // NewBalanceMetrics creates a new balance metrics instance | |
| func NewBalanceMetrics() *BalanceMetrics { | |
| 	return &BalanceMetrics{ | |
| 		LastBalanceTime: time.Now(), | |
| 	} | |
| } | |
| 
 | |
| // RecordVolumeBalanced records a successful volume balance operation | |
| func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.VolumesBalanced++ | |
| 	m.TotalDataTransferred += volumeSize | |
| 	m.SuccessfulOperations++ | |
| 	m.LastBalanceTime = time.Now() | |
| 	m.TotalExecutionTime += int64(transferTime.Seconds()) | |
| 
 | |
| 	// Calculate average transfer speed (MB/s) | |
| 	if transferTime > 0 { | |
| 		speedMBps := float64(volumeSize) / (1024 * 1024) / transferTime.Seconds() | |
| 		if m.AverageTransferSpeed == 0 { | |
| 			m.AverageTransferSpeed = speedMBps | |
| 		} else { | |
| 			// Exponential moving average | |
| 			m.AverageTransferSpeed = 0.8*m.AverageTransferSpeed + 0.2*speedMBps | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| // RecordFailure records a failed balance operation | |
| func (m *BalanceMetrics) RecordFailure() { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.FailedOperations++ | |
| } | |
| 
 | |
| // UpdateImbalanceScore updates the current cluster imbalance score | |
| func (m *BalanceMetrics) UpdateImbalanceScore(score float64) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.CurrentImbalanceScore = score | |
| 
 | |
| 	// Update average imbalance with exponential moving average | |
| 	if m.AverageImbalance == 0 { | |
| 		m.AverageImbalance = score | |
| 	} else { | |
| 		m.AverageImbalance = 0.9*m.AverageImbalance + 0.1*score | |
| 	} | |
| } | |
| 
 | |
| // SetPlannedDestinations sets the number of planned destinations | |
| func (m *BalanceMetrics) SetPlannedDestinations(count int) { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	m.PlannedDestinations = count | |
| } | |
| 
 | |
| // GetMetrics returns a copy of the current metrics (without the mutex) | |
| func (m *BalanceMetrics) GetMetrics() BalanceMetrics { | |
| 	m.mutex.RLock() | |
| 	defer m.mutex.RUnlock() | |
| 
 | |
| 	// Create a copy without the mutex to avoid copying lock value | |
| 	return BalanceMetrics{ | |
| 		VolumesBalanced:       m.VolumesBalanced, | |
| 		TotalDataTransferred:  m.TotalDataTransferred, | |
| 		AverageImbalance:      m.AverageImbalance, | |
| 		LastBalanceTime:       m.LastBalanceTime, | |
| 		AverageTransferSpeed:  m.AverageTransferSpeed, | |
| 		TotalExecutionTime:    m.TotalExecutionTime, | |
| 		SuccessfulOperations:  m.SuccessfulOperations, | |
| 		FailedOperations:      m.FailedOperations, | |
| 		CurrentImbalanceScore: m.CurrentImbalanceScore, | |
| 		PlannedDestinations:   m.PlannedDestinations, | |
| 	} | |
| } | |
| 
 | |
| // GetSuccessRate returns the success rate as a percentage | |
| func (m *BalanceMetrics) GetSuccessRate() float64 { | |
| 	m.mutex.RLock() | |
| 	defer m.mutex.RUnlock() | |
| 
 | |
| 	total := m.SuccessfulOperations + m.FailedOperations | |
| 	if total == 0 { | |
| 		return 100.0 | |
| 	} | |
| 	return float64(m.SuccessfulOperations) / float64(total) * 100.0 | |
| } | |
| 
 | |
| // Reset resets all metrics to zero | |
| func (m *BalanceMetrics) Reset() { | |
| 	m.mutex.Lock() | |
| 	defer m.mutex.Unlock() | |
| 
 | |
| 	*m = BalanceMetrics{ | |
| 		LastBalanceTime: time.Now(), | |
| 	} | |
| } | |
| 
 | |
| // Global metrics instance for balance tasks | |
| var globalBalanceMetrics = NewBalanceMetrics() | |
| 
 | |
| // GetGlobalBalanceMetrics returns the global balance metrics instance | |
| func GetGlobalBalanceMetrics() *BalanceMetrics { | |
| 	return globalBalanceMetrics | |
| }
 |