You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							235 lines
						
					
					
						
							6.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							235 lines
						
					
					
						
							6.8 KiB
						
					
					
				
								package storage
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/prometheus/client_golang/prometheus"
							 | 
						|
									"github.com/prometheus/client_golang/prometheus/promauto"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/telemetry/proto"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								type PrometheusStorage struct {
							 | 
						|
									// Prometheus metrics
							 | 
						|
									totalClusters     prometheus.Gauge
							 | 
						|
									activeClusters    prometheus.Gauge
							 | 
						|
									volumeServerCount *prometheus.GaugeVec
							 | 
						|
									totalDiskBytes    *prometheus.GaugeVec
							 | 
						|
									totalVolumeCount  *prometheus.GaugeVec
							 | 
						|
									filerCount        *prometheus.GaugeVec
							 | 
						|
									brokerCount       *prometheus.GaugeVec
							 | 
						|
									clusterInfo       *prometheus.GaugeVec
							 | 
						|
									telemetryReceived prometheus.Counter
							 | 
						|
								
							 | 
						|
									// In-memory storage for API endpoints (if needed)
							 | 
						|
									mu        sync.RWMutex
							 | 
						|
									instances map[string]*telemetryData
							 | 
						|
									stats     map[string]interface{}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// telemetryData is an internal struct that includes the received timestamp
							 | 
						|
								type telemetryData struct {
							 | 
						|
									*proto.TelemetryData
							 | 
						|
									ReceivedAt time.Time `json:"received_at"`
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func NewPrometheusStorage() *PrometheusStorage {
							 | 
						|
									return &PrometheusStorage{
							 | 
						|
										totalClusters: promauto.NewGauge(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_total_clusters",
							 | 
						|
											Help: "Total number of unique SeaweedFS clusters (last 30 days)",
							 | 
						|
										}),
							 | 
						|
										activeClusters: promauto.NewGauge(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_active_clusters",
							 | 
						|
											Help: "Number of active SeaweedFS clusters (last 7 days)",
							 | 
						|
										}),
							 | 
						|
										volumeServerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_volume_servers",
							 | 
						|
											Help: "Number of volume servers per cluster",
							 | 
						|
										}, []string{"cluster_id", "version", "os"}),
							 | 
						|
										totalDiskBytes: promauto.NewGaugeVec(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_disk_bytes",
							 | 
						|
											Help: "Total disk usage in bytes per cluster",
							 | 
						|
										}, []string{"cluster_id", "version", "os"}),
							 | 
						|
										totalVolumeCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_volume_count",
							 | 
						|
											Help: "Total number of volumes per cluster",
							 | 
						|
										}, []string{"cluster_id", "version", "os"}),
							 | 
						|
										filerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_filer_count",
							 | 
						|
											Help: "Number of filer servers per cluster",
							 | 
						|
										}, []string{"cluster_id", "version", "os"}),
							 | 
						|
										brokerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_broker_count",
							 | 
						|
											Help: "Number of broker servers per cluster",
							 | 
						|
										}, []string{"cluster_id", "version", "os"}),
							 | 
						|
										clusterInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_cluster_info",
							 | 
						|
											Help: "Cluster information (always 1, labels contain metadata)",
							 | 
						|
										}, []string{"cluster_id", "version", "os"}),
							 | 
						|
										telemetryReceived: promauto.NewCounter(prometheus.CounterOpts{
							 | 
						|
											Name: "seaweedfs_telemetry_reports_received_total",
							 | 
						|
											Help: "Total number of telemetry reports received",
							 | 
						|
										}),
							 | 
						|
										instances: make(map[string]*telemetryData),
							 | 
						|
										stats:     make(map[string]interface{}),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (s *PrometheusStorage) StoreTelemetry(data *proto.TelemetryData) error {
							 | 
						|
									s.mu.Lock()
							 | 
						|
									defer s.mu.Unlock()
							 | 
						|
								
							 | 
						|
									// Update Prometheus metrics
							 | 
						|
									labels := prometheus.Labels{
							 | 
						|
										"cluster_id": data.ClusterId,
							 | 
						|
										"version":    data.Version,
							 | 
						|
										"os":         data.Os,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									s.volumeServerCount.With(labels).Set(float64(data.VolumeServerCount))
							 | 
						|
									s.totalDiskBytes.With(labels).Set(float64(data.TotalDiskBytes))
							 | 
						|
									s.totalVolumeCount.With(labels).Set(float64(data.TotalVolumeCount))
							 | 
						|
									s.filerCount.With(labels).Set(float64(data.FilerCount))
							 | 
						|
									s.brokerCount.With(labels).Set(float64(data.BrokerCount))
							 | 
						|
								
							 | 
						|
									infoLabels := prometheus.Labels{
							 | 
						|
										"cluster_id": data.ClusterId,
							 | 
						|
										"version":    data.Version,
							 | 
						|
										"os":         data.Os,
							 | 
						|
									}
							 | 
						|
									s.clusterInfo.With(infoLabels).Set(1)
							 | 
						|
								
							 | 
						|
									s.telemetryReceived.Inc()
							 | 
						|
								
							 | 
						|
									// Store in memory for API endpoints
							 | 
						|
									s.instances[data.ClusterId] = &telemetryData{
							 | 
						|
										TelemetryData: data,
							 | 
						|
										ReceivedAt:    time.Now().UTC(),
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Update aggregated stats
							 | 
						|
									s.updateStats()
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (s *PrometheusStorage) GetStats() (map[string]interface{}, error) {
							 | 
						|
									s.mu.RLock()
							 | 
						|
									defer s.mu.RUnlock()
							 | 
						|
								
							 | 
						|
									// Return cached stats
							 | 
						|
									result := make(map[string]interface{})
							 | 
						|
									for k, v := range s.stats {
							 | 
						|
										result[k] = v
							 | 
						|
									}
							 | 
						|
									return result, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (s *PrometheusStorage) GetInstances(limit int) ([]*telemetryData, error) {
							 | 
						|
									s.mu.RLock()
							 | 
						|
									defer s.mu.RUnlock()
							 | 
						|
								
							 | 
						|
									var instances []*telemetryData
							 | 
						|
									count := 0
							 | 
						|
									for _, instance := range s.instances {
							 | 
						|
										if count >= limit {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
										instances = append(instances, instance)
							 | 
						|
										count++
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return instances, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (s *PrometheusStorage) GetMetrics(days int) (map[string]interface{}, error) {
							 | 
						|
									s.mu.RLock()
							 | 
						|
									defer s.mu.RUnlock()
							 | 
						|
								
							 | 
						|
									// Return current metrics from in-memory storage
							 | 
						|
									// Historical data should be queried from Prometheus directly
							 | 
						|
									cutoff := time.Now().AddDate(0, 0, -days)
							 | 
						|
								
							 | 
						|
									var volumeServers []map[string]interface{}
							 | 
						|
									var diskUsage []map[string]interface{}
							 | 
						|
								
							 | 
						|
									for _, instance := range s.instances {
							 | 
						|
										if instance.ReceivedAt.After(cutoff) {
							 | 
						|
											volumeServers = append(volumeServers, map[string]interface{}{
							 | 
						|
												"date":  instance.ReceivedAt.Format("2006-01-02"),
							 | 
						|
												"value": instance.TelemetryData.VolumeServerCount,
							 | 
						|
											})
							 | 
						|
											diskUsage = append(diskUsage, map[string]interface{}{
							 | 
						|
												"date":  instance.ReceivedAt.Format("2006-01-02"),
							 | 
						|
												"value": instance.TelemetryData.TotalDiskBytes,
							 | 
						|
											})
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return map[string]interface{}{
							 | 
						|
										"volume_servers": volumeServers,
							 | 
						|
										"disk_usage":     diskUsage,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (s *PrometheusStorage) updateStats() {
							 | 
						|
									now := time.Now()
							 | 
						|
									last7Days := now.AddDate(0, 0, -7)
							 | 
						|
									last30Days := now.AddDate(0, 0, -30)
							 | 
						|
								
							 | 
						|
									totalInstances := 0
							 | 
						|
									activeInstances := 0
							 | 
						|
									versions := make(map[string]int)
							 | 
						|
									osDistribution := make(map[string]int)
							 | 
						|
								
							 | 
						|
									for _, instance := range s.instances {
							 | 
						|
										if instance.ReceivedAt.After(last30Days) {
							 | 
						|
											totalInstances++
							 | 
						|
										}
							 | 
						|
										if instance.ReceivedAt.After(last7Days) {
							 | 
						|
											activeInstances++
							 | 
						|
											versions[instance.TelemetryData.Version]++
							 | 
						|
											osDistribution[instance.TelemetryData.Os]++
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Update Prometheus gauges
							 | 
						|
									s.totalClusters.Set(float64(totalInstances))
							 | 
						|
									s.activeClusters.Set(float64(activeInstances))
							 | 
						|
								
							 | 
						|
									// Update cached stats for API
							 | 
						|
									s.stats = map[string]interface{}{
							 | 
						|
										"total_instances":  totalInstances,
							 | 
						|
										"active_instances": activeInstances,
							 | 
						|
										"versions":         versions,
							 | 
						|
										"os_distribution":  osDistribution,
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CleanupOldInstances removes instances older than the specified duration
							 | 
						|
								func (s *PrometheusStorage) CleanupOldInstances(maxAge time.Duration) {
							 | 
						|
									s.mu.Lock()
							 | 
						|
									defer s.mu.Unlock()
							 | 
						|
								
							 | 
						|
									cutoff := time.Now().Add(-maxAge)
							 | 
						|
									for instanceID, instance := range s.instances {
							 | 
						|
										if instance.ReceivedAt.Before(cutoff) {
							 | 
						|
											delete(s.instances, instanceID)
							 | 
						|
								
							 | 
						|
											// Remove from Prometheus metrics
							 | 
						|
											labels := prometheus.Labels{
							 | 
						|
												"cluster_id": instance.TelemetryData.ClusterId,
							 | 
						|
												"version":    instance.TelemetryData.Version,
							 | 
						|
												"os":         instance.TelemetryData.Os,
							 | 
						|
											}
							 | 
						|
											s.volumeServerCount.Delete(labels)
							 | 
						|
											s.totalDiskBytes.Delete(labels)
							 | 
						|
											s.totalVolumeCount.Delete(labels)
							 | 
						|
											s.filerCount.Delete(labels)
							 | 
						|
											s.brokerCount.Delete(labels)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									s.updateStats()
							 | 
						|
								}
							 |