package dash
import (
"context"
"fmt"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
// matchesCollection checks if a volume/EC volume collection matches the filter collection.
// Handles the special case where empty collection ("") represents the "default" collection.
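//
// For example:
//
//	matchesCollection("", "")                  // true  (both sides refer to the default collection)
//	matchesCollection("pictures", "pictures")  // true  (exact match on a named collection)
//	matchesCollection("pictures", "")          // false (a named collection does not match the default filter)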
func matchesCollection(volumeCollection, filterCollection string) bool {
// Both empty means default collection matches default filter
if volumeCollection == "" && filterCollection == "" {
return true
}
// Direct string match for named collections
return volumeCollection == filterCollection
}
// GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering
func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) {
// Set defaults
if page < 1 {
page = 1
}
if pageSize < 1 || pageSize > 1000 {
pageSize = 100
}
if sortBy == "" {
sortBy = "volume_id"
}
if sortOrder == "" {
sortOrder = "asc"
}
var ecShards []EcShardWithInfo
volumeShardsMap := make(map[uint32]map[int]bool) // volumeId -> set of shards present
volumesWithAllShards := 0
volumesWithMissingShards := 0
// Get detailed EC shard information via gRPC
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo != nil {
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
// Process EC shard information
for _, ecShardInfo := range diskInfo.EcShardInfos {
volumeId := ecShardInfo.Id
// Initialize volume shards map if needed
if volumeShardsMap[volumeId] == nil {
volumeShardsMap[volumeId] = make(map[int]bool)
}
// Create individual shard entries for each shard this server has
shardBits := ecShardInfo.EcIndexBits
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if (shardBits & (1 << uint(shardId))) != 0 {
// Mark this shard as present for this volume
volumeShardsMap[volumeId][shardId] = true
ecShard := EcShardWithInfo{
VolumeID: volumeId,
ShardID: uint32(shardId),
Collection: ecShardInfo.Collection,
Size: 0, // EC shards don't have individual size in the API response
Server: node.Id,
DataCenter: dc.Id,
Rack: rack.Id,
DiskType: diskInfo.Type,
ModifiedTime: 0, // Not available in current API
EcIndexBits: ecShardInfo.EcIndexBits,
ShardCount: getShardCount(ecShardInfo.EcIndexBits),
}
ecShards = append(ecShards, ecShard)
}
}
}
}
}
}
}
}
return nil
})
if err != nil {
return nil, err
}
// Calculate volume-level completeness (across all servers)
volumeCompleteness := make(map[uint32]bool)
volumeMissingShards := make(map[uint32][]int)
for volumeId, shardsPresent := range volumeShardsMap {
var missingShards []int
shardCount := len(shardsPresent)
// Find which shards are missing for this volume across ALL servers
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if !shardsPresent[shardId] {
missingShards = append(missingShards, shardId)
}
}
isComplete := (shardCount == erasure_coding.TotalShardsCount)
volumeCompleteness[volumeId] = isComplete
volumeMissingShards[volumeId] = missingShards
if isComplete {
volumesWithAllShards++
} else {
volumesWithMissingShards++
}
}
// Update completeness info for each shard based on volume-level completeness
for i := range ecShards {
volumeId := ecShards[i].VolumeID
ecShards[i].IsComplete = volumeCompleteness[volumeId]
ecShards[i].MissingShards = volumeMissingShards[volumeId]
}
// Filter by collection if specified
if collection != "" {
var filteredShards []EcShardWithInfo
for _, shard := range ecShards {
if shard.Collection == collection {
filteredShards = append(filteredShards, shard)
}
}
ecShards = filteredShards
}
// Sort the results
sortEcShards(ecShards, sortBy, sortOrder)
// Calculate statistics for conditional display
dataCenters := make(map[string]bool)
racks := make(map[string]bool)
collections := make(map[string]bool)
for _, shard := range ecShards {
dataCenters[shard.DataCenter] = true
racks[shard.Rack] = true
if shard.Collection != "" {
collections[shard.Collection] = true
}
}
// Pagination
totalShards := len(ecShards)
totalPages := (totalShards + pageSize - 1) / pageSize
startIndex := (page - 1) * pageSize
endIndex := startIndex + pageSize
if endIndex > totalShards {
endIndex = totalShards
}
if startIndex >= totalShards {
startIndex = 0
endIndex = 0
}
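// Illustrative arithmetic: with totalShards=250 and pageSize=100, page 3 yields startIndex=200 and
// endIndex=250 (clamped from 300), while page 4 starts past the end and is clamped to an empty range.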
paginatedShards := ecShards[startIndex:endIndex]
// Build response
data := &ClusterEcShardsData{
EcShards: paginatedShards,
TotalShards: totalShards,
TotalVolumes: len(volumeShardsMap),
LastUpdated: time.Now(),
// Pagination
CurrentPage: page,
TotalPages: totalPages,
PageSize: pageSize,
// Sorting
SortBy: sortBy,
SortOrder: sortOrder,
// Statistics
DataCenterCount: len(dataCenters),
RackCount: len(racks),
CollectionCount: len(collections),
// Conditional display flags
ShowDataCenterColumn: len(dataCenters) > 1,
ShowRackColumn: len(racks) > 1,
ShowCollectionColumn: len(collections) > 1 || collection != "",
// Filtering
FilterCollection: collection,
// EC specific statistics
ShardsPerVolume: make(map[uint32]int), // Populated below from volumeShardsMap
VolumesWithAllShards: volumesWithAllShards,
VolumesWithMissingShards: volumesWithMissingShards,
}
// Populate ShardsPerVolume for the response
for volumeId, shardsPresent := range volumeShardsMap {
data.ShardsPerVolume[volumeId] = len(shardsPresent)
}
// Set single values when only one exists
if len(dataCenters) == 1 {
for dc := range dataCenters {
data.SingleDataCenter = dc
break
}
}
if len(racks) == 1 {
for rack := range racks {
data.SingleRack = rack
break
}
}
if len(collections) == 1 {
for col := range collections {
data.SingleCollection = col
break
}
}
return data, nil
}
// GetClusterEcVolumes retrieves cluster EC volumes data grouped by volume ID with shard locations
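//
// A typical call from an admin UI handler (illustrative values; "pictures" is a hypothetical collection)
// might request the first page, sorted so incomplete volumes are listed first:
//
//	data, err := s.GetClusterEcVolumes(1, 100, "completeness", "desc", "pictures")
//	if err != nil {
//		// handle error
//	}
//	// data.CompleteVolumes and data.IncompleteVolumes summarize shard coverage,
//	// and data.EcVolumes holds the current page of volumes with their shard locations.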
func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcVolumesData, error) {
// Set defaults
if page < 1 {
page = 1
}
if pageSize < 1 || pageSize > 1000 {
pageSize = 100
}
if sortBy == "" {
sortBy = "volume_id"
}
if sortOrder == "" {
sortOrder = "asc"
}
volumeData := make(map[uint32]*EcVolumeWithShards)
totalShards := 0
// Get detailed EC shard information via gRPC
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo != nil {
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
// Process EC shard information
for _, ecShardInfo := range diskInfo.EcShardInfos {
volumeId := ecShardInfo.Id
// Initialize volume data if needed
if volumeData[volumeId] == nil {
volumeData[volumeId] = &EcVolumeWithShards{
VolumeID: volumeId,
Collection: ecShardInfo.Collection,
TotalShards: 0,
IsComplete: false,
MissingShards: []int{},
ShardLocations: make(map[int]string),
ShardSizes: make(map[int]int64),
DataCenters: []string{},
Servers: []string{},
Racks: []string{},
Generations: []uint32{},
ActiveGeneration: 0,
HasMultipleGenerations: false,
}
}
volume := volumeData[volumeId]
// Track generation information
generationExists := false
for _, existingGen := range volume.Generations {
if existingGen == ecShardInfo.Generation {
generationExists = true
break
}
}
if !generationExists {
volume.Generations = append(volume.Generations, ecShardInfo.Generation)
}
// Track data centers and servers
dcExists := false
for _, existingDc := range volume.DataCenters {
if existingDc == dc.Id {
dcExists = true
break
}
}
if !dcExists {
volume.DataCenters = append(volume.DataCenters, dc.Id)
}
serverExists := false
for _, existingServer := range volume.Servers {
if existingServer == node.Id {
serverExists = true
break
}
}
if !serverExists {
volume.Servers = append(volume.Servers, node.Id)
}
// Track racks
rackExists := false
for _, existingRack := range volume.Racks {
if existingRack == rack.Id {
rackExists = true
break
}
}
if !rackExists {
volume.Racks = append(volume.Racks, rack.Id)
}
// Process each shard this server has for this volume
shardBits := ecShardInfo.EcIndexBits
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if (shardBits & (1 << uint(shardId))) != 0 {
// Record shard location
volume.ShardLocations[shardId] = node.Id
totalShards++
}
}
}
}
}
}
}
}
return nil
})
if err != nil {
return nil, err
}
// Collect shard size information from volume servers
for volumeId, volume := range volumeData {
// Deduplicate the servers that hold shards for this volume to avoid redundant gRPC calls
serverHasVolume := make(map[string]bool)
for _, server := range volume.Servers {
serverHasVolume[server] = true
}
// Query each server for shard sizes
for server := range serverHasVolume {
err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
VolumeId: volumeId,
})
if err != nil {
glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeId, err)
return nil // Continue with other servers, don't fail the entire request
}
// Update shard sizes
for _, shardInfo := range resp.EcShardInfos {
volume.ShardSizes[int(shardInfo.ShardId)] = shardInfo.Size
}
return nil
})
if err != nil {
glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
}
}
}
// Get active generation information from master for each volume
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
for volumeId, volume := range volumeData {
// Look up active generation
resp, lookupErr := client.LookupEcVolume(context.Background(), &master_pb.LookupEcVolumeRequest{
VolumeId: volumeId,
})
if lookupErr == nil && resp != nil {
volume.ActiveGeneration = resp.ActiveGeneration
}
// Sort generations and check for multiple generations
if len(volume.Generations) > 1 {
// Sort generations (oldest first)
sort.Slice(volume.Generations, func(i, j int) bool {
return volume.Generations[i] < volume.Generations[j]
})
volume.HasMultipleGenerations = true
}
}
return nil // Don't fail if lookup fails
})
if err != nil {
glog.V(1).Infof("Failed to look up active EC volume generations from master: %v", err)
}
// Calculate completeness for each volume
completeVolumes := 0
incompleteVolumes := 0
for _, volume := range volumeData {
volume.TotalShards = len(volume.ShardLocations)
// Find missing shards
var missingShards []int
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if _, exists := volume.ShardLocations[shardId]; !exists {
missingShards = append(missingShards, shardId)
}
}
volume.MissingShards = missingShards
volume.IsComplete = (len(missingShards) == 0)
if volume.IsComplete {
completeVolumes++
} else {
incompleteVolumes++
}
}
// Convert map to slice
var ecVolumes []EcVolumeWithShards
for _, volume := range volumeData {
// Filter by collection if specified
if collection == "" || matchesCollection(volume.Collection, collection) {
ecVolumes = append(ecVolumes, *volume)
}
}
// Sort the results
sortEcVolumes(ecVolumes, sortBy, sortOrder)
// Calculate statistics for conditional display
dataCenters := make(map[string]bool)
collections := make(map[string]bool)
for _, volume := range ecVolumes {
for _, dc := range volume.DataCenters {
dataCenters[dc] = true
}
if volume.Collection != "" {
collections[volume.Collection] = true
}
}
// Pagination
totalVolumes := len(ecVolumes)
totalPages := (totalVolumes + pageSize - 1) / pageSize
startIndex := (page - 1) * pageSize
endIndex := startIndex + pageSize
if endIndex > totalVolumes {
endIndex = totalVolumes
}
if startIndex >= totalVolumes {
startIndex = 0
endIndex = 0
}
paginatedVolumes := ecVolumes[startIndex:endIndex]
// Build response
data := &ClusterEcVolumesData{
EcVolumes: paginatedVolumes,
TotalVolumes: totalVolumes,
LastUpdated: time.Now(),
// Pagination
Page: page,
PageSize: pageSize,
TotalPages: totalPages,
// Sorting
SortBy: sortBy,
SortOrder: sortOrder,
// Filtering
Collection: collection,
// Conditional display flags
ShowDataCenterColumn: len(dataCenters) > 1,
ShowRackColumn: false, // Racks are collected per volume but not shown as a separate column in this grouped view
ShowCollectionColumn: len(collections) > 1 || collection != "",
// Statistics
CompleteVolumes: completeVolumes,
IncompleteVolumes: incompleteVolumes,
TotalShards: totalShards,
}
return data, nil
}
// sortEcVolumes sorts EC volumes based on the specified field and order
func sortEcVolumes(volumes []EcVolumeWithShards, sortBy string, sortOrder string) {
sort.Slice(volumes, func(i, j int) bool {
var less bool
switch sortBy {
case "volume_id":
less = volumes[i].VolumeID < volumes[j].VolumeID
case "collection":
if volumes[i].Collection == volumes[j].Collection {
less = volumes[i].VolumeID < volumes[j].VolumeID
} else {
less = volumes[i].Collection < volumes[j].Collection
}
case "total_shards":
if volumes[i].TotalShards == volumes[j].TotalShards {
less = volumes[i].VolumeID < volumes[j].VolumeID
} else {
less = volumes[i].TotalShards < volumes[j].TotalShards
}
case "completeness":
// Complete volumes first, then by volume ID
if volumes[i].IsComplete == volumes[j].IsComplete {
less = volumes[i].VolumeID < volumes[j].VolumeID
} else {
less = volumes[i].IsComplete && !volumes[j].IsComplete
}
default:
less = volumes[i].VolumeID < volumes[j].VolumeID
}
if sortOrder == "desc" {
return !less
}
return less
})
}
// getShardCount returns the number of shards represented by the bitmap
func getShardCount(ecIndexBits uint32) int {
count := 0
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if (ecIndexBits & (1 << uint(i))) != 0 {
count++
}
}
return count
}
// getMissingShards returns a slice of missing shard IDs for a volume
func getMissingShards(ecIndexBits uint32) []int {
var missing []int
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if (ecIndexBits & (1 << uint(i))) == 0 {
missing = append(missing, i)
}
}
return missing
}
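// Worked example (illustrative): an EcIndexBits value of 0x2007 (binary 10 0000 0000 0111) has bits
// 0, 1, 2, and 13 set, so getShardCount returns 4 and getMissingShards returns [3 4 5 6 7 8 9 10 11 12].
// With the default 10+4 layout that means data shards 0-2 and parity shard 13 are present on this server.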
// sortEcShards sorts EC shards based on the specified field and order
func sortEcShards(shards []EcShardWithInfo, sortBy string, sortOrder string) {
sort.Slice(shards, func(i, j int) bool {
var less bool
switch sortBy {
case "shard_id":
less = shards[i].ShardID < shards[j].ShardID
case "server":
if shards[i].Server == shards[j].Server {
less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
} else {
less = shards[i].Server < shards[j].Server
}
case "data_center":
if shards[i].DataCenter == shards[j].DataCenter {
less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
} else {
less = shards[i].DataCenter < shards[j].DataCenter
}
case "rack":
if shards[i].Rack == shards[j].Rack {
less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
} else {
less = shards[i].Rack < shards[j].Rack
}
default:
less = shards[i].ShardID < shards[j].ShardID
}
if sortOrder == "desc" {
return !less
}
return less
})
}
// GetEcVolumeDetails retrieves detailed information about a specific EC volume
func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrder string) (*EcVolumeDetailsData, error) {
// Set defaults
if sortBy == "" {
sortBy = "shard_id"
}
if sortOrder == "" {
sortOrder = "asc"
}
var shards []EcShardWithInfo
var collection string
dataCenters := make(map[string]bool)
servers := make(map[string]bool)
// Get detailed EC shard information for the specific volume via gRPC
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo != nil {
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
// Process EC shard information for this specific volume
for _, ecShardInfo := range diskInfo.EcShardInfos {
if ecShardInfo.Id == volumeID {
collection = ecShardInfo.Collection
dataCenters[dc.Id] = true
servers[node.Id] = true
// Create individual shard entries for each shard this server has
shardBits := ecShardInfo.EcIndexBits
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if (shardBits & (1 << uint(shardId))) != 0 {
ecShard := EcShardWithInfo{
VolumeID: ecShardInfo.Id,
ShardID: uint32(shardId),
Collection: ecShardInfo.Collection,
Size: 0, // EC shards don't have individual size in the API response
Server: node.Id,
DataCenter: dc.Id,
Rack: rack.Id,
DiskType: diskInfo.Type,
ModifiedTime: 0, // Not available in current API
EcIndexBits: ecShardInfo.EcIndexBits,
ShardCount: getShardCount(ecShardInfo.EcIndexBits),
Generation: ecShardInfo.Generation, // Include generation information
}
shards = append(shards, ecShard)
}
}
}
}
}
}
}
}
}
return nil
})
if err != nil {
return nil, err
}
if len(shards) == 0 {
return nil, fmt.Errorf("EC volume %d not found", volumeID)
}
// Collect shard size information from volume servers
shardSizeMap := make(map[string]map[uint32]uint64) // server -> shardId -> size
for _, shard := range shards {
server := shard.Server
if _, exists := shardSizeMap[server]; !exists {
// Query this server for shard sizes
err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
VolumeId: volumeID,
})
if err != nil {
glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeID, err)
return nil // Continue with other servers, don't fail the entire request
}
// Store shard sizes for this server
shardSizeMap[server] = make(map[uint32]uint64)
for _, shardInfo := range resp.EcShardInfos {
shardSizeMap[server][shardInfo.ShardId] = uint64(shardInfo.Size)
}
return nil
})
if err != nil {
glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
}
}
}
// Update shard sizes in the shards array
for i := range shards {
server := shards[i].Server
shardId := shards[i].ShardID
if serverSizes, exists := shardSizeMap[server]; exists {
if size, exists := serverSizes[shardId]; exists {
shards[i].Size = size
}
}
}
// Calculate completeness based on unique shard IDs
foundShards := make(map[int]bool)
for _, shard := range shards {
foundShards[int(shard.ShardID)] = true
}
totalUniqueShards := len(foundShards)
isComplete := (totalUniqueShards == erasure_coding.TotalShardsCount)
// Calculate missing shards
var missingShards []int
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if !foundShards[i] {
missingShards = append(missingShards, i)
}
}
// Update completeness info for each shard
for i := range shards {
shards[i].IsComplete = isComplete
shards[i].MissingShards = missingShards
}
// Sort shards based on parameters
sortEcShards(shards, sortBy, sortOrder)
// Convert maps to slices
var dcList []string
for dc := range dataCenters {
dcList = append(dcList, dc)
}
var serverList []string
for server := range servers {
serverList = append(serverList, server)
}
// Get EC volume health metrics (deletion information)
volumeHealth, err := s.getEcVolumeHealthMetrics(volumeID)
if err != nil {
glog.V(0).Infof("ERROR: Failed to get EC volume health metrics for volume %d: %v", volumeID, err)
// Don't fail the request, just use default values
volumeHealth = &EcVolumeHealthInfo{
TotalSize: 0,
DeletedByteCount: 0,
FileCount: 0,
DeleteCount: 0,
GarbageRatio: 0.0,
}
}
// Analyze generation information
generationMap := make(map[uint32]bool)
generationShards := make(map[uint32][]uint32)
generationComplete := make(map[uint32]bool)
// Collect all generations and group shards by generation
for _, shard := range shards {
generationMap[shard.Generation] = true
generationShards[shard.Generation] = append(generationShards[shard.Generation], shard.ShardID)
}
// Convert generation map to sorted slice
var generations []uint32
for gen := range generationMap {
generations = append(generations, gen)
}
// Sort generations (oldest first)
sort.Slice(generations, func(i, j int) bool {
return generations[i] < generations[j]
})
// Check completion status for each generation
for gen, shardIDs := range generationShards {
generationComplete[gen] = len(shardIDs) == erasure_coding.TotalShardsCount
}
// Get active generation from master
var activeGeneration uint32
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
// Use LookupEcVolume to get active generation
resp, lookupErr := client.LookupEcVolume(context.Background(), &master_pb.LookupEcVolumeRequest{
VolumeId: volumeID,
})
if lookupErr == nil && resp != nil {
activeGeneration = resp.ActiveGeneration
}
return nil // Don't fail if lookup fails, just use generation 0 as default
})
data := &EcVolumeDetailsData{
VolumeID: volumeID,
Collection: collection,
Shards: shards,
TotalShards: totalUniqueShards,
IsComplete: isComplete,
MissingShards: missingShards,
DataCenters: dcList,
Servers: serverList,
LastUpdated: time.Now(),
// Volume health metrics (for EC vacuum)
TotalSize: volumeHealth.TotalSize,
DeletedByteCount: volumeHealth.DeletedByteCount,
FileCount: volumeHealth.FileCount,
DeleteCount: volumeHealth.DeleteCount,
GarbageRatio: volumeHealth.GarbageRatio,
// Generation information
Generations: generations,
ActiveGeneration: activeGeneration,
GenerationShards: generationShards,
GenerationComplete: generationComplete,
SortBy: sortBy,
SortOrder: sortOrder,
}
return data, nil
}
// getEcVolumeHealthMetrics retrieves health metrics for an EC volume
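// It discovers every server holding shards for the volume, sums their reported sizes and deletion
// counters, and derives the garbage ratio from the aggregate; if no server can report per-volume
// metrics it falls back to estimating the size from the EC shard files via calculateHealthFromEcShards.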
func (s *AdminServer) getEcVolumeHealthMetrics(volumeID uint32) (*EcVolumeHealthInfo, error) {
glog.V(0).Infof("DEBUG: getEcVolumeHealthMetrics called for volume %d", volumeID)
// Get list of servers that have shards for this EC volume
var servers []string
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo != nil {
serverSet := make(map[string]struct{})
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
// Check if this node has EC shards for our volume
for _, ecShardInfo := range diskInfo.EcShardInfos {
if ecShardInfo.Id == volumeID {
serverSet[node.Id] = struct{}{}
}
}
}
}
}
}
for server := range serverSet {
servers = append(servers, server)
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to get topology info: %v", err)
}
glog.V(0).Infof("DEBUG: Found %d servers with EC shards for volume %d: %v", len(servers), volumeID, servers)
if len(servers) == 0 {
return nil, fmt.Errorf("no servers found with EC shards for volume %d", volumeID)
}
// Aggregate health metrics from ALL servers that have EC shards
var aggregatedHealth *EcVolumeHealthInfo
var totalSize uint64
var totalFileCount uint64
var totalDeletedBytes uint64
var totalDeletedCount uint64
validServers := 0
for _, server := range servers {
healthInfo, err := s.getVolumeHealthFromServer(server, volumeID)
if err != nil {
glog.V(2).Infof("Failed to get volume health from server %s for volume %d: %v", server, volumeID, err)
continue // Try next server
}
glog.V(0).Infof("DEBUG: getVolumeHealthFromServer returned for %s: healthInfo=%v", server, healthInfo != nil)
if healthInfo != nil {
// Sum the values across all servers (each server contributes its shard data)
totalSize += healthInfo.TotalSize
totalFileCount += healthInfo.FileCount
totalDeletedBytes += healthInfo.DeletedByteCount
totalDeletedCount += healthInfo.DeleteCount
validServers++
glog.V(0).Infof("DEBUG: Added server %s data: size=%d, files=%d, deleted_bytes=%d", server, healthInfo.TotalSize, healthInfo.FileCount, healthInfo.DeletedByteCount)
// Lazily allocate the aggregated result once at least one server has reported data
if aggregatedHealth == nil {
aggregatedHealth = &EcVolumeHealthInfo{}
}
}
}
// If we got aggregated data, finalize it
glog.V(0).Infof("DEBUG: Aggregation check - aggregatedHealth=%v, validServers=%d", aggregatedHealth != nil, validServers)
if aggregatedHealth != nil && validServers > 0 {
// Use summed totals from all servers
aggregatedHealth.TotalSize = totalSize
aggregatedHealth.FileCount = totalFileCount
aggregatedHealth.DeletedByteCount = totalDeletedBytes
aggregatedHealth.DeleteCount = totalDeletedCount
// Calculate garbage ratio from aggregated data
if aggregatedHealth.TotalSize > 0 {
aggregatedHealth.GarbageRatio = float64(aggregatedHealth.DeletedByteCount) / float64(aggregatedHealth.TotalSize)
}
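// Illustrative numbers: 25 MiB of deleted entries in a 100 MiB EC volume yields a garbage ratio of 0.25,
// meaning roughly a quarter of the volume's space could be reclaimed by an EC vacuum.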
glog.V(0).Infof("SUCCESS: Aggregated EC volume %d from %d servers: %d total bytes -> %d MB",
volumeID, validServers, totalSize, totalSize/1024/1024)
return aggregatedHealth, nil
}
// If we can't get the original metrics, try to calculate from EC shards
return s.calculateHealthFromEcShards(volumeID, servers)
}
// getVolumeHealthFromServer gets volume health information from a specific server
func (s *AdminServer) getVolumeHealthFromServer(server string, volumeID uint32) (*EcVolumeHealthInfo, error) {
var healthInfo *EcVolumeHealthInfo
err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
var collection string // default collection name ("" means the default collection)
var totalSize uint64
var fileCount uint64
// Try to get volume file status (which may include original volume metrics)
// This will fail for EC-only volumes, so we handle that gracefully
resp, err := client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: volumeID,
})
if err != nil {
glog.V(2).Infof("ReadVolumeFileStatus failed for EC volume %d on server %s (expected for EC-only volumes): %v", volumeID, server, err)
// For EC-only volumes, we don't have original volume metrics, but we can still get deletion info
} else if resp.VolumeInfo != nil {
// Extract metrics from regular volume info if available
totalSize = uint64(resp.VolumeInfo.DatFileSize)
fileCount = resp.FileCount
collection = resp.Collection
}
// Always try to get EC deletion information using the new gRPC endpoint
deletionResp, deletionErr := client.VolumeEcDeletionInfo(context.Background(), &volume_server_pb.VolumeEcDeletionInfoRequest{
VolumeId: volumeID,
Collection: collection,
Generation: 0, // Use default generation for backward compatibility
})
if deletionErr != nil {
glog.V(1).Infof("Failed to get EC deletion info for volume %d on server %s: %v", volumeID, server, deletionErr)
// If we have some info from ReadVolumeFileStatus, still create healthInfo with that
if totalSize > 0 {
healthInfo = &EcVolumeHealthInfo{
TotalSize: totalSize,
DeletedByteCount: 0,
FileCount: fileCount,
DeleteCount: 0,
GarbageRatio: 0.0,
}
}
} else if deletionResp != nil {
// Create health info with deletion data
healthInfo = &EcVolumeHealthInfo{
TotalSize: deletionResp.TotalSize, // Get total size from EC deletion info
DeletedByteCount: deletionResp.DeletedBytes,
FileCount: fileCount,
DeleteCount: deletionResp.DeletedCount,
GarbageRatio: 0.0,
}
// Calculate garbage ratio if we have total size
if healthInfo.TotalSize > 0 {
healthInfo.GarbageRatio = float64(healthInfo.DeletedByteCount) / float64(healthInfo.TotalSize)
}
glog.V(1).Infof("EC volume %d on server %s: %d deleted bytes, %d deleted needles, total size: %d bytes",
volumeID, server, healthInfo.DeletedByteCount, healthInfo.DeleteCount, healthInfo.TotalSize)
}
return nil // Return from WithVolumeServerClient callback - healthInfo is captured by closure
})
return healthInfo, err
}
// calculateHealthFromEcShards attempts to calculate health metrics from EC shard information
func (s *AdminServer) calculateHealthFromEcShards(volumeID uint32, servers []string) (*EcVolumeHealthInfo, error) {
var totalShardSize uint64
shardCount := 0
// Get shard sizes from all servers
for _, server := range servers {
err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
VolumeId: volumeID,
})
if err != nil {
return err
}
for _, shardInfo := range resp.EcShardInfos {
totalShardSize += uint64(shardInfo.Size)
shardCount++
}
return nil
})
if err != nil {
glog.V(2).Infof("Failed to get EC shard info from server %s: %v", server, err)
}
}
if shardCount == 0 {
return nil, fmt.Errorf("no EC shard information found for volume %d", volumeID)
}
// For EC volumes, we can estimate the original size from the data shards
// EC uses 10 data shards + 4 parity shards = 14 total
// The original volume size is approximately the sum of the 10 data shards
dataShardCount := erasure_coding.DataShardsCount // 10 data shards in the default 10+4 layout
estimatedOriginalSize := totalShardSize
if shardCount >= dataShardCount {
// With at least as many shards reported as there are data shards, refine the estimate using the average shard size
avgShardSize := totalShardSize / uint64(shardCount)
estimatedOriginalSize = avgShardSize * uint64(dataShardCount)
}
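// Illustrative example: 14 shards of ~100 MB each give totalShardSize of ~1400 MB, an average shard size
// of ~100 MB, and therefore an estimated original volume size of ~1000 MB (10 data shards x 100 MB);
// the 4 parity shards are excluded from the estimate.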
return &EcVolumeHealthInfo{
TotalSize: estimatedOriginalSize,
DeletedByteCount: 0, // Cannot determine from EC shards alone
FileCount: 0, // Cannot determine from EC shards alone
DeleteCount: 0, // Cannot determine from EC shards alone
GarbageRatio: 0.0,
}, nil
}