@@ -23,6 +23,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 	"google.golang.org/grpc"
@@ -96,6 +97,11 @@ type AdminServer struct {
 
 	// Worker gRPC server
 	workerGrpcServer *WorkerGrpcServer
+
+	// Collection statistics caching
+	collectionStatsCache          map[string]collectionStats
+	lastCollectionStatsUpdate     time.Time
+	collectionStatsCacheThreshold time.Duration
 }
 
 // Type definitions moved to types.go
@@ -119,13 +125,14 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string)
 	go masterClient.KeepConnectedToMaster(ctx)
 
 	server := &AdminServer{
-		masterClient:         masterClient,
-		templateFS:           templateFS,
-		dataDir:              dataDir,
-		grpcDialOption:       grpcDialOption,
-		cacheExpiration:      10 * time.Second,
-		filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
-		configPersistence:    NewConfigPersistence(dataDir),
+		masterClient:                  masterClient,
+		templateFS:                    templateFS,
+		dataDir:                       dataDir,
+		grpcDialOption:                grpcDialOption,
+		cacheExpiration:               10 * time.Second,
+		filerCacheExpiration:          30 * time.Second, // Cache filers for 30 seconds
+		configPersistence:             NewConfigPersistence(dataDir),
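+		// refresh interval for the cached collection statistics (see getCollectionStats)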
+		collectionStatsCacheThreshold: 30 * time.Second,
 	}
 
 	// Initialize topic retention purger
@@ -236,57 +243,32 @@ func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
 
 // InvalidateCache method moved to cluster_topology.go
 
-// GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
-func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
-	var buckets []S3Bucket
+// GetS3BucketsData retrieves all Object Store buckets and aggregates total storage metrics
+func (s *AdminServer) GetS3BucketsData() (S3BucketsData, error) {
+	buckets, err := s.GetS3Buckets()
+	if err != nil {
+		return S3BucketsData{}, err
+	}
 
-	// Build a map of collection name to collection data
-	collectionMap := make(map[string]struct {
-		Size      int64
-		FileCount int64
-	})
+	var totalSize int64
+	for _, bucket := range buckets {
+		totalSize += bucket.PhysicalSize
+	}
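+	// totalSize therefore reflects physical bytes on disk, including all replicas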
 
-	// Collect volume information by collection
-	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
-		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
-		if err != nil {
-			return err
-		}
+	return S3BucketsData{
+		Buckets:      buckets,
+		TotalBuckets: len(buckets),
+		TotalSize:    totalSize,
+		LastUpdated:  time.Now(),
+	}, nil
+}
 
-		if resp.TopologyInfo != nil {
-			for _, dc := range resp.TopologyInfo.DataCenterInfos {
-				for _, rack := range dc.RackInfos {
-					for _, node := range rack.DataNodeInfos {
-						for _, diskInfo := range node.DiskInfos {
-							for _, volInfo := range diskInfo.VolumeInfos {
-								collection := volInfo.Collection
-								if collection == "" {
-									collection = "default"
-								}
-
-								if _, exists := collectionMap[collection]; !exists {
-									collectionMap[collection] = struct {
-										Size      int64
-										FileCount int64
-									}{}
-								}
-
-								data := collectionMap[collection]
-								data.Size += int64(volInfo.Size)
-								data.FileCount += int64(volInfo.FileCount)
-								collectionMap[collection] = data
-							}
-						}
-					}
-				}
-			}
-		}
-		return nil
-	})
+// GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
+func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
+	var buckets []S3Bucket
 
-	if err != nil {
-		return nil, fmt.Errorf("failed to get volume information: %w", err)
-	}
+	// Collect volume information by collection with caching
+	collectionMap, _ := s.getCollectionStats()
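+	// the error is intentionally dropped: on failure the map may be stale or empty,
+	// and the bucket sizes below simply fall back to zero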
 
 	// Get filer configuration (buckets path and filer group)
 	filerConfig, err := s.getFilerConfig()
@@ -324,10 +306,12 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
 		collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
 
 		// Get size and object count from collection data
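+		// physicalSize counts bytes on disk across all replicas; logicalSize is the
+		// user-visible size with replication and deleted bytes factored out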
-		var size int64
+		var physicalSize int64
+		var logicalSize int64
 		var objectCount int64
 		if collectionData, exists := collectionMap[collectionName]; exists {
-			size = collectionData.Size
+			physicalSize = collectionData.PhysicalSize
+			logicalSize = collectionData.LogicalSize
 			objectCount = collectionData.FileCount
 		}
 
@@ -363,7 +347,8 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
 		bucket := S3Bucket{
 			Name:         bucketName,
 			CreatedAt:    time.Unix(resp.Entry.Attributes.Crtime, 0),
-			Size:         size,
+			LogicalSize:  logicalSize,
+			PhysicalSize: physicalSize,
 			ObjectCount:  objectCount,
 			LastModified: time.Unix(resp.Entry.Attributes.Mtime, 0),
 			Quota:        quota,
@@ -389,6 +374,7 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
 }
 
 // GetBucketDetails retrieves detailed information about a specific bucket
+// Note: This no longer lists objects for performance reasons. Use GetS3Buckets for size/count data.
 func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
 	// Get filer configuration (buckets path)
 	filerConfig, err := s.getFilerConfig()
@@ -396,16 +382,25 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
 		glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
 	}
 
 	bucketPath := fmt.Sprintf("%s/%s", filerConfig.BucketsPath, bucketName)
 
 	details := &BucketDetails{
 		Bucket: S3Bucket{
 			Name: bucketName,
 		},
 		Objects:   []S3Object{},
 		UpdatedAt: time.Now(),
 	}
 
+	// Get collection data for size and object count with caching
+	collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
+	stats, err := s.getCollectionStats()
+	if err != nil {
+		glog.Warningf("Failed to get collection data: %v", err)
+		// Continue without collection data - use zero values
+	} else if data, ok := stats[collectionName]; ok {
+		details.Bucket.LogicalSize = data.LogicalSize
+		details.Bucket.PhysicalSize = data.PhysicalSize
+		details.Bucket.ObjectCount = data.FileCount
+	}
+
 	err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 		// Get bucket info
 		bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
@@ -456,8 +451,7 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
 		details.Bucket.ObjectLockDuration = objectLockDuration
 		details.Bucket.Owner = owner
 
-		// List objects in bucket (recursively)
-		return s.listBucketObjects(client, bucketPath, bucketPath, "", details)
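+		// object listing removed for performance; sizes and counts come from cached collection stats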
+		return nil
 	})
 
 	if err != nil {
@@ -467,106 +461,6 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
 	return details, nil
 }
 
-// listBucketObjects recursively lists all objects in a bucket
-// bucketBasePath is the full path to the bucket (e.g., /buckets/mybucket)
-func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, bucketBasePath, directory, prefix string, details *BucketDetails) error {
-	stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
-		Directory:          directory,
-		Prefix:             prefix,
-		StartFromFileName:  "",
-		InclusiveStartFrom: false,
-		Limit:              1000,
-	})
-	if err != nil {
-		return err
-	}
-
-	for {
-		resp, err := stream.Recv()
-		if err != nil {
-			if err.Error() == "EOF" {
-				break
-			}
-			return err
-		}
-
-		entry := resp.Entry
-		if entry.IsDirectory {
-			// Check if this is a .versions directory (represents a versioned object)
-			if strings.HasSuffix(entry.Name, ".versions") {
-				// This directory represents an object, add it as an object without the .versions suffix
-				objectName := strings.TrimSuffix(entry.Name, ".versions")
-				objectKey := objectName
-				if directory != bucketBasePath {
-					relativePath := directory[len(bucketBasePath)+1:]
-					objectKey = fmt.Sprintf("%s/%s", relativePath, objectName)
-				}
-
-				// Extract latest version metadata from extended attributes
-				var size int64 = 0
-				var mtime int64 = entry.Attributes.Mtime
-				if entry.Extended != nil {
-					// Get size of latest version
-					if sizeBytes, ok := entry.Extended[s3_constants.ExtLatestVersionSizeKey]; ok && len(sizeBytes) == 8 {
-						size = int64(util.BytesToUint64(sizeBytes))
-					}
-					// Get mtime of latest version
-					if mtimeBytes, ok := entry.Extended[s3_constants.ExtLatestVersionMtimeKey]; ok && len(mtimeBytes) == 8 {
-						mtime = int64(util.BytesToUint64(mtimeBytes))
-					}
-				}
-
-				obj := S3Object{
-					Key:          objectKey,
-					Size:         size,
-					LastModified: time.Unix(mtime, 0),
-					ETag:         "",
-					StorageClass: "STANDARD",
-				}
-
-				details.Objects = append(details.Objects, obj)
-				details.TotalCount++
-				details.TotalSize += size
-				// Don't recurse into .versions directories
-				continue
-			}
-
-			// Recursively list subdirectories
-			subDir := fmt.Sprintf("%s/%s", directory, entry.Name)
-			err := s.listBucketObjects(client, bucketBasePath, subDir, "", details)
-			if err != nil {
-				return err
-			}
-		} else {
-			// Add file object
-			objectKey := entry.Name
-			if directory != bucketBasePath {
-				// Remove bucket prefix to get relative path
-				relativePath := directory[len(bucketBasePath)+1:]
-				objectKey = fmt.Sprintf("%s/%s", relativePath, entry.Name)
-			}
-
-			obj := S3Object{
-				Key:          objectKey,
-				Size:         int64(entry.Attributes.FileSize),
-				LastModified: time.Unix(entry.Attributes.Mtime, 0),
-				ETag:         "", // Could be calculated from chunks if needed
-				StorageClass: "STANDARD",
-			}
-
-			details.Objects = append(details.Objects, obj)
-			details.TotalSize += obj.Size
-			details.TotalCount++
-		}
-	}
-
-	// Update bucket totals
-	details.Bucket.Size = details.TotalSize
-	details.Bucket.ObjectCount = details.TotalCount
-
-	return nil
-}
-
 // CreateS3Bucket creates a new S3 bucket
 func (s *AdminServer) CreateS3Bucket(bucketName string) error {
 	return s.CreateS3BucketWithQuota(bucketName, 0, false)
@@ -2108,3 +2002,66 @@ func getBoolFromMap(m map[string]interface{}, key string) bool {
 	}
 	return false
 }
+
+type collectionStats struct {
+	PhysicalSize int64
+	LogicalSize  int64
+	FileCount    int64
+}
+
+func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]collectionStats {
+	collectionMap := make(map[string]collectionStats)
+	for _, dc := range topologyInfo.DataCenterInfos {
+		for _, rack := range dc.RackInfos {
+			for _, node := range rack.DataNodeInfos {
+				for _, diskInfo := range node.DiskInfos {
+					for _, volInfo := range diskInfo.VolumeInfos {
+						collection := volInfo.Collection
+						if collection == "" {
+							collection = "default"
+						}
+
+						data := collectionMap[collection]
+						data.PhysicalSize += int64(volInfo.Size)
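+						// Each replica appears once in this walk, so dividing by the copy count
+						// de-duplicates replicas. Example: replication "010" (2 copies) with two
+						// 30 GB replicas adds 60 GB to PhysicalSize but 30 GB to LogicalSize.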
+						rp, _ := super_block.NewReplicaPlacementFromByte(byte(volInfo.ReplicaPlacement))
+						// NewReplicaPlacementFromByte never returns a nil rp. If there's an error,
+						// it returns a zero-valued ReplicaPlacement, for which GetCopyCount() is 1.
+						// This provides a safe fallback, so we can ignore the error.
+						replicaCount := int64(rp.GetCopyCount())
+						if volInfo.Size >= volInfo.DeletedByteCount {
+							data.LogicalSize += int64(volInfo.Size-volInfo.DeletedByteCount) / replicaCount
+						}
+						if volInfo.FileCount >= volInfo.DeleteCount {
+							data.FileCount += int64(volInfo.FileCount-volInfo.DeleteCount) / replicaCount
+						}
+						collectionMap[collection] = data
+					}
+				}
+			}
+		}
+	}
+	return collectionMap
+}
+
+// getCollectionStats returns current collection statistics with caching
+func (s *AdminServer) getCollectionStats() (map[string]collectionStats, error) {
+	now := time.Now()
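+	// serve from cache while it is fresher than collectionStatsCacheThreshold (30s by default)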
+	if s.collectionStatsCache != nil && now.Sub(s.lastCollectionStatsUpdate) < s.collectionStatsCacheThreshold {
+		return s.collectionStatsCache, nil
+	}
+
+	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
+		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+		if err != nil {
+			return err
+		}
+
+		if resp.TopologyInfo != nil {
+			s.collectionStatsCache = collectCollectionStats(resp.TopologyInfo)
+			s.lastCollectionStatsUpdate = now
+		}
+		return nil
+	})
+
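+	// on error this returns the previous cache (possibly nil) alongside err,
+	// letting callers decide whether stale data is acceptable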
+	return s.collectionStatsCache, err
+}