You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

163 lines
5.0 KiB

package maintenance
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// NewMaintenanceScanner builds a MaintenanceScanner around the given admin
// client, policy and queue.
//
// Besides storing its arguments, it creates a maintenance integration from the
// queue and policy, keeps it on the scanner, and hands the same integration to
// the queue through SetIntegration so that both sides reference it. The
// scanner's lastScan map starts out empty.
func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, queue *MaintenanceQueue) *MaintenanceScanner {
	integration := NewMaintenanceIntegration(queue, policy)

	ms := &MaintenanceScanner{
		adminClient: adminClient,
		policy:      policy,
		queue:       queue,
		lastScan:    make(map[MaintenanceTaskType]time.Time),
		integration: integration,
	}

	// The queue needs the integration as well (bidirectional relationship).
	queue.SetIntegration(ms.integration)

	glog.V(1).Infof("Initialized maintenance scanner with task system")
	return ms
}
// ScanForMaintenanceTasks collects volume health metrics from the cluster and
// runs them through the task detection integration.
//
// It returns the detection results produced by the integration. When the
// scanner has no integration, a warning is logged and an empty, non-nil slice
// is returned with a nil error.
//
// A failure while collecting metrics is returned wrapped, so callers can still
// inspect the underlying cause with errors.Is / errors.As. A failure reported
// by the task detectors is logged and returned unchanged.
func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) {
	volumeMetrics, err := ms.getVolumeHealthMetrics()
	if err != nil {
		return nil, fmt.Errorf("failed to get volume health metrics: %w", err)
	}

	// Without an integration there is nothing that could detect tasks.
	if ms.integration == nil {
		glog.Warningf("No integration available, no tasks will be scheduled")
		return []*TaskDetectionResult{}, nil
	}

	// The detectors work on the task system's own metrics representation.
	taskMetrics := ms.convertToTaskMetrics(volumeMetrics)

	results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
	if err != nil {
		glog.Errorf("Task scanning failed: %v", err)
		return nil, err
	}

	glog.V(1).Infof("Maintenance scan completed: found %d tasks", len(results))
	return results, nil
}
// getVolumeHealthMetrics asks the master for its volume list and turns every
// volume entry found in the returned topology into a VolumeHealthMetrics
// record.
//
// The topology is walked data center -> rack -> data node -> disk -> volume,
// producing one record per volume entry, with the hosting data node's Id as
// Server. GarbageRatio is DeletedBytes/Size and is only computed when Size is
// greater than zero; Age is the time elapsed since the volume's modification
// timestamp. FullnessRatio is not populated here (it would require the volume
// size limit). A response without topology info yields no records and no
// error.
//
// Before returning, the records are passed to enrichVolumeMetrics. An error
// from the master client call is returned as-is together with a nil slice.
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
	var collected []*VolumeHealthMetrics

	collect := func(client master_pb.SeaweedClient) error {
		resp, listErr := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if listErr != nil {
			return listErr
		}

		topology := resp.TopologyInfo
		if topology == nil {
			return nil
		}

		for _, dataCenter := range topology.DataCenterInfos {
			for _, rack := range dataCenter.RackInfos {
				for _, dataNode := range rack.DataNodeInfos {
					for _, disk := range dataNode.DiskInfos {
						for _, vol := range disk.VolumeInfos {
							modifiedAt := time.Unix(int64(vol.ModifiedAtSecond), 0)

							// IsECVolume and ReplicaCount are placeholders at this
							// point; ReplicaCount is overwritten by
							// enrichVolumeMetrics once all records are known.
							// NOTE(review): ExpectedReplicas is copied straight from
							// vol.ReplicaPlacement - confirm that value is a replica
							// count and not an encoded placement.
							m := &VolumeHealthMetrics{
								VolumeID:         vol.Id,
								Server:           dataNode.Id,
								Collection:       vol.Collection,
								Size:             vol.Size,
								DeletedBytes:     vol.DeletedByteCount,
								LastModified:     modifiedAt,
								Age:              time.Since(modifiedAt),
								IsReadOnly:       vol.ReadOnly,
								IsECVolume:       false,
								ReplicaCount:     1,
								ExpectedReplicas: int(vol.ReplicaPlacement),
							}

							// Guard against dividing by zero for empty volumes.
							if m.Size > 0 {
								m.GarbageRatio = float64(m.DeletedBytes) / float64(m.Size)
							}

							collected = append(collected, m)
						}
					}
				}
			}
		}
		return nil
	}

	if err := ms.adminClient.WithMasterClient(collect); err != nil {
		return nil, err
	}

	// Fill in the per-volume replica counts now that every record is present.
	ms.enrichVolumeMetrics(collected)
	return collected, nil
}
// enrichVolumeMetrics fills in ReplicaCount on every metric, in place.
//
// Entries that share a VolumeID are treated as replicas of the same volume, so
// each entry's ReplicaCount becomes the number of entries in metrics carrying
// its VolumeID.
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
	// First pass: tally how many entries exist per volume ID.
	copies := make(map[uint32]int)
	for _, m := range metrics {
		copies[m.VolumeID]++
	}

	// Second pass: stamp each entry with the tally of its volume.
	for _, m := range metrics {
		m.ReplicaCount = copies[m.VolumeID]
	}
}
// convertToTaskMetrics copies the scanner's volume metrics into the task
// system's types.VolumeHealthMetrics representation.
//
// Every input entry yields one newly allocated output entry with the same
// field values, in the same order. An empty (or nil) input yields a nil slice.
func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics {
	// Keep returning nil for empty input, as callers may have come to expect.
	if len(metrics) == 0 {
		return nil
	}

	// The output length is known up front, so allocate the backing array once
	// instead of letting append grow it repeatedly.
	converted := make([]*types.VolumeHealthMetrics, 0, len(metrics))
	for _, m := range metrics {
		converted = append(converted, &types.VolumeHealthMetrics{
			VolumeID:         m.VolumeID,
			Server:           m.Server,
			Collection:       m.Collection,
			Size:             m.Size,
			DeletedBytes:     m.DeletedBytes,
			GarbageRatio:     m.GarbageRatio,
			LastModified:     m.LastModified,
			Age:              m.Age,
			ReplicaCount:     m.ReplicaCount,
			ExpectedReplicas: m.ExpectedReplicas,
			IsReadOnly:       m.IsReadOnly,
			HasRemoteCopy:    m.HasRemoteCopy,
			IsECVolume:       m.IsECVolume,
			FullnessRatio:    m.FullnessRatio,
		})
	}
	return converted
}