You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1480 lines
43 KiB
1480 lines
43 KiB
package dash
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
|
|
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
|
"github.com/seaweedfs/seaweedfs/weed/credential"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type AdminServer struct {
|
|
masterAddress string
|
|
templateFS http.FileSystem
|
|
dataDir string
|
|
grpcDialOption grpc.DialOption
|
|
cacheExpiration time.Duration
|
|
lastCacheUpdate time.Time
|
|
cachedTopology *ClusterTopology
|
|
|
|
// Filer discovery and caching
|
|
cachedFilers []string
|
|
lastFilerUpdate time.Time
|
|
filerCacheExpiration time.Duration
|
|
|
|
// Credential management
|
|
credentialManager *credential.CredentialManager
|
|
|
|
// Configuration persistence
|
|
configPersistence *ConfigPersistence
|
|
|
|
// Maintenance system
|
|
maintenanceManager *maintenance.MaintenanceManager
|
|
|
|
// Topic retention purger
|
|
topicRetentionPurger *TopicRetentionPurger
|
|
|
|
// Worker gRPC server
|
|
workerGrpcServer *WorkerGrpcServer
|
|
}
|
|
|
|
// Type definitions moved to types.go
|
|
|
|
func NewAdminServer(masterAddress string, templateFS http.FileSystem, dataDir string) *AdminServer {
|
|
server := &AdminServer{
|
|
masterAddress: masterAddress,
|
|
templateFS: templateFS,
|
|
dataDir: dataDir,
|
|
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
|
|
cacheExpiration: 10 * time.Second,
|
|
filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
|
|
configPersistence: NewConfigPersistence(dataDir),
|
|
}
|
|
|
|
// Initialize topic retention purger
|
|
server.topicRetentionPurger = NewTopicRetentionPurger(server)
|
|
|
|
// Initialize credential manager with defaults
|
|
credentialManager, err := credential.NewCredentialManagerWithDefaults("")
|
|
if err != nil {
|
|
glog.Warningf("Failed to initialize credential manager: %v", err)
|
|
// Continue without credential manager - will fall back to legacy approach
|
|
} else {
|
|
// For stores that need filer client details, set them
|
|
if store := credentialManager.GetStore(); store != nil {
|
|
if filerClientSetter, ok := store.(interface {
|
|
SetFilerClient(string, grpc.DialOption)
|
|
}); ok {
|
|
// We'll set the filer client later when we discover filers
|
|
// For now, just store the credential manager
|
|
server.credentialManager = credentialManager
|
|
|
|
// Set up a goroutine to set filer client once we discover filers
|
|
go func() {
|
|
for {
|
|
filerAddr := server.GetFilerAddress()
|
|
if filerAddr != "" {
|
|
filerClientSetter.SetFilerClient(filerAddr, server.grpcDialOption)
|
|
glog.V(1).Infof("Set filer client for credential manager: %s", filerAddr)
|
|
break
|
|
}
|
|
glog.V(1).Infof("Waiting for filer discovery for credential manager...")
|
|
time.Sleep(5 * time.Second) // Retry every 5 seconds
|
|
}
|
|
}()
|
|
} else {
|
|
server.credentialManager = credentialManager
|
|
}
|
|
} else {
|
|
server.credentialManager = credentialManager
|
|
}
|
|
}
|
|
|
|
// Initialize maintenance system with persistent configuration
|
|
if server.configPersistence.IsConfigured() {
|
|
maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig()
|
|
if err != nil {
|
|
glog.Errorf("Failed to load maintenance configuration: %v", err)
|
|
maintenanceConfig = maintenance.DefaultMaintenanceConfig()
|
|
}
|
|
server.InitMaintenanceManager(maintenanceConfig)
|
|
|
|
// Start maintenance manager if enabled
|
|
if maintenanceConfig.Enabled {
|
|
go func() {
|
|
if err := server.StartMaintenanceManager(); err != nil {
|
|
glog.Errorf("Failed to start maintenance manager: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
} else {
|
|
glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode")
|
|
}
|
|
|
|
return server
|
|
}
|
|
|
|
// GetCredentialManager returns the credential manager
|
|
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
|
|
return s.credentialManager
|
|
}
|
|
|
|
// Filer discovery methods moved to client_management.go
|
|
|
|
// Client management methods moved to client_management.go
|
|
|
|
// WithFilerClient and WithVolumeServerClient methods moved to client_management.go
|
|
|
|
// Cluster topology methods moved to cluster_topology.go
|
|
|
|
// getTopologyViaGRPC method moved to cluster_topology.go
|
|
|
|
// InvalidateCache method moved to cluster_topology.go
|
|
|
|
// GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
|
|
func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
|
|
var buckets []S3Bucket
|
|
|
|
// Build a map of collection name to collection data
|
|
collectionMap := make(map[string]struct {
|
|
Size int64
|
|
FileCount int64
|
|
})
|
|
|
|
// Collect volume information by collection
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.TopologyInfo != nil {
|
|
for _, dc := range resp.TopologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, node := range rack.DataNodeInfos {
|
|
for _, diskInfo := range node.DiskInfos {
|
|
for _, volInfo := range diskInfo.VolumeInfos {
|
|
collection := volInfo.Collection
|
|
if collection == "" {
|
|
collection = "default"
|
|
}
|
|
|
|
if _, exists := collectionMap[collection]; !exists {
|
|
collectionMap[collection] = struct {
|
|
Size int64
|
|
FileCount int64
|
|
}{}
|
|
}
|
|
|
|
data := collectionMap[collection]
|
|
data.Size += int64(volInfo.Size)
|
|
data.FileCount += int64(volInfo.FileCount)
|
|
collectionMap[collection] = data
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get volume information: %v", err)
|
|
}
|
|
|
|
// Get filer configuration to determine FilerGroup
|
|
var filerGroup string
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
configResp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
|
|
if err != nil {
|
|
glog.Warningf("Failed to get filer configuration: %v", err)
|
|
// Continue without filer group
|
|
return nil
|
|
}
|
|
filerGroup = configResp.FilerGroup
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get filer configuration: %v", err)
|
|
}
|
|
|
|
// Now list buckets from the filer and match with collection data
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// List buckets by looking at the /buckets directory
|
|
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
|
Directory: "/buckets",
|
|
Prefix: "",
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: false,
|
|
Limit: 1000,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
|
|
if resp.Entry.IsDirectory {
|
|
bucketName := resp.Entry.Name
|
|
|
|
// Determine collection name for this bucket
|
|
var collectionName string
|
|
if filerGroup != "" {
|
|
collectionName = fmt.Sprintf("%s_%s", filerGroup, bucketName)
|
|
} else {
|
|
collectionName = bucketName
|
|
}
|
|
|
|
// Get size and object count from collection data
|
|
var size int64
|
|
var objectCount int64
|
|
if collectionData, exists := collectionMap[collectionName]; exists {
|
|
size = collectionData.Size
|
|
objectCount = collectionData.FileCount
|
|
}
|
|
|
|
// Get quota information from entry
|
|
quota := resp.Entry.Quota
|
|
quotaEnabled := quota > 0
|
|
if quota < 0 {
|
|
// Negative quota means disabled
|
|
quota = -quota
|
|
quotaEnabled = false
|
|
}
|
|
|
|
// Get versioning and object lock information from extended attributes
|
|
versioningEnabled := false
|
|
objectLockEnabled := false
|
|
objectLockMode := ""
|
|
var objectLockDuration int32 = 0
|
|
|
|
if resp.Entry.Extended != nil {
|
|
if versioningBytes, exists := resp.Entry.Extended["s3.versioning"]; exists {
|
|
versioningEnabled = string(versioningBytes) == "Enabled"
|
|
}
|
|
if objectLockBytes, exists := resp.Entry.Extended["s3.objectlock"]; exists {
|
|
objectLockEnabled = string(objectLockBytes) == "Enabled"
|
|
}
|
|
if objectLockModeBytes, exists := resp.Entry.Extended["s3.objectlock.mode"]; exists {
|
|
objectLockMode = string(objectLockModeBytes)
|
|
}
|
|
if objectLockDurationBytes, exists := resp.Entry.Extended["s3.objectlock.duration"]; exists {
|
|
if duration, err := strconv.ParseInt(string(objectLockDurationBytes), 10, 32); err == nil {
|
|
objectLockDuration = int32(duration)
|
|
}
|
|
}
|
|
}
|
|
|
|
bucket := S3Bucket{
|
|
Name: bucketName,
|
|
CreatedAt: time.Unix(resp.Entry.Attributes.Crtime, 0),
|
|
Size: size,
|
|
ObjectCount: objectCount,
|
|
LastModified: time.Unix(resp.Entry.Attributes.Mtime, 0),
|
|
Quota: quota,
|
|
QuotaEnabled: quotaEnabled,
|
|
VersioningEnabled: versioningEnabled,
|
|
ObjectLockEnabled: objectLockEnabled,
|
|
ObjectLockMode: objectLockMode,
|
|
ObjectLockDuration: objectLockDuration,
|
|
}
|
|
buckets = append(buckets, bucket)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list Object Store buckets: %v", err)
|
|
}
|
|
|
|
return buckets, nil
|
|
}
|
|
|
|
// GetBucketDetails retrieves detailed information about a specific bucket
|
|
func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
|
|
bucketPath := fmt.Sprintf("/buckets/%s", bucketName)
|
|
|
|
details := &BucketDetails{
|
|
Bucket: S3Bucket{
|
|
Name: bucketName,
|
|
},
|
|
Objects: []S3Object{},
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// Get bucket info
|
|
bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: "/buckets",
|
|
Name: bucketName,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("bucket not found: %v", err)
|
|
}
|
|
|
|
details.Bucket.CreatedAt = time.Unix(bucketResp.Entry.Attributes.Crtime, 0)
|
|
details.Bucket.LastModified = time.Unix(bucketResp.Entry.Attributes.Mtime, 0)
|
|
|
|
// Get quota information from entry
|
|
quota := bucketResp.Entry.Quota
|
|
quotaEnabled := quota > 0
|
|
if quota < 0 {
|
|
// Negative quota means disabled
|
|
quota = -quota
|
|
quotaEnabled = false
|
|
}
|
|
details.Bucket.Quota = quota
|
|
details.Bucket.QuotaEnabled = quotaEnabled
|
|
|
|
// Get versioning and object lock information from extended attributes
|
|
versioningEnabled := false
|
|
objectLockEnabled := false
|
|
objectLockMode := ""
|
|
var objectLockDuration int32 = 0
|
|
|
|
if bucketResp.Entry.Extended != nil {
|
|
if versioningBytes, exists := bucketResp.Entry.Extended["s3.versioning"]; exists {
|
|
versioningEnabled = string(versioningBytes) == "Enabled"
|
|
}
|
|
if objectLockBytes, exists := bucketResp.Entry.Extended["s3.objectlock"]; exists {
|
|
objectLockEnabled = string(objectLockBytes) == "Enabled"
|
|
}
|
|
if objectLockModeBytes, exists := bucketResp.Entry.Extended["s3.objectlock.mode"]; exists {
|
|
objectLockMode = string(objectLockModeBytes)
|
|
}
|
|
if objectLockDurationBytes, exists := bucketResp.Entry.Extended["s3.objectlock.duration"]; exists {
|
|
if duration, err := strconv.ParseInt(string(objectLockDurationBytes), 10, 32); err == nil {
|
|
objectLockDuration = int32(duration)
|
|
}
|
|
}
|
|
}
|
|
|
|
details.Bucket.VersioningEnabled = versioningEnabled
|
|
details.Bucket.ObjectLockEnabled = objectLockEnabled
|
|
details.Bucket.ObjectLockMode = objectLockMode
|
|
details.Bucket.ObjectLockDuration = objectLockDuration
|
|
|
|
// List objects in bucket (recursively)
|
|
return s.listBucketObjects(client, bucketPath, "", details)
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return details, nil
|
|
}
|
|
|
|
// listBucketObjects recursively lists all objects in a bucket
|
|
func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, directory, prefix string, details *BucketDetails) error {
|
|
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
|
Directory: directory,
|
|
Prefix: prefix,
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: false,
|
|
Limit: 1000,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
|
|
entry := resp.Entry
|
|
if entry.IsDirectory {
|
|
// Recursively list subdirectories
|
|
subDir := fmt.Sprintf("%s/%s", directory, entry.Name)
|
|
err := s.listBucketObjects(client, subDir, "", details)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// Add file object
|
|
objectKey := entry.Name
|
|
if directory != fmt.Sprintf("/buckets/%s", details.Bucket.Name) {
|
|
// Remove bucket prefix to get relative path
|
|
relativePath := directory[len(fmt.Sprintf("/buckets/%s", details.Bucket.Name))+1:]
|
|
objectKey = fmt.Sprintf("%s/%s", relativePath, entry.Name)
|
|
}
|
|
|
|
obj := S3Object{
|
|
Key: objectKey,
|
|
Size: int64(entry.Attributes.FileSize),
|
|
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
|
ETag: "", // Could be calculated from chunks if needed
|
|
StorageClass: "STANDARD",
|
|
}
|
|
|
|
details.Objects = append(details.Objects, obj)
|
|
details.TotalSize += obj.Size
|
|
details.TotalCount++
|
|
}
|
|
}
|
|
|
|
// Update bucket totals
|
|
details.Bucket.Size = details.TotalSize
|
|
details.Bucket.ObjectCount = details.TotalCount
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateS3Bucket creates a new S3 bucket
|
|
func (s *AdminServer) CreateS3Bucket(bucketName string) error {
|
|
return s.CreateS3BucketWithQuota(bucketName, 0, false)
|
|
}
|
|
|
|
// DeleteS3Bucket deletes an S3 bucket and all its contents
|
|
func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
|
|
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// Delete bucket directory recursively
|
|
_, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
|
|
Directory: "/buckets",
|
|
Name: bucketName,
|
|
IsDeleteData: true,
|
|
IsRecursive: true,
|
|
IgnoreRecursiveError: false,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete bucket: %v", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetObjectStoreUsers retrieves object store users from identity.json
|
|
func (s *AdminServer) GetObjectStoreUsers() ([]ObjectStoreUser, error) {
|
|
s3cfg := &iam_pb.S3ApiConfiguration{}
|
|
|
|
// Load IAM configuration from filer
|
|
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
var buf bytes.Buffer
|
|
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
|
|
if err == filer_pb.ErrNotFound {
|
|
// If file doesn't exist, return empty configuration
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
if buf.Len() > 0 {
|
|
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
glog.Errorf("Failed to load IAM configuration: %v", err)
|
|
return []ObjectStoreUser{}, nil // Return empty list instead of error for UI
|
|
}
|
|
|
|
var users []ObjectStoreUser
|
|
|
|
// Convert IAM identities to ObjectStoreUser format
|
|
for _, identity := range s3cfg.Identities {
|
|
// Skip anonymous identity
|
|
if identity.Name == "anonymous" {
|
|
continue
|
|
}
|
|
|
|
user := ObjectStoreUser{
|
|
Username: identity.Name,
|
|
Permissions: identity.Actions,
|
|
}
|
|
|
|
// Set email from account if available
|
|
if identity.Account != nil {
|
|
user.Email = identity.Account.EmailAddress
|
|
}
|
|
|
|
// Get first access key for display
|
|
if len(identity.Credentials) > 0 {
|
|
user.AccessKey = identity.Credentials[0].AccessKey
|
|
user.SecretKey = identity.Credentials[0].SecretKey
|
|
}
|
|
|
|
users = append(users, user)
|
|
}
|
|
|
|
return users, nil
|
|
}
|
|
|
|
// Volume server methods moved to volume_management.go
|
|
|
|
// Volume methods moved to volume_management.go
|
|
|
|
// sortVolumes method moved to volume_management.go
|
|
|
|
// GetClusterCollections method moved to collection_management.go
|
|
|
|
// GetClusterMasters retrieves cluster masters data
|
|
func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
|
|
var masters []MasterInfo
|
|
var leaderCount int
|
|
|
|
// First, get master information from topology
|
|
topology, err := s.GetClusterTopology()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create a map to merge topology and raft data
|
|
masterMap := make(map[string]*MasterInfo)
|
|
|
|
// Add masters from topology
|
|
for _, master := range topology.Masters {
|
|
masterInfo := &MasterInfo{
|
|
Address: master.Address,
|
|
IsLeader: master.IsLeader,
|
|
Suffrage: "",
|
|
}
|
|
|
|
if master.IsLeader {
|
|
leaderCount++
|
|
}
|
|
|
|
masterMap[master.Address] = masterInfo
|
|
}
|
|
|
|
// Then, get additional master information from Raft cluster
|
|
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each raft server
|
|
for _, server := range resp.ClusterServers {
|
|
address := server.Address
|
|
|
|
// Update existing master info or create new one
|
|
if masterInfo, exists := masterMap[address]; exists {
|
|
// Update existing master with raft data
|
|
masterInfo.IsLeader = server.IsLeader
|
|
masterInfo.Suffrage = server.Suffrage
|
|
} else {
|
|
// Create new master info from raft data
|
|
masterInfo := &MasterInfo{
|
|
Address: address,
|
|
IsLeader: server.IsLeader,
|
|
Suffrage: server.Suffrage,
|
|
}
|
|
masterMap[address] = masterInfo
|
|
}
|
|
|
|
if server.IsLeader {
|
|
// Update leader count based on raft data
|
|
leaderCount = 1 // There should only be one leader
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
// If gRPC call fails, log the error but continue with topology data
|
|
glog.Errorf("Failed to get raft cluster servers from master %s: %v", s.masterAddress, err)
|
|
}
|
|
|
|
// Convert map to slice
|
|
for _, masterInfo := range masterMap {
|
|
masters = append(masters, *masterInfo)
|
|
}
|
|
|
|
// If no masters found at all, add the configured master as fallback
|
|
if len(masters) == 0 {
|
|
masters = append(masters, MasterInfo{
|
|
Address: s.masterAddress,
|
|
IsLeader: true,
|
|
Suffrage: "Voter",
|
|
})
|
|
leaderCount = 1
|
|
}
|
|
|
|
return &ClusterMastersData{
|
|
Masters: masters,
|
|
TotalMasters: len(masters),
|
|
LeaderCount: leaderCount,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetClusterFilers retrieves cluster filers data
|
|
func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) {
|
|
var filers []FilerInfo
|
|
|
|
// Get filer information from master using ListClusterNodes
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.FilerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each filer node
|
|
for _, node := range resp.ClusterNodes {
|
|
createdAt := time.Unix(0, node.CreatedAtNs)
|
|
|
|
filerInfo := FilerInfo{
|
|
Address: node.Address,
|
|
DataCenter: node.DataCenter,
|
|
Rack: node.Rack,
|
|
Version: node.Version,
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
filers = append(filers, filerInfo)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get filer nodes from master: %v", err)
|
|
}
|
|
|
|
return &ClusterFilersData{
|
|
Filers: filers,
|
|
TotalFilers: len(filers),
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetClusterBrokers retrieves cluster message brokers data
|
|
func (s *AdminServer) GetClusterBrokers() (*ClusterBrokersData, error) {
|
|
var brokers []MessageBrokerInfo
|
|
|
|
// Get broker information from master using ListClusterNodes
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.BrokerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each broker node
|
|
for _, node := range resp.ClusterNodes {
|
|
createdAt := time.Unix(0, node.CreatedAtNs)
|
|
|
|
brokerInfo := MessageBrokerInfo{
|
|
Address: node.Address,
|
|
DataCenter: node.DataCenter,
|
|
Rack: node.Rack,
|
|
Version: node.Version,
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
brokers = append(brokers, brokerInfo)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get broker nodes from master: %v", err)
|
|
}
|
|
|
|
return &ClusterBrokersData{
|
|
Brokers: brokers,
|
|
TotalBrokers: len(brokers),
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetAllFilers method moved to client_management.go
|
|
|
|
// GetVolumeDetails method moved to volume_management.go
|
|
|
|
// VacuumVolume method moved to volume_management.go
|
|
|
|
// ShowMaintenanceQueue displays the maintenance queue page
|
|
func (as *AdminServer) ShowMaintenanceQueue(c *gin.Context) {
|
|
data, err := as.getMaintenanceQueueData()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// This should not render HTML template, it should use the component approach
|
|
c.JSON(http.StatusOK, data)
|
|
}
|
|
|
|
// ShowMaintenanceWorkers displays the maintenance workers page
|
|
func (as *AdminServer) ShowMaintenanceWorkers(c *gin.Context) {
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// Create worker details data
|
|
workersData := make([]*WorkerDetailsData, 0, len(workers))
|
|
for _, worker := range workers {
|
|
details, err := as.getMaintenanceWorkerDetails(worker.ID)
|
|
if err != nil {
|
|
// Create basic worker details if we can't get full details
|
|
details = &WorkerDetailsData{
|
|
Worker: worker,
|
|
CurrentTasks: []*MaintenanceTask{},
|
|
RecentTasks: []*MaintenanceTask{},
|
|
Performance: &WorkerPerformance{
|
|
TasksCompleted: 0,
|
|
TasksFailed: 0,
|
|
AverageTaskTime: 0,
|
|
Uptime: 0,
|
|
SuccessRate: 0,
|
|
},
|
|
LastUpdated: time.Now(),
|
|
}
|
|
}
|
|
workersData = append(workersData, details)
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"workers": workersData,
|
|
"title": "Maintenance Workers",
|
|
})
|
|
}
|
|
|
|
// ShowMaintenanceConfig displays the maintenance configuration page
|
|
func (as *AdminServer) ShowMaintenanceConfig(c *gin.Context) {
|
|
config, err := as.getMaintenanceConfig()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// This should not render HTML template, it should use the component approach
|
|
c.JSON(http.StatusOK, config)
|
|
}
|
|
|
|
// UpdateMaintenanceConfig updates maintenance configuration from form
|
|
func (as *AdminServer) UpdateMaintenanceConfig(c *gin.Context) {
|
|
var config MaintenanceConfig
|
|
if err := c.ShouldBind(&config); err != nil {
|
|
c.HTML(http.StatusBadRequest, "error.html", gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
err := as.updateMaintenanceConfig(&config)
|
|
if err != nil {
|
|
c.HTML(http.StatusInternalServerError, "error.html", gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.Redirect(http.StatusSeeOther, "/maintenance/config")
|
|
}
|
|
|
|
// TriggerMaintenanceScan triggers a maintenance scan
|
|
func (as *AdminServer) TriggerMaintenanceScan(c *gin.Context) {
|
|
err := as.triggerMaintenanceScan()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Maintenance scan triggered"})
|
|
}
|
|
|
|
// GetMaintenanceTasks returns all maintenance tasks
|
|
func (as *AdminServer) GetMaintenanceTasks(c *gin.Context) {
|
|
tasks, err := as.getMaintenanceTasks()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, tasks)
|
|
}
|
|
|
|
// GetMaintenanceTask returns a specific maintenance task
|
|
func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
|
|
taskID := c.Param("id")
|
|
task, err := as.getMaintenanceTask(taskID)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "Task not found"})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, task)
|
|
}
|
|
|
|
// CancelMaintenanceTask cancels a pending maintenance task
|
|
func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
|
|
taskID := c.Param("id")
|
|
err := as.cancelMaintenanceTask(taskID)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"})
|
|
}
|
|
|
|
// GetMaintenanceWorkersAPI returns all maintenance workers
|
|
func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) {
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, workers)
|
|
}
|
|
|
|
// GetMaintenanceWorker returns a specific maintenance worker
|
|
func (as *AdminServer) GetMaintenanceWorker(c *gin.Context) {
|
|
workerID := c.Param("id")
|
|
worker, err := as.getMaintenanceWorkerDetails(workerID)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "Worker not found"})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, worker)
|
|
}
|
|
|
|
// GetMaintenanceStats returns maintenance statistics
|
|
func (as *AdminServer) GetMaintenanceStats(c *gin.Context) {
|
|
stats, err := as.getMaintenanceStats()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, stats)
|
|
}
|
|
|
|
// GetMaintenanceConfigAPI returns maintenance configuration
|
|
func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) {
|
|
config, err := as.getMaintenanceConfig()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, config)
|
|
}
|
|
|
|
// UpdateMaintenanceConfigAPI updates maintenance configuration via API
|
|
func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) {
|
|
var config MaintenanceConfig
|
|
if err := c.ShouldBindJSON(&config); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
err := as.updateMaintenanceConfig(&config)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Configuration updated"})
|
|
}
|
|
|
|
// GetMaintenanceConfigData returns maintenance configuration data (public wrapper)
|
|
func (as *AdminServer) GetMaintenanceConfigData() (*maintenance.MaintenanceConfigData, error) {
|
|
return as.getMaintenanceConfig()
|
|
}
|
|
|
|
// UpdateMaintenanceConfigData updates maintenance configuration (public wrapper)
|
|
func (as *AdminServer) UpdateMaintenanceConfigData(config *maintenance.MaintenanceConfig) error {
|
|
return as.updateMaintenanceConfig(config)
|
|
}
|
|
|
|
// Helper methods for maintenance operations
|
|
|
|
// getMaintenanceQueueData returns data for the maintenance queue UI
|
|
func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) {
|
|
tasks, err := as.getMaintenanceTasks()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stats, err := as.getMaintenanceQueueStats()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &maintenance.MaintenanceQueueData{
|
|
Tasks: tasks,
|
|
Workers: workers,
|
|
Stats: stats,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// getMaintenanceQueueStats returns statistics for the maintenance queue
|
|
func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) {
|
|
// This would integrate with the maintenance queue to get real statistics
|
|
// For now, return mock data
|
|
return &maintenance.QueueStats{
|
|
PendingTasks: 5,
|
|
RunningTasks: 2,
|
|
CompletedToday: 15,
|
|
FailedToday: 1,
|
|
TotalTasks: 23,
|
|
}, nil
|
|
}
|
|
|
|
// getMaintenanceTasks returns all maintenance tasks
|
|
func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
|
|
if as.maintenanceManager == nil {
|
|
return []*MaintenanceTask{}, nil
|
|
}
|
|
return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil
|
|
}
|
|
|
|
// getMaintenanceTask returns a specific maintenance task
|
|
func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) {
|
|
if as.maintenanceManager == nil {
|
|
return nil, fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
// Search for the task across all statuses since we don't know which status it has
|
|
statuses := []MaintenanceTaskStatus{
|
|
TaskStatusPending,
|
|
TaskStatusAssigned,
|
|
TaskStatusInProgress,
|
|
TaskStatusCompleted,
|
|
TaskStatusFailed,
|
|
TaskStatusCancelled,
|
|
}
|
|
|
|
for _, status := range statuses {
|
|
tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status
|
|
for _, task := range tasks {
|
|
if task.ID == taskID {
|
|
return task, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("task %s not found", taskID)
|
|
}
|
|
|
|
// cancelMaintenanceTask cancels a pending maintenance task
|
|
func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
|
|
if as.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
return as.maintenanceManager.CancelTask(taskID)
|
|
}
|
|
|
|
// getMaintenanceWorkers returns all maintenance workers
|
|
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
|
|
if as.maintenanceManager == nil {
|
|
return []*MaintenanceWorker{}, nil
|
|
}
|
|
return as.maintenanceManager.GetWorkers(), nil
|
|
}
|
|
|
|
// getMaintenanceWorkerDetails returns detailed information about a worker
|
|
func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDetailsData, error) {
|
|
if as.maintenanceManager == nil {
|
|
return nil, fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
workers := as.maintenanceManager.GetWorkers()
|
|
var targetWorker *MaintenanceWorker
|
|
for _, worker := range workers {
|
|
if worker.ID == workerID {
|
|
targetWorker = worker
|
|
break
|
|
}
|
|
}
|
|
|
|
if targetWorker == nil {
|
|
return nil, fmt.Errorf("worker %s not found", workerID)
|
|
}
|
|
|
|
// Get current tasks for this worker
|
|
currentTasks := as.maintenanceManager.GetTasks(TaskStatusInProgress, "", 0)
|
|
var workerCurrentTasks []*MaintenanceTask
|
|
for _, task := range currentTasks {
|
|
if task.WorkerID == workerID {
|
|
workerCurrentTasks = append(workerCurrentTasks, task)
|
|
}
|
|
}
|
|
|
|
// Get recent tasks for this worker
|
|
recentTasks := as.maintenanceManager.GetTasks(TaskStatusCompleted, "", 10)
|
|
var workerRecentTasks []*MaintenanceTask
|
|
for _, task := range recentTasks {
|
|
if task.WorkerID == workerID {
|
|
workerRecentTasks = append(workerRecentTasks, task)
|
|
}
|
|
}
|
|
|
|
// Calculate performance metrics
|
|
var totalDuration time.Duration
|
|
var completedTasks, failedTasks int
|
|
for _, task := range workerRecentTasks {
|
|
if task.Status == TaskStatusCompleted {
|
|
completedTasks++
|
|
if task.StartedAt != nil && task.CompletedAt != nil {
|
|
totalDuration += task.CompletedAt.Sub(*task.StartedAt)
|
|
}
|
|
} else if task.Status == TaskStatusFailed {
|
|
failedTasks++
|
|
}
|
|
}
|
|
|
|
var averageTaskTime time.Duration
|
|
var successRate float64
|
|
if completedTasks+failedTasks > 0 {
|
|
if completedTasks > 0 {
|
|
averageTaskTime = totalDuration / time.Duration(completedTasks)
|
|
}
|
|
successRate = float64(completedTasks) / float64(completedTasks+failedTasks) * 100
|
|
}
|
|
|
|
return &WorkerDetailsData{
|
|
Worker: targetWorker,
|
|
CurrentTasks: workerCurrentTasks,
|
|
RecentTasks: workerRecentTasks,
|
|
Performance: &WorkerPerformance{
|
|
TasksCompleted: completedTasks,
|
|
TasksFailed: failedTasks,
|
|
AverageTaskTime: averageTaskTime,
|
|
Uptime: time.Since(targetWorker.LastHeartbeat), // This should be tracked properly
|
|
SuccessRate: successRate,
|
|
},
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// getMaintenanceStats returns maintenance statistics
|
|
func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) {
|
|
if as.maintenanceManager == nil {
|
|
return &MaintenanceStats{
|
|
TotalTasks: 0,
|
|
TasksByStatus: make(map[MaintenanceTaskStatus]int),
|
|
TasksByType: make(map[MaintenanceTaskType]int),
|
|
ActiveWorkers: 0,
|
|
}, nil
|
|
}
|
|
return as.maintenanceManager.GetStats(), nil
|
|
}
|
|
|
|
// getMaintenanceConfig returns maintenance configuration
|
|
func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) {
|
|
// Load configuration from persistent storage
|
|
config, err := as.configPersistence.LoadMaintenanceConfig()
|
|
if err != nil {
|
|
glog.Errorf("Failed to load maintenance configuration: %v", err)
|
|
// Fallback to default configuration
|
|
config = DefaultMaintenanceConfig()
|
|
}
|
|
|
|
// Get system stats from maintenance manager if available
|
|
var systemStats *MaintenanceStats
|
|
if as.maintenanceManager != nil {
|
|
systemStats = as.maintenanceManager.GetStats()
|
|
} else {
|
|
// Fallback stats
|
|
systemStats = &MaintenanceStats{
|
|
TotalTasks: 0,
|
|
TasksByStatus: map[MaintenanceTaskStatus]int{
|
|
TaskStatusPending: 0,
|
|
TaskStatusInProgress: 0,
|
|
TaskStatusCompleted: 0,
|
|
TaskStatusFailed: 0,
|
|
},
|
|
TasksByType: make(map[MaintenanceTaskType]int),
|
|
ActiveWorkers: 0,
|
|
CompletedToday: 0,
|
|
FailedToday: 0,
|
|
AverageTaskTime: 0,
|
|
LastScanTime: time.Now().Add(-time.Hour),
|
|
NextScanTime: time.Now().Add(time.Duration(config.ScanIntervalSeconds) * time.Second),
|
|
}
|
|
}
|
|
|
|
return &MaintenanceConfigData{
|
|
Config: config,
|
|
IsEnabled: config.Enabled,
|
|
LastScanTime: systemStats.LastScanTime,
|
|
NextScanTime: systemStats.NextScanTime,
|
|
SystemStats: systemStats,
|
|
MenuItems: maintenance.BuildMaintenanceMenuItems(),
|
|
}, nil
|
|
}
|
|
|
|
// updateMaintenanceConfig updates maintenance configuration
|
|
func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error {
|
|
// Save configuration to persistent storage
|
|
if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil {
|
|
return fmt.Errorf("failed to save maintenance configuration: %v", err)
|
|
}
|
|
|
|
// Update maintenance manager if available
|
|
if as.maintenanceManager != nil {
|
|
if err := as.maintenanceManager.UpdateConfig(config); err != nil {
|
|
glog.Errorf("Failed to update maintenance manager config: %v", err)
|
|
// Don't return error here, just log it
|
|
}
|
|
}
|
|
|
|
glog.V(1).Infof("Updated maintenance configuration (enabled: %v, scan interval: %ds)",
|
|
config.Enabled, config.ScanIntervalSeconds)
|
|
return nil
|
|
}
|
|
|
|
// triggerMaintenanceScan triggers a maintenance scan
|
|
func (as *AdminServer) triggerMaintenanceScan() error {
|
|
if as.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
return as.maintenanceManager.TriggerScan()
|
|
}
|
|
|
|
// TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
|
|
func (as *AdminServer) TriggerTopicRetentionPurgeAPI(c *gin.Context) {
|
|
err := as.TriggerTopicRetentionPurge()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"message": "Topic retention purge triggered successfully"})
|
|
}
|
|
|
|
// GetConfigInfo returns information about the admin configuration
|
|
func (as *AdminServer) GetConfigInfo(c *gin.Context) {
|
|
configInfo := as.configPersistence.GetConfigInfo()
|
|
|
|
// Add additional admin server info
|
|
configInfo["master_address"] = as.masterAddress
|
|
configInfo["cache_expiration"] = as.cacheExpiration.String()
|
|
configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String()
|
|
|
|
// Add maintenance system info
|
|
if as.maintenanceManager != nil {
|
|
configInfo["maintenance_enabled"] = true
|
|
configInfo["maintenance_running"] = as.maintenanceManager.IsRunning()
|
|
} else {
|
|
configInfo["maintenance_enabled"] = false
|
|
configInfo["maintenance_running"] = false
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"config_info": configInfo,
|
|
"title": "Configuration Information",
|
|
})
|
|
}
|
|
|
|
// GetMaintenanceWorkersData returns workers data for the maintenance workers page
|
|
func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, error) {
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create worker details data
|
|
workersData := make([]*WorkerDetailsData, 0, len(workers))
|
|
activeWorkers := 0
|
|
busyWorkers := 0
|
|
totalLoad := 0
|
|
|
|
for _, worker := range workers {
|
|
details, err := as.getMaintenanceWorkerDetails(worker.ID)
|
|
if err != nil {
|
|
// Create basic worker details if we can't get full details
|
|
details = &WorkerDetailsData{
|
|
Worker: worker,
|
|
CurrentTasks: []*MaintenanceTask{},
|
|
RecentTasks: []*MaintenanceTask{},
|
|
Performance: &WorkerPerformance{
|
|
TasksCompleted: 0,
|
|
TasksFailed: 0,
|
|
AverageTaskTime: 0,
|
|
Uptime: 0,
|
|
SuccessRate: 0,
|
|
},
|
|
LastUpdated: time.Now(),
|
|
}
|
|
}
|
|
workersData = append(workersData, details)
|
|
|
|
if worker.Status == "active" {
|
|
activeWorkers++
|
|
} else if worker.Status == "busy" {
|
|
busyWorkers++
|
|
}
|
|
totalLoad += worker.CurrentLoad
|
|
}
|
|
|
|
return &MaintenanceWorkersData{
|
|
Workers: workersData,
|
|
ActiveWorkers: activeWorkers,
|
|
BusyWorkers: busyWorkers,
|
|
TotalLoad: totalLoad,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// StartWorkerGrpcServer starts the worker gRPC server
|
|
func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error {
|
|
if s.workerGrpcServer != nil {
|
|
return fmt.Errorf("worker gRPC server is already running")
|
|
}
|
|
|
|
// Calculate gRPC port (HTTP port + 10000)
|
|
grpcPort := httpPort + 10000
|
|
|
|
s.workerGrpcServer = NewWorkerGrpcServer(s)
|
|
return s.workerGrpcServer.StartWithTLS(grpcPort)
|
|
}
|
|
|
|
// StopWorkerGrpcServer stops the worker gRPC server
|
|
func (s *AdminServer) StopWorkerGrpcServer() error {
|
|
if s.workerGrpcServer != nil {
|
|
err := s.workerGrpcServer.Stop()
|
|
s.workerGrpcServer = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetWorkerGrpcServer returns the worker gRPC server
|
|
func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
|
|
return s.workerGrpcServer
|
|
}
|
|
|
|
// Maintenance system integration methods
|
|
|
|
// InitMaintenanceManager initializes the maintenance manager
|
|
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
|
|
s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)
|
|
glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
|
|
}
|
|
|
|
// GetMaintenanceManager returns the maintenance manager
|
|
func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager {
|
|
return s.maintenanceManager
|
|
}
|
|
|
|
// StartMaintenanceManager starts the maintenance manager
|
|
func (s *AdminServer) StartMaintenanceManager() error {
|
|
if s.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
return s.maintenanceManager.Start()
|
|
}
|
|
|
|
// StopMaintenanceManager stops the maintenance manager
|
|
func (s *AdminServer) StopMaintenanceManager() {
|
|
if s.maintenanceManager != nil {
|
|
s.maintenanceManager.Stop()
|
|
}
|
|
}
|
|
|
|
// TriggerTopicRetentionPurge triggers topic data purging based on retention policies
|
|
func (s *AdminServer) TriggerTopicRetentionPurge() error {
|
|
if s.topicRetentionPurger == nil {
|
|
return fmt.Errorf("topic retention purger not initialized")
|
|
}
|
|
|
|
glog.V(0).Infof("Triggering topic retention purge")
|
|
return s.topicRetentionPurger.PurgeExpiredTopicData()
|
|
}
|
|
|
|
// GetTopicRetentionPurger returns the topic retention purger
|
|
func (s *AdminServer) GetTopicRetentionPurger() *TopicRetentionPurger {
|
|
return s.topicRetentionPurger
|
|
}
|
|
|
|
// CreateTopicWithRetention creates a new topic with optional retention configuration
|
|
func (s *AdminServer) CreateTopicWithRetention(namespace, name string, partitionCount int32, retentionEnabled bool, retentionSeconds int64) error {
|
|
// Find broker leader to create the topic
|
|
brokerLeader, err := s.findBrokerLeader()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find broker leader: %v", err)
|
|
}
|
|
|
|
// Create retention configuration
|
|
var retention *mq_pb.TopicRetention
|
|
if retentionEnabled {
|
|
retention = &mq_pb.TopicRetention{
|
|
Enabled: true,
|
|
RetentionSeconds: retentionSeconds,
|
|
}
|
|
} else {
|
|
retention = &mq_pb.TopicRetention{
|
|
Enabled: false,
|
|
RetentionSeconds: 0,
|
|
}
|
|
}
|
|
|
|
// Create the topic via broker
|
|
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
PartitionCount: partitionCount,
|
|
Retention: retention,
|
|
})
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create topic: %v", err)
|
|
}
|
|
|
|
glog.V(0).Infof("Created topic %s.%s with %d partitions (retention: enabled=%v, seconds=%d)",
|
|
namespace, name, partitionCount, retentionEnabled, retentionSeconds)
|
|
return nil
|
|
}
|
|
|
|
// UpdateTopicRetention updates the retention configuration for an existing topic
|
|
func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, retentionSeconds int64) error {
|
|
// Get broker information from master
|
|
var brokerAddress string
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.BrokerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Find the first available broker
|
|
for _, node := range resp.ClusterNodes {
|
|
brokerAddress = node.Address
|
|
break
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get broker nodes from master: %v", err)
|
|
}
|
|
|
|
if brokerAddress == "" {
|
|
return fmt.Errorf("no active brokers found")
|
|
}
|
|
|
|
// Create gRPC connection
|
|
conn, err := grpc.Dial(brokerAddress, s.grpcDialOption)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to broker: %v", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
client := mq_pb.NewSeaweedMessagingClient(conn)
|
|
|
|
// First, get the current topic configuration to preserve existing settings
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
currentConfig, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current topic configuration: %v", err)
|
|
}
|
|
|
|
// Create the topic configuration request, preserving all existing settings
|
|
configRequest := &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
// Preserve existing partition count - this is critical!
|
|
PartitionCount: currentConfig.PartitionCount,
|
|
// Preserve existing record type if it exists
|
|
RecordType: currentConfig.RecordType,
|
|
}
|
|
|
|
// Update only the retention configuration
|
|
if enabled {
|
|
configRequest.Retention = &mq_pb.TopicRetention{
|
|
RetentionSeconds: retentionSeconds,
|
|
Enabled: true,
|
|
}
|
|
} else {
|
|
// Set retention to disabled
|
|
configRequest.Retention = &mq_pb.TopicRetention{
|
|
RetentionSeconds: 0,
|
|
Enabled: false,
|
|
}
|
|
}
|
|
|
|
// Send the configuration request with preserved settings
|
|
_, err = client.ConfigureTopic(ctx, configRequest)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update topic retention: %v", err)
|
|
}
|
|
|
|
glog.V(0).Infof("Updated topic %s.%s retention (enabled: %v, seconds: %d) while preserving %d partitions",
|
|
namespace, name, enabled, retentionSeconds, currentConfig.PartitionCount)
|
|
return nil
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the admin server
|
|
func (s *AdminServer) Shutdown() {
|
|
glog.V(1).Infof("Shutting down admin server...")
|
|
|
|
// Stop maintenance manager
|
|
s.StopMaintenanceManager()
|
|
|
|
// Stop worker gRPC server
|
|
if err := s.StopWorkerGrpcServer(); err != nil {
|
|
glog.Errorf("Failed to stop worker gRPC server: %v", err)
|
|
}
|
|
|
|
glog.V(1).Infof("Admin server shutdown complete")
|
|
}
|