You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

118 lines
3.1 KiB

package dash
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// GetClusterTopology returns the cluster topology, serving a cached copy
// while it is still fresh.
//
// The cached topology is reused when one exists and less than
// s.cacheExpiration has elapsed since s.lastCacheUpdate. Otherwise a new
// topology is fetched from the master over gRPC, stored in the cache, and
// returned. The returned pointer is the cached object itself, not a copy.
//
// On failure the cache is left untouched, nil is returned, and the error
// wraps the underlying gRPC error so callers can inspect the cause with
// errors.Is / errors.As.
func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) {
	now := time.Now()
	if s.cachedTopology != nil && now.Sub(s.lastCacheUpdate) < s.cacheExpiration {
		return s.cachedTopology, nil
	}

	topology := &ClusterTopology{
		UpdatedAt: now,
	}

	// gRPC is the only transport used to query the master.
	if err := s.getTopologyViaGRPC(topology); err != nil {
		glog.Errorf("Failed to connect to master server %s: %v", s.masterAddress, err)
		// %w (rather than %v) keeps the original error in the chain.
		return nil, fmt.Errorf("gRPC topology request failed: %w", err)
	}

	// The cache is stamped with the time captured before the request, so the
	// expiration window is measured from when the fetch started.
	s.cachedTopology = topology
	s.lastCacheUpdate = now
	return topology, nil
}
// getTopologyViaGRPC fills topology from the master's VolumeList gRPC
// response.
//
// For every data center / rack / data node in the response it appends a
// DataCenter (with nested Racks and VolumeServers) to topology.DataCenters,
// appends each node to the flat topology.VolumeServers list, and adds the
// node's counts to topology.TotalVolumes, TotalFiles and TotalSize.
//
// A response without TopologyInfo leaves topology unmodified and is not an
// error. Any error from the VolumeList call is logged and returned as-is.
func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error {
	return s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			glog.Errorf("Failed to get volume list from master %s: %v", s.masterAddress, err)
			return err
		}

		info := resp.TopologyInfo
		if info == nil {
			// Nothing to process.
			return nil
		}

		// The volume size limit is reported in MB; convert once to bytes.
		bytesPerVolume := int64(resp.VolumeSizeLimitMb) * 1024 * 1024

		for _, dcInfo := range info.DataCenterInfos {
			dataCenter := DataCenter{ID: dcInfo.Id, Racks: []Rack{}}

			for _, rackInfo := range dcInfo.RackInfos {
				rackEntry := Rack{ID: rackInfo.Id, Nodes: []VolumeServer{}}

				for _, nodeInfo := range rackInfo.DataNodeInfos {
					// Aggregate per-disk counters and per-volume sizes for this node.
					var volumeCount, maxVolumeCount, usedBytes, fileCount int64
					for _, disk := range nodeInfo.DiskInfos {
						volumeCount += disk.VolumeCount
						maxVolumeCount += disk.MaxVolumeCount
						for _, volume := range disk.VolumeInfos {
							usedBytes += int64(volume.Size)
							fileCount += int64(volume.FileCount)
						}
					}

					server := VolumeServer{
						ID: nodeInfo.Id,
						// The node ID doubles as both the address and the public URL.
						Address:    nodeInfo.Id,
						PublicURL:  nodeInfo.Id,
						DataCenter: dcInfo.Id,
						Rack:       rackInfo.Id,
						Volumes:    int(volumeCount),
						MaxVolumes: int(maxVolumeCount),
						DiskUsage:  usedBytes,
						// Capacity is the volume-slot count times the per-volume size limit.
						DiskCapacity: maxVolumeCount * bytesPerVolume,
						// Set to the processing time, not a value from the response.
						LastHeartbeat: time.Now(),
					}

					rackEntry.Nodes = append(rackEntry.Nodes, server)
					topology.VolumeServers = append(topology.VolumeServers, server)
					topology.TotalVolumes += server.Volumes
					topology.TotalFiles += fileCount
					topology.TotalSize += usedBytes
				}

				dataCenter.Racks = append(dataCenter.Racks, rackEntry)
			}

			topology.DataCenters = append(topology.DataCenters, dataCenter)
		}

		return nil
	})
}
// InvalidateCache drops the cached topology and the cached filer data and
// resets both cache timestamps to the zero time. With cachedTopology set to
// nil, the next GetClusterTopology call fetches fresh data from the master.
func (s *AdminServer) InvalidateCache() {
	var never time.Time // zero value: no update recorded

	s.cachedTopology, s.lastCacheUpdate = nil, never
	s.cachedFilers, s.lastFilerUpdate = nil, never
}