From e85fbd29a186e74ce5de03ba2c6e501ee866dff7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 4 Jul 2025 13:33:58 -0700 Subject: [PATCH] refactor --- weed/admin/dash/admin_server.go | 861 +---------------------- weed/admin/dash/client_management.go | 96 +++ weed/admin/dash/cluster_topology.go | 118 ++++ weed/admin/dash/collection_management.go | 129 ++++ weed/admin/dash/types.go | 198 ++++++ weed/admin/dash/volume_management.go | 378 ++++++++++ 6 files changed, 933 insertions(+), 847 deletions(-) create mode 100644 weed/admin/dash/client_management.go create mode 100644 weed/admin/dash/cluster_topology.go create mode 100644 weed/admin/dash/collection_management.go create mode 100644 weed/admin/dash/types.go create mode 100644 weed/admin/dash/volume_management.go diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 93b556af8..03a44a6da 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -5,19 +5,15 @@ import ( "context" "fmt" "net/http" - "sort" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/operation" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" @@ -40,190 +36,7 @@ type AdminServer struct { credentialManager *credential.CredentialManager } -type ClusterTopology struct { - Masters []MasterNode `json:"masters"` - DataCenters []DataCenter `json:"datacenters"` - VolumeServers []VolumeServer `json:"volume_servers"` - TotalVolumes int `json:"total_volumes"` - TotalFiles int64 `json:"total_files"` - TotalSize int64 `json:"total_size"` - UpdatedAt time.Time `json:"updated_at"` -} - -type MasterNode struct { - Address string `json:"address"` - IsLeader bool `json:"is_leader"` -} - -type DataCenter struct { - ID string `json:"id"` - Racks []Rack `json:"racks"` -} - -type Rack struct { - ID string `json:"id"` - Nodes []VolumeServer `json:"nodes"` -} - -type VolumeServer struct { - ID string `json:"id"` - Address string `json:"address"` - DataCenter string `json:"datacenter"` - Rack string `json:"rack"` - PublicURL string `json:"public_url"` - Volumes int `json:"volumes"` - MaxVolumes int `json:"max_volumes"` - DiskUsage int64 `json:"disk_usage"` - DiskCapacity int64 `json:"disk_capacity"` - LastHeartbeat time.Time `json:"last_heartbeat"` -} - -// S3 Bucket management structures -type S3Bucket struct { - Name string `json:"name"` - CreatedAt time.Time `json:"created_at"` - Size int64 `json:"size"` - ObjectCount int64 `json:"object_count"` - LastModified time.Time `json:"last_modified"` - Quota int64 `json:"quota"` // Quota in bytes, 0 means no quota - QuotaEnabled bool `json:"quota_enabled"` // Whether quota is enabled -} - -type S3Object struct { - Key string `json:"key"` - Size int64 `json:"size"` - LastModified time.Time `json:"last_modified"` - ETag string `json:"etag"` - StorageClass string `json:"storage_class"` -} - -type BucketDetails struct { - Bucket S3Bucket `json:"bucket"` - Objects []S3Object `json:"objects"` - TotalSize int64 `json:"total_size"` - TotalCount int64 `json:"total_count"` - UpdatedAt time.Time `json:"updated_at"` -} - -// Cluster management data structures -type ClusterVolumeServersData struct { - Username string `json:"username"` - VolumeServers []VolumeServer `json:"volume_servers"` - TotalVolumeServers int `json:"total_volume_servers"` - TotalVolumes int `json:"total_volumes"` - TotalCapacity int64 `json:"total_capacity"` - LastUpdated time.Time `json:"last_updated"` -} - -type VolumeWithTopology struct { - *master_pb.VolumeInformationMessage - Server string `json:"server"` - DataCenter string `json:"datacenter"` - Rack string `json:"rack"` -} - -type ClusterVolumesData struct { - Username string `json:"username"` - Volumes []VolumeWithTopology `json:"volumes"` - TotalVolumes int `json:"total_volumes"` - TotalSize int64 `json:"total_size"` - LastUpdated time.Time `json:"last_updated"` - - // Pagination - CurrentPage int `json:"current_page"` - TotalPages int `json:"total_pages"` - PageSize int `json:"page_size"` - - // Sorting - SortBy string `json:"sort_by"` - SortOrder string `json:"sort_order"` - - // Statistics - DataCenterCount int `json:"datacenter_count"` - RackCount int `json:"rack_count"` - DiskTypeCount int `json:"disk_type_count"` - CollectionCount int `json:"collection_count"` - VersionCount int `json:"version_count"` - - // Conditional display flags - ShowDataCenterColumn bool `json:"show_datacenter_column"` - ShowRackColumn bool `json:"show_rack_column"` - ShowDiskTypeColumn bool `json:"show_disk_type_column"` - ShowCollectionColumn bool `json:"show_collection_column"` - ShowVersionColumn bool `json:"show_version_column"` - - // Single values when only one exists - SingleDataCenter string `json:"single_datacenter"` - SingleRack string `json:"single_rack"` - SingleDiskType string `json:"single_disk_type"` - SingleCollection string `json:"single_collection"` - SingleVersion string `json:"single_version"` - - // All versions when multiple exist - AllVersions []string `json:"all_versions"` - - // All disk types when multiple exist - AllDiskTypes []string `json:"all_disk_types"` - - // Filtering - FilterCollection string `json:"filter_collection"` -} - -type VolumeDetailsData struct { - Volume VolumeWithTopology `json:"volume"` - Replicas []VolumeWithTopology `json:"replicas"` - VolumeSizeLimit uint64 `json:"volume_size_limit"` - ReplicationCount int `json:"replication_count"` - LastUpdated time.Time `json:"last_updated"` -} - -type CollectionInfo struct { - Name string `json:"name"` - DataCenter string `json:"datacenter"` - VolumeCount int `json:"volume_count"` - FileCount int64 `json:"file_count"` - TotalSize int64 `json:"total_size"` - DiskTypes []string `json:"disk_types"` -} - -type ClusterCollectionsData struct { - Username string `json:"username"` - Collections []CollectionInfo `json:"collections"` - TotalCollections int `json:"total_collections"` - TotalVolumes int `json:"total_volumes"` - TotalFiles int64 `json:"total_files"` - TotalSize int64 `json:"total_size"` - LastUpdated time.Time `json:"last_updated"` -} - -type MasterInfo struct { - Address string `json:"address"` - IsLeader bool `json:"is_leader"` - Suffrage string `json:"suffrage"` -} - -type ClusterMastersData struct { - Username string `json:"username"` - Masters []MasterInfo `json:"masters"` - TotalMasters int `json:"total_masters"` - LeaderCount int `json:"leader_count"` - LastUpdated time.Time `json:"last_updated"` -} - -type FilerInfo struct { - Address string `json:"address"` - DataCenter string `json:"datacenter"` - Rack string `json:"rack"` - Version string `json:"version"` - CreatedAt time.Time `json:"created_at"` -} - -type ClusterFilersData struct { - Username string `json:"username"` - Filers []FilerInfo `json:"filers"` - TotalFilers int `json:"total_filers"` - LastUpdated time.Time `json:"last_updated"` -} +// Type definitions moved to types.go func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServer { server := &AdminServer{ @@ -277,189 +90,17 @@ func (s *AdminServer) GetCredentialManager() *credential.CredentialManager { return s.credentialManager } -// GetFilerAddress returns a filer address, discovering from masters if needed -func (s *AdminServer) GetFilerAddress() string { - // Discover filers from masters - filers := s.getDiscoveredFilers() - if len(filers) > 0 { - return filers[0] // Return the first available filer - } - - return "" -} - -// getDiscoveredFilers returns cached filers or discovers them from masters -func (s *AdminServer) getDiscoveredFilers() []string { - // Check if cache is still valid - if time.Since(s.lastFilerUpdate) < s.filerCacheExpiration && len(s.cachedFilers) > 0 { - return s.cachedFilers - } - - // Discover filers from masters - var filers []string - err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.FilerType, - }) - if err != nil { - return err - } - - for _, node := range resp.ClusterNodes { - filers = append(filers, node.Address) - } - - return nil - }) - - if err != nil { - glog.Warningf("Failed to discover filers from master %s: %v", s.masterAddress, err) - // Return cached filers even if expired, better than nothing - return s.cachedFilers - } - - // Update cache - s.cachedFilers = filers - s.lastFilerUpdate = time.Now() - - return filers -} - -// WithMasterClient executes a function with a master client connection -func (s *AdminServer) WithMasterClient(f func(client master_pb.SeaweedClient) error) error { - masterAddr := pb.ServerAddress(s.masterAddress) - - return pb.WithMasterClient(false, masterAddr, s.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - return f(client) - }) -} - -// WithFilerClient executes a function with a filer client connection -func (s *AdminServer) WithFilerClient(f func(client filer_pb.SeaweedFilerClient) error) error { - filerAddr := s.GetFilerAddress() - if filerAddr == "" { - return fmt.Errorf("no filer available") - } - - return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddr), s.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - return f(client) - }) -} - -// WithVolumeServerClient executes a function with a volume server client connection -func (s *AdminServer) WithVolumeServerClient(address pb.ServerAddress, f func(client volume_server_pb.VolumeServerClient) error) error { - return operation.WithVolumeServerClient(false, address, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - return f(client) - }) -} - -// GetClusterTopology returns the current cluster topology with caching -func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) { - now := time.Now() - if s.cachedTopology != nil && now.Sub(s.lastCacheUpdate) < s.cacheExpiration { - return s.cachedTopology, nil - } - - topology := &ClusterTopology{ - UpdatedAt: now, - } - - // Use gRPC only - err := s.getTopologyViaGRPC(topology) - if err != nil { - glog.Errorf("Failed to connect to master server %s: %v", s.masterAddress, err) - return nil, fmt.Errorf("gRPC topology request failed: %v", err) - } - - // Cache the result - s.cachedTopology = topology - s.lastCacheUpdate = now - - return topology, nil -} - -// getTopologyViaGRPC gets topology using gRPC (original method) -func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error { - // Get cluster status from master - err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { - resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - if err != nil { - glog.Errorf("Failed to get volume list from master %s: %v", s.masterAddress, err) - return err - } - - if resp.TopologyInfo != nil { - // Process gRPC response - for _, dc := range resp.TopologyInfo.DataCenterInfos { - dataCenter := DataCenter{ - ID: dc.Id, - Racks: []Rack{}, - } - - for _, rack := range dc.RackInfos { - rackObj := Rack{ - ID: rack.Id, - Nodes: []VolumeServer{}, - } - - for _, node := range rack.DataNodeInfos { - // Calculate totals from disk infos - var totalVolumes int64 - var totalMaxVolumes int64 - var totalSize int64 - var totalFiles int64 - - for _, diskInfo := range node.DiskInfos { - totalVolumes += diskInfo.VolumeCount - totalMaxVolumes += diskInfo.MaxVolumeCount +// Filer discovery methods moved to client_management.go - // Sum up individual volume information - for _, volInfo := range diskInfo.VolumeInfos { - totalSize += int64(volInfo.Size) - totalFiles += int64(volInfo.FileCount) - } - } +// Client management methods moved to client_management.go - vs := VolumeServer{ - ID: node.Id, - Address: node.Id, - DataCenter: dc.Id, - Rack: rack.Id, - PublicURL: node.Id, - Volumes: int(totalVolumes), - MaxVolumes: int(totalMaxVolumes), - DiskUsage: totalSize, - DiskCapacity: totalMaxVolumes * int64(resp.VolumeSizeLimitMb) * 1024 * 1024, - LastHeartbeat: time.Now(), - } +// WithFilerClient and WithVolumeServerClient methods moved to client_management.go - rackObj.Nodes = append(rackObj.Nodes, vs) - topology.VolumeServers = append(topology.VolumeServers, vs) - topology.TotalVolumes += vs.Volumes - topology.TotalFiles += totalFiles - topology.TotalSize += totalSize - } +// Cluster topology methods moved to cluster_topology.go - dataCenter.Racks = append(dataCenter.Racks, rackObj) - } +// getTopologyViaGRPC method moved to cluster_topology.go - topology.DataCenters = append(topology.DataCenters, dataCenter) - } - } - - return nil - }) - - return err -} - -// InvalidateCache forces a refresh of cached data -func (s *AdminServer) InvalidateCache() { - s.lastCacheUpdate = time.Time{} - s.cachedTopology = nil - s.lastFilerUpdate = time.Time{} - s.cachedFilers = nil -} +// InvalidateCache method moved to cluster_topology.go // GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) { @@ -780,393 +421,13 @@ func (s *AdminServer) GetObjectStoreUsers() ([]ObjectStoreUser, error) { return users, nil } -// GetClusterVolumeServers retrieves cluster volume servers data -func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) { - topology, err := s.GetClusterTopology() - if err != nil { - return nil, err - } +// Volume server methods moved to volume_management.go - var totalCapacity int64 - var totalVolumes int - for _, vs := range topology.VolumeServers { - totalCapacity += vs.DiskCapacity - totalVolumes += vs.Volumes - } +// Volume methods moved to volume_management.go - return &ClusterVolumeServersData{ - VolumeServers: topology.VolumeServers, - TotalVolumeServers: len(topology.VolumeServers), - TotalVolumes: totalVolumes, - TotalCapacity: totalCapacity, - LastUpdated: time.Now(), - }, nil -} +// sortVolumes method moved to volume_management.go -// GetClusterVolumes retrieves cluster volumes data with pagination, sorting, and filtering -func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterVolumesData, error) { - // Set defaults - if page < 1 { - page = 1 - } - if pageSize < 1 || pageSize > 1000 { - pageSize = 100 - } - if sortBy == "" { - sortBy = "id" - } - if sortOrder == "" { - sortOrder = "asc" - } - var volumes []VolumeWithTopology - var totalSize int64 - - // Get detailed volume information via gRPC - err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { - resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - if err != nil { - return err - } - - if resp.TopologyInfo != nil { - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, node := range rack.DataNodeInfos { - for _, diskInfo := range node.DiskInfos { - for _, volInfo := range diskInfo.VolumeInfos { - volume := VolumeWithTopology{ - VolumeInformationMessage: volInfo, - Server: node.Id, - DataCenter: dc.Id, - Rack: rack.Id, - } - volumes = append(volumes, volume) - totalSize += int64(volInfo.Size) - } - } - } - } - } - } - - return nil - }) - - if err != nil { - return nil, err - } - - // Filter by collection if specified - if collection != "" { - var filteredVolumes []VolumeWithTopology - var filteredTotalSize int64 - for _, volume := range volumes { - // Handle "default" collection filtering for empty collections - volumeCollection := volume.Collection - if volumeCollection == "" { - volumeCollection = "default" - } - - if volumeCollection == collection { - filteredVolumes = append(filteredVolumes, volume) - filteredTotalSize += int64(volume.Size) - } - } - volumes = filteredVolumes - totalSize = filteredTotalSize - } - - // Calculate unique data center, rack, disk type, collection, and version counts from filtered volumes - dataCenterMap := make(map[string]bool) - rackMap := make(map[string]bool) - diskTypeMap := make(map[string]bool) - collectionMap := make(map[string]bool) - versionMap := make(map[string]bool) - for _, volume := range volumes { - if volume.DataCenter != "" { - dataCenterMap[volume.DataCenter] = true - } - if volume.Rack != "" { - rackMap[volume.Rack] = true - } - diskType := volume.DiskType - if diskType == "" { - diskType = "hdd" // Default to hdd if not specified - } - diskTypeMap[diskType] = true - - // Handle collection for display purposes - collectionName := volume.Collection - if collectionName == "" { - collectionName = "default" - } - collectionMap[collectionName] = true - - versionMap[fmt.Sprintf("%d", volume.Version)] = true - } - dataCenterCount := len(dataCenterMap) - rackCount := len(rackMap) - diskTypeCount := len(diskTypeMap) - collectionCount := len(collectionMap) - versionCount := len(versionMap) - - // Sort volumes - s.sortVolumes(volumes, sortBy, sortOrder) - - // Calculate pagination - totalVolumes := len(volumes) - totalPages := (totalVolumes + pageSize - 1) / pageSize - if totalPages == 0 { - totalPages = 1 - } - - // Apply pagination - startIndex := (page - 1) * pageSize - endIndex := startIndex + pageSize - if startIndex >= totalVolumes { - volumes = []VolumeWithTopology{} - } else { - if endIndex > totalVolumes { - endIndex = totalVolumes - } - volumes = volumes[startIndex:endIndex] - } - - // Determine conditional display flags and extract single values - showDataCenterColumn := dataCenterCount > 1 - showRackColumn := rackCount > 1 - showDiskTypeColumn := diskTypeCount > 1 - showCollectionColumn := collectionCount > 1 && collection == "" // Hide column when filtering by collection - showVersionColumn := versionCount > 1 - - var singleDataCenter, singleRack, singleDiskType, singleCollection, singleVersion string - var allVersions, allDiskTypes []string - - if dataCenterCount == 1 { - for dc := range dataCenterMap { - singleDataCenter = dc - break - } - } - if rackCount == 1 { - for rack := range rackMap { - singleRack = rack - break - } - } - if diskTypeCount == 1 { - for diskType := range diskTypeMap { - singleDiskType = diskType - break - } - } else { - // Collect all disk types and sort them - for diskType := range diskTypeMap { - allDiskTypes = append(allDiskTypes, diskType) - } - sort.Strings(allDiskTypes) - } - if collectionCount == 1 { - for collection := range collectionMap { - singleCollection = collection - break - } - } - if versionCount == 1 { - for version := range versionMap { - singleVersion = "v" + version - break - } - } else { - // Collect all versions and sort them - for version := range versionMap { - allVersions = append(allVersions, "v"+version) - } - sort.Strings(allVersions) - } - - return &ClusterVolumesData{ - Volumes: volumes, - TotalVolumes: totalVolumes, - TotalSize: totalSize, - LastUpdated: time.Now(), - CurrentPage: page, - TotalPages: totalPages, - PageSize: pageSize, - SortBy: sortBy, - SortOrder: sortOrder, - DataCenterCount: dataCenterCount, - RackCount: rackCount, - DiskTypeCount: diskTypeCount, - CollectionCount: collectionCount, - VersionCount: versionCount, - ShowDataCenterColumn: showDataCenterColumn, - ShowRackColumn: showRackColumn, - ShowDiskTypeColumn: showDiskTypeColumn, - ShowCollectionColumn: showCollectionColumn, - ShowVersionColumn: showVersionColumn, - SingleDataCenter: singleDataCenter, - SingleRack: singleRack, - SingleDiskType: singleDiskType, - SingleCollection: singleCollection, - SingleVersion: singleVersion, - AllVersions: allVersions, - AllDiskTypes: allDiskTypes, - FilterCollection: collection, - }, nil -} - -// sortVolumes sorts the volumes slice based on the specified field and order -func (s *AdminServer) sortVolumes(volumes []VolumeWithTopology, sortBy string, sortOrder string) { - sort.Slice(volumes, func(i, j int) bool { - var less bool - - switch sortBy { - case "id": - less = volumes[i].Id < volumes[j].Id - case "server": - less = volumes[i].Server < volumes[j].Server - case "datacenter": - less = volumes[i].DataCenter < volumes[j].DataCenter - case "rack": - less = volumes[i].Rack < volumes[j].Rack - case "collection": - less = volumes[i].Collection < volumes[j].Collection - case "size": - less = volumes[i].Size < volumes[j].Size - case "filecount": - less = volumes[i].FileCount < volumes[j].FileCount - case "replication": - less = volumes[i].ReplicaPlacement < volumes[j].ReplicaPlacement - case "disktype": - less = volumes[i].DiskType < volumes[j].DiskType - case "version": - less = volumes[i].Version < volumes[j].Version - default: - less = volumes[i].Id < volumes[j].Id - } - - if sortOrder == "desc" { - return !less - } - return less - }) -} - -// GetClusterCollections retrieves cluster collections data -func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { - var collections []CollectionInfo - var totalVolumes int - var totalFiles int64 - var totalSize int64 - collectionMap := make(map[string]*CollectionInfo) - - // Get actual collection information from volume data - err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { - resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - if err != nil { - return err - } - - if resp.TopologyInfo != nil { - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, node := range rack.DataNodeInfos { - for _, diskInfo := range node.DiskInfos { - for _, volInfo := range diskInfo.VolumeInfos { - // Extract collection name from volume info - collectionName := volInfo.Collection - if collectionName == "" { - collectionName = "default" // Default collection for volumes without explicit collection - } - - // Get disk type from volume info, default to hdd if empty - diskType := volInfo.DiskType - if diskType == "" { - diskType = "hdd" - } - - // Get or create collection info - if collection, exists := collectionMap[collectionName]; exists { - collection.VolumeCount++ - collection.FileCount += int64(volInfo.FileCount) - collection.TotalSize += int64(volInfo.Size) - - // Update data center if this collection spans multiple DCs - if collection.DataCenter != dc.Id && collection.DataCenter != "multi" { - collection.DataCenter = "multi" - } - - // Add disk type if not already present - diskTypeExists := false - for _, existingDiskType := range collection.DiskTypes { - if existingDiskType == diskType { - diskTypeExists = true - break - } - } - if !diskTypeExists { - collection.DiskTypes = append(collection.DiskTypes, diskType) - } - - totalVolumes++ - totalFiles += int64(volInfo.FileCount) - totalSize += int64(volInfo.Size) - } else { - newCollection := CollectionInfo{ - Name: collectionName, - DataCenter: dc.Id, - VolumeCount: 1, - FileCount: int64(volInfo.FileCount), - TotalSize: int64(volInfo.Size), - DiskTypes: []string{diskType}, - } - collectionMap[collectionName] = &newCollection - totalVolumes++ - totalFiles += int64(volInfo.FileCount) - totalSize += int64(volInfo.Size) - } - } - } - } - } - } - } - - return nil - }) - - if err != nil { - return nil, err - } - - // Convert map to slice - for _, collection := range collectionMap { - collections = append(collections, *collection) - } - - // If no collections found, show a message indicating no collections exist - if len(collections) == 0 { - // Return empty collections data instead of creating fake ones - return &ClusterCollectionsData{ - Collections: []CollectionInfo{}, - TotalCollections: 0, - TotalVolumes: 0, - TotalFiles: 0, - TotalSize: 0, - LastUpdated: time.Now(), - }, nil - } - - return &ClusterCollectionsData{ - Collections: collections, - TotalCollections: len(collections), - TotalVolumes: totalVolumes, - TotalFiles: totalFiles, - TotalSize: totalSize, - LastUpdated: time.Now(), - }, nil -} +// GetClusterCollections method moved to collection_management.go // GetClusterMasters retrieves cluster masters data func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) { @@ -1302,102 +563,8 @@ func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) { }, nil } -// GetAllFilers returns all discovered filers -func (s *AdminServer) GetAllFilers() []string { - return s.getDiscoveredFilers() -} - -// GetVolumeDetails retrieves detailed information about a specific volume -func (s *AdminServer) GetVolumeDetails(volumeID int, server string) (*VolumeDetailsData, error) { - var primaryVolume VolumeWithTopology - var replicas []VolumeWithTopology - var volumeSizeLimit uint64 - var found bool - - // Find the volume and all its replicas in the cluster - err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { - resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - if err != nil { - return err - } - - if resp.TopologyInfo != nil { - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, node := range rack.DataNodeInfos { - for _, diskInfo := range node.DiskInfos { - for _, volInfo := range diskInfo.VolumeInfos { - if int(volInfo.Id) == volumeID { - diskType := volInfo.DiskType - if diskType == "" { - diskType = "hdd" - } - - volume := VolumeWithTopology{ - VolumeInformationMessage: volInfo, - Server: node.Id, - DataCenter: dc.Id, - Rack: rack.Id, - } - - // If this is the requested server, it's the primary volume - if node.Id == server { - primaryVolume = volume - found = true - } else { - // This is a replica on another server - replicas = append(replicas, volume) - } - } - } - } - } - } - } - } - return nil - }) - - if err != nil { - return nil, err - } +// GetAllFilers method moved to client_management.go - if !found { - return nil, fmt.Errorf("volume %d not found on server %s", volumeID, server) - } +// GetVolumeDetails method moved to volume_management.go - // Get volume size limit from master - err = s.WithMasterClient(func(client master_pb.SeaweedClient) error { - resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) - if err != nil { - return err - } - volumeSizeLimit = uint64(resp.VolumeSizeLimitMB) * 1024 * 1024 // Convert MB to bytes - return nil - }) - - if err != nil { - // If we can't get the limit, set a default - volumeSizeLimit = 30 * 1024 * 1024 * 1024 // 30GB default - } - - return &VolumeDetailsData{ - Volume: primaryVolume, - Replicas: replicas, - VolumeSizeLimit: volumeSizeLimit, - ReplicationCount: len(replicas) + 1, // Include the primary volume - LastUpdated: time.Now(), - }, nil -} - -// VacuumVolume performs a vacuum operation on a specific volume -func (s *AdminServer) VacuumVolume(volumeID int, server string) error { - return s.WithMasterClient(func(client master_pb.SeaweedClient) error { - _, err := client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ - VolumeId: uint32(volumeID), - GarbageThreshold: 0.0001, // A very low threshold to ensure all garbage is collected - Collection: "", // Empty for all collections - }) - return err - }) -} +// VacuumVolume method moved to volume_management.go diff --git a/weed/admin/dash/client_management.go b/weed/admin/dash/client_management.go new file mode 100644 index 000000000..7bebc6dd2 --- /dev/null +++ b/weed/admin/dash/client_management.go @@ -0,0 +1,96 @@ +package dash + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" +) + +// WithMasterClient executes a function with a master client connection +func (s *AdminServer) WithMasterClient(f func(client master_pb.SeaweedClient) error) error { + masterAddr := pb.ServerAddress(s.masterAddress) + + return pb.WithMasterClient(false, masterAddr, s.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + return f(client) + }) +} + +// WithFilerClient executes a function with a filer client connection +func (s *AdminServer) WithFilerClient(f func(client filer_pb.SeaweedFilerClient) error) error { + filerAddr := s.GetFilerAddress() + if filerAddr == "" { + return fmt.Errorf("no filer available") + } + + return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddr), s.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return f(client) + }) +} + +// WithVolumeServerClient executes a function with a volume server client connection +func (s *AdminServer) WithVolumeServerClient(address pb.ServerAddress, f func(client volume_server_pb.VolumeServerClient) error) error { + return operation.WithVolumeServerClient(false, address, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return f(client) + }) +} + +// GetFilerAddress returns a filer address, discovering from masters if needed +func (s *AdminServer) GetFilerAddress() string { + // Discover filers from masters + filers := s.getDiscoveredFilers() + if len(filers) > 0 { + return filers[0] // Return the first available filer + } + + return "" +} + +// getDiscoveredFilers returns cached filers or discovers them from masters +func (s *AdminServer) getDiscoveredFilers() []string { + // Check if cache is still valid + if time.Since(s.lastFilerUpdate) < s.filerCacheExpiration && len(s.cachedFilers) > 0 { + return s.cachedFilers + } + + // Discover filers from masters + var filers []string + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + if err != nil { + return err + } + + for _, node := range resp.ClusterNodes { + filers = append(filers, node.Address) + } + + return nil + }) + + if err != nil { + glog.Warningf("Failed to discover filers from master %s: %v", s.masterAddress, err) + // Return cached filers even if expired, better than nothing + return s.cachedFilers + } + + // Update cache + s.cachedFilers = filers + s.lastFilerUpdate = time.Now() + + return filers +} + +// GetAllFilers returns all discovered filers +func (s *AdminServer) GetAllFilers() []string { + return s.getDiscoveredFilers() +} diff --git a/weed/admin/dash/cluster_topology.go b/weed/admin/dash/cluster_topology.go new file mode 100644 index 000000000..3670220ad --- /dev/null +++ b/weed/admin/dash/cluster_topology.go @@ -0,0 +1,118 @@ +package dash + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// GetClusterTopology returns the current cluster topology with caching +func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) { + now := time.Now() + if s.cachedTopology != nil && now.Sub(s.lastCacheUpdate) < s.cacheExpiration { + return s.cachedTopology, nil + } + + topology := &ClusterTopology{ + UpdatedAt: now, + } + + // Use gRPC only + err := s.getTopologyViaGRPC(topology) + if err != nil { + glog.Errorf("Failed to connect to master server %s: %v", s.masterAddress, err) + return nil, fmt.Errorf("gRPC topology request failed: %v", err) + } + + // Cache the result + s.cachedTopology = topology + s.lastCacheUpdate = now + + return topology, nil +} + +// getTopologyViaGRPC gets topology using gRPC (original method) +func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error { + // Get cluster status from master + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + glog.Errorf("Failed to get volume list from master %s: %v", s.masterAddress, err) + return err + } + + if resp.TopologyInfo != nil { + // Process gRPC response + for _, dc := range resp.TopologyInfo.DataCenterInfos { + dataCenter := DataCenter{ + ID: dc.Id, + Racks: []Rack{}, + } + + for _, rack := range dc.RackInfos { + rackObj := Rack{ + ID: rack.Id, + Nodes: []VolumeServer{}, + } + + for _, node := range rack.DataNodeInfos { + // Calculate totals from disk infos + var totalVolumes int64 + var totalMaxVolumes int64 + var totalSize int64 + var totalFiles int64 + + for _, diskInfo := range node.DiskInfos { + totalVolumes += diskInfo.VolumeCount + totalMaxVolumes += diskInfo.MaxVolumeCount + + // Sum up individual volume information + for _, volInfo := range diskInfo.VolumeInfos { + totalSize += int64(volInfo.Size) + totalFiles += int64(volInfo.FileCount) + } + } + + vs := VolumeServer{ + ID: node.Id, + Address: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + PublicURL: node.Id, + Volumes: int(totalVolumes), + MaxVolumes: int(totalMaxVolumes), + DiskUsage: totalSize, + DiskCapacity: totalMaxVolumes * int64(resp.VolumeSizeLimitMb) * 1024 * 1024, + LastHeartbeat: time.Now(), + } + + rackObj.Nodes = append(rackObj.Nodes, vs) + topology.VolumeServers = append(topology.VolumeServers, vs) + topology.TotalVolumes += vs.Volumes + topology.TotalFiles += totalFiles + topology.TotalSize += totalSize + } + + dataCenter.Racks = append(dataCenter.Racks, rackObj) + } + + topology.DataCenters = append(topology.DataCenters, dataCenter) + } + } + + return nil + }) + + return err +} + +// InvalidateCache forces a refresh of cached data +func (s *AdminServer) InvalidateCache() { + s.lastCacheUpdate = time.Time{} + s.cachedTopology = nil + s.lastFilerUpdate = time.Time{} + s.cachedFilers = nil +} diff --git a/weed/admin/dash/collection_management.go b/weed/admin/dash/collection_management.go new file mode 100644 index 000000000..a70c82918 --- /dev/null +++ b/weed/admin/dash/collection_management.go @@ -0,0 +1,129 @@ +package dash + +import ( + "context" + "sort" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// GetClusterCollections retrieves cluster collections data +func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { + var collections []CollectionInfo + var totalVolumes int + var totalFiles int64 + var totalSize int64 + collectionMap := make(map[string]*CollectionInfo) + + // Get actual collection information from volume data + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + // Extract collection name from volume info + collectionName := volInfo.Collection + if collectionName == "" { + collectionName = "default" // Default collection for volumes without explicit collection + } + + // Get disk type from volume info, default to hdd if empty + diskType := volInfo.DiskType + if diskType == "" { + diskType = "hdd" + } + + // Get or create collection info + if collection, exists := collectionMap[collectionName]; exists { + collection.VolumeCount++ + collection.FileCount += int64(volInfo.FileCount) + collection.TotalSize += int64(volInfo.Size) + + // Update data center if this collection spans multiple DCs + if collection.DataCenter != dc.Id && collection.DataCenter != "multi" { + collection.DataCenter = "multi" + } + + // Add disk type if not already present + diskTypeExists := false + for _, existingDiskType := range collection.DiskTypes { + if existingDiskType == diskType { + diskTypeExists = true + break + } + } + if !diskTypeExists { + collection.DiskTypes = append(collection.DiskTypes, diskType) + } + + totalVolumes++ + totalFiles += int64(volInfo.FileCount) + totalSize += int64(volInfo.Size) + } else { + newCollection := CollectionInfo{ + Name: collectionName, + DataCenter: dc.Id, + VolumeCount: 1, + FileCount: int64(volInfo.FileCount), + TotalSize: int64(volInfo.Size), + DiskTypes: []string{diskType}, + } + collectionMap[collectionName] = &newCollection + totalVolumes++ + totalFiles += int64(volInfo.FileCount) + totalSize += int64(volInfo.Size) + } + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Convert map to slice + for _, collection := range collectionMap { + collections = append(collections, *collection) + } + + // Sort collections alphabetically by name + sort.Slice(collections, func(i, j int) bool { + return collections[i].Name < collections[j].Name + }) + + // If no collections found, show a message indicating no collections exist + if len(collections) == 0 { + // Return empty collections data instead of creating fake ones + return &ClusterCollectionsData{ + Collections: []CollectionInfo{}, + TotalCollections: 0, + TotalVolumes: 0, + TotalFiles: 0, + TotalSize: 0, + LastUpdated: time.Now(), + }, nil + } + + return &ClusterCollectionsData{ + Collections: collections, + TotalCollections: len(collections), + TotalVolumes: totalVolumes, + TotalFiles: totalFiles, + TotalSize: totalSize, + LastUpdated: time.Now(), + }, nil +} diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go new file mode 100644 index 000000000..4ca590450 --- /dev/null +++ b/weed/admin/dash/types.go @@ -0,0 +1,198 @@ +package dash + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// Core cluster topology structures +type ClusterTopology struct { + Masters []MasterNode `json:"masters"` + DataCenters []DataCenter `json:"datacenters"` + VolumeServers []VolumeServer `json:"volume_servers"` + TotalVolumes int `json:"total_volumes"` + TotalFiles int64 `json:"total_files"` + TotalSize int64 `json:"total_size"` + UpdatedAt time.Time `json:"updated_at"` +} + +type MasterNode struct { + Address string `json:"address"` + IsLeader bool `json:"is_leader"` +} + +type DataCenter struct { + ID string `json:"id"` + Racks []Rack `json:"racks"` +} + +type Rack struct { + ID string `json:"id"` + Nodes []VolumeServer `json:"nodes"` +} + +type VolumeServer struct { + ID string `json:"id"` + Address string `json:"address"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` + PublicURL string `json:"public_url"` + Volumes int `json:"volumes"` + MaxVolumes int `json:"max_volumes"` + DiskUsage int64 `json:"disk_usage"` + DiskCapacity int64 `json:"disk_capacity"` + LastHeartbeat time.Time `json:"last_heartbeat"` +} + +// S3 Bucket management structures +type S3Bucket struct { + Name string `json:"name"` + CreatedAt time.Time `json:"created_at"` + Size int64 `json:"size"` + ObjectCount int64 `json:"object_count"` + LastModified time.Time `json:"last_modified"` + Quota int64 `json:"quota"` // Quota in bytes, 0 means no quota + QuotaEnabled bool `json:"quota_enabled"` // Whether quota is enabled +} + +type S3Object struct { + Key string `json:"key"` + Size int64 `json:"size"` + LastModified time.Time `json:"last_modified"` + ETag string `json:"etag"` + StorageClass string `json:"storage_class"` +} + +type BucketDetails struct { + Bucket S3Bucket `json:"bucket"` + Objects []S3Object `json:"objects"` + TotalSize int64 `json:"total_size"` + TotalCount int64 `json:"total_count"` + UpdatedAt time.Time `json:"updated_at"` +} + +// ObjectStoreUser is defined in admin_data.go + +// Volume management structures +type VolumeWithTopology struct { + *master_pb.VolumeInformationMessage + Server string `json:"server"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` +} + +type ClusterVolumesData struct { + Username string `json:"username"` + Volumes []VolumeWithTopology `json:"volumes"` + TotalVolumes int `json:"total_volumes"` + TotalSize int64 `json:"total_size"` + LastUpdated time.Time `json:"last_updated"` + + // Pagination + CurrentPage int `json:"current_page"` + TotalPages int `json:"total_pages"` + PageSize int `json:"page_size"` + + // Sorting + SortBy string `json:"sort_by"` + SortOrder string `json:"sort_order"` + + // Statistics + DataCenterCount int `json:"datacenter_count"` + RackCount int `json:"rack_count"` + DiskTypeCount int `json:"disk_type_count"` + CollectionCount int `json:"collection_count"` + VersionCount int `json:"version_count"` + + // Conditional display flags + ShowDataCenterColumn bool `json:"show_datacenter_column"` + ShowRackColumn bool `json:"show_rack_column"` + ShowDiskTypeColumn bool `json:"show_disk_type_column"` + ShowCollectionColumn bool `json:"show_collection_column"` + ShowVersionColumn bool `json:"show_version_column"` + + // Single values when only one exists + SingleDataCenter string `json:"single_datacenter"` + SingleRack string `json:"single_rack"` + SingleDiskType string `json:"single_disk_type"` + SingleCollection string `json:"single_collection"` + SingleVersion string `json:"single_version"` + + // All versions when multiple exist + AllVersions []string `json:"all_versions"` + + // All disk types when multiple exist + AllDiskTypes []string `json:"all_disk_types"` + + // Filtering + FilterCollection string `json:"filter_collection"` +} + +type VolumeDetailsData struct { + Volume VolumeWithTopology `json:"volume"` + Replicas []VolumeWithTopology `json:"replicas"` + VolumeSizeLimit uint64 `json:"volume_size_limit"` + ReplicationCount int `json:"replication_count"` + LastUpdated time.Time `json:"last_updated"` +} + +// Collection management structures +type CollectionInfo struct { + Name string `json:"name"` + DataCenter string `json:"datacenter"` + VolumeCount int `json:"volume_count"` + FileCount int64 `json:"file_count"` + TotalSize int64 `json:"total_size"` + DiskTypes []string `json:"disk_types"` +} + +type ClusterCollectionsData struct { + Username string `json:"username"` + Collections []CollectionInfo `json:"collections"` + TotalCollections int `json:"total_collections"` + TotalVolumes int `json:"total_volumes"` + TotalFiles int64 `json:"total_files"` + TotalSize int64 `json:"total_size"` + LastUpdated time.Time `json:"last_updated"` +} + +// Master and Filer management structures +type MasterInfo struct { + Address string `json:"address"` + IsLeader bool `json:"is_leader"` + Suffrage string `json:"suffrage"` +} + +type ClusterMastersData struct { + Username string `json:"username"` + Masters []MasterInfo `json:"masters"` + TotalMasters int `json:"total_masters"` + LeaderCount int `json:"leader_count"` + LastUpdated time.Time `json:"last_updated"` +} + +type FilerInfo struct { + Address string `json:"address"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` + Version string `json:"version"` + CreatedAt time.Time `json:"created_at"` +} + +type ClusterFilersData struct { + Username string `json:"username"` + Filers []FilerInfo `json:"filers"` + TotalFilers int `json:"total_filers"` + LastUpdated time.Time `json:"last_updated"` +} + +// Volume server management structures +type ClusterVolumeServersData struct { + Username string `json:"username"` + VolumeServers []VolumeServer `json:"volume_servers"` + TotalVolumeServers int `json:"total_volume_servers"` + TotalVolumes int `json:"total_volumes"` + TotalCapacity int64 `json:"total_capacity"` + LastUpdated time.Time `json:"last_updated"` +} diff --git a/weed/admin/dash/volume_management.go b/weed/admin/dash/volume_management.go new file mode 100644 index 000000000..608c39a91 --- /dev/null +++ b/weed/admin/dash/volume_management.go @@ -0,0 +1,378 @@ +package dash + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// GetClusterVolumes retrieves cluster volumes data with pagination, sorting, and filtering +func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterVolumesData, error) { + // Set defaults + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 1000 { + pageSize = 100 + } + if sortBy == "" { + sortBy = "id" + } + if sortOrder == "" { + sortOrder = "asc" + } + var volumes []VolumeWithTopology + var totalSize int64 + + // Get detailed volume information via gRPC + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + volume := VolumeWithTopology{ + VolumeInformationMessage: volInfo, + Server: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + } + volumes = append(volumes, volume) + totalSize += int64(volInfo.Size) + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Filter by collection if specified + if collection != "" { + var filteredVolumes []VolumeWithTopology + var filteredTotalSize int64 + for _, volume := range volumes { + // Handle "default" collection filtering for empty collections + volumeCollection := volume.Collection + if volumeCollection == "" { + volumeCollection = "default" + } + + if volumeCollection == collection { + filteredVolumes = append(filteredVolumes, volume) + filteredTotalSize += int64(volume.Size) + } + } + volumes = filteredVolumes + totalSize = filteredTotalSize + } + + // Calculate unique data center, rack, disk type, collection, and version counts from filtered volumes + dataCenterMap := make(map[string]bool) + rackMap := make(map[string]bool) + diskTypeMap := make(map[string]bool) + collectionMap := make(map[string]bool) + versionMap := make(map[string]bool) + for _, volume := range volumes { + if volume.DataCenter != "" { + dataCenterMap[volume.DataCenter] = true + } + if volume.Rack != "" { + rackMap[volume.Rack] = true + } + diskType := volume.DiskType + if diskType == "" { + diskType = "hdd" // Default to hdd if not specified + } + diskTypeMap[diskType] = true + + // Handle collection for display purposes + collectionName := volume.Collection + if collectionName == "" { + collectionName = "default" + } + collectionMap[collectionName] = true + + versionMap[fmt.Sprintf("%d", volume.Version)] = true + } + dataCenterCount := len(dataCenterMap) + rackCount := len(rackMap) + diskTypeCount := len(diskTypeMap) + collectionCount := len(collectionMap) + versionCount := len(versionMap) + + // Sort volumes + s.sortVolumes(volumes, sortBy, sortOrder) + + // Calculate pagination + totalVolumes := len(volumes) + totalPages := (totalVolumes + pageSize - 1) / pageSize + if totalPages == 0 { + totalPages = 1 + } + + // Apply pagination + startIndex := (page - 1) * pageSize + endIndex := startIndex + pageSize + if startIndex >= totalVolumes { + volumes = []VolumeWithTopology{} + } else { + if endIndex > totalVolumes { + endIndex = totalVolumes + } + volumes = volumes[startIndex:endIndex] + } + + // Determine conditional display flags and extract single values + showDataCenterColumn := dataCenterCount > 1 + showRackColumn := rackCount > 1 + showDiskTypeColumn := diskTypeCount > 1 + showCollectionColumn := collectionCount > 1 && collection == "" // Hide column when filtering by collection + showVersionColumn := versionCount > 1 + + var singleDataCenter, singleRack, singleDiskType, singleCollection, singleVersion string + var allVersions, allDiskTypes []string + + if dataCenterCount == 1 { + for dc := range dataCenterMap { + singleDataCenter = dc + break + } + } + if rackCount == 1 { + for rack := range rackMap { + singleRack = rack + break + } + } + if diskTypeCount == 1 { + for diskType := range diskTypeMap { + singleDiskType = diskType + break + } + } else { + // Collect all disk types and sort them + for diskType := range diskTypeMap { + allDiskTypes = append(allDiskTypes, diskType) + } + sort.Strings(allDiskTypes) + } + if collectionCount == 1 { + for collection := range collectionMap { + singleCollection = collection + break + } + } + if versionCount == 1 { + for version := range versionMap { + singleVersion = "v" + version + break + } + } else { + // Collect all versions and sort them + for version := range versionMap { + allVersions = append(allVersions, "v"+version) + } + sort.Strings(allVersions) + } + + return &ClusterVolumesData{ + Volumes: volumes, + TotalVolumes: totalVolumes, + TotalSize: totalSize, + LastUpdated: time.Now(), + CurrentPage: page, + TotalPages: totalPages, + PageSize: pageSize, + SortBy: sortBy, + SortOrder: sortOrder, + DataCenterCount: dataCenterCount, + RackCount: rackCount, + DiskTypeCount: diskTypeCount, + CollectionCount: collectionCount, + VersionCount: versionCount, + ShowDataCenterColumn: showDataCenterColumn, + ShowRackColumn: showRackColumn, + ShowDiskTypeColumn: showDiskTypeColumn, + ShowCollectionColumn: showCollectionColumn, + ShowVersionColumn: showVersionColumn, + SingleDataCenter: singleDataCenter, + SingleRack: singleRack, + SingleDiskType: singleDiskType, + SingleCollection: singleCollection, + SingleVersion: singleVersion, + AllVersions: allVersions, + AllDiskTypes: allDiskTypes, + FilterCollection: collection, + }, nil +} + +// sortVolumes sorts the volumes slice based on the specified field and order +func (s *AdminServer) sortVolumes(volumes []VolumeWithTopology, sortBy string, sortOrder string) { + sort.Slice(volumes, func(i, j int) bool { + var less bool + + switch sortBy { + case "id": + less = volumes[i].Id < volumes[j].Id + case "server": + less = volumes[i].Server < volumes[j].Server + case "datacenter": + less = volumes[i].DataCenter < volumes[j].DataCenter + case "rack": + less = volumes[i].Rack < volumes[j].Rack + case "collection": + less = volumes[i].Collection < volumes[j].Collection + case "size": + less = volumes[i].Size < volumes[j].Size + case "filecount": + less = volumes[i].FileCount < volumes[j].FileCount + case "replication": + less = volumes[i].ReplicaPlacement < volumes[j].ReplicaPlacement + case "disktype": + less = volumes[i].DiskType < volumes[j].DiskType + case "version": + less = volumes[i].Version < volumes[j].Version + default: + less = volumes[i].Id < volumes[j].Id + } + + if sortOrder == "desc" { + return !less + } + return less + }) +} + +// GetVolumeDetails retrieves detailed information about a specific volume +func (s *AdminServer) GetVolumeDetails(volumeID int, server string) (*VolumeDetailsData, error) { + var primaryVolume VolumeWithTopology + var replicas []VolumeWithTopology + var volumeSizeLimit uint64 + var found bool + + // Find the volume and all its replicas in the cluster + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + if int(volInfo.Id) == volumeID { + diskType := volInfo.DiskType + if diskType == "" { + diskType = "hdd" + } + + volume := VolumeWithTopology{ + VolumeInformationMessage: volInfo, + Server: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + } + + // If this is the requested server, it's the primary volume + if node.Id == server { + primaryVolume = volume + found = true + } else { + // This is a replica on another server + replicas = append(replicas, volume) + } + } + } + } + } + } + } + } + return nil + }) + + if err != nil { + return nil, err + } + + if !found { + return nil, fmt.Errorf("volume %d not found on server %s", volumeID, server) + } + + // Get volume size limit from master + err = s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return err + } + volumeSizeLimit = uint64(resp.VolumeSizeLimitMB) * 1024 * 1024 // Convert MB to bytes + return nil + }) + + if err != nil { + // If we can't get the limit, set a default + volumeSizeLimit = 30 * 1024 * 1024 * 1024 // 30GB default + } + + return &VolumeDetailsData{ + Volume: primaryVolume, + Replicas: replicas, + VolumeSizeLimit: volumeSizeLimit, + ReplicationCount: len(replicas) + 1, // Include the primary volume + LastUpdated: time.Now(), + }, nil +} + +// VacuumVolume performs a vacuum operation on a specific volume +func (s *AdminServer) VacuumVolume(volumeID int, server string) error { + return s.WithMasterClient(func(client master_pb.SeaweedClient) error { + _, err := client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ + VolumeId: uint32(volumeID), + GarbageThreshold: 0.0001, // A very low threshold to ensure all garbage is collected + Collection: "", // Empty for all collections + }) + return err + }) +} + +// GetClusterVolumeServers retrieves cluster volume servers data +func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) { + topology, err := s.GetClusterTopology() + if err != nil { + return nil, err + } + + var totalCapacity int64 + var totalVolumes int + for _, vs := range topology.VolumeServers { + totalCapacity += vs.DiskCapacity + totalVolumes += vs.Volumes + } + + return &ClusterVolumeServersData{ + VolumeServers: topology.VolumeServers, + TotalVolumeServers: len(topology.VolumeServers), + TotalVolumes: totalVolumes, + TotalCapacity: totalCapacity, + LastUpdated: time.Now(), + }, nil +}