diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go index 6f3b46be2..ddbf44f55 100644 --- a/weed/admin/maintenance/maintenance_scanner.go +++ b/weed/admin/maintenance/maintenance_scanner.go @@ -115,6 +115,7 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, metric := &VolumeHealthMetrics{ VolumeID: volInfo.Id, Server: node.Id, + ServerAddress: node.Address, DiskType: diskType, // Track which disk this volume is on DiskId: volInfo.DiskId, // Use disk ID from volume info DataCenter: dc.Id, // Data center from current loop @@ -207,6 +208,7 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric simplified = append(simplified, &types.VolumeHealthMetrics{ VolumeID: metric.VolumeID, Server: metric.Server, + ServerAddress: metric.ServerAddress, DiskType: metric.DiskType, DiskId: metric.DiskId, DataCenter: metric.DataCenter, diff --git a/weed/admin/maintenance/maintenance_types.go b/weed/admin/maintenance/maintenance_types.go index fe5d5fa55..7b7e83818 100644 --- a/weed/admin/maintenance/maintenance_types.go +++ b/weed/admin/maintenance/maintenance_types.go @@ -362,6 +362,7 @@ type TaskDetectionResult struct { type VolumeHealthMetrics struct { VolumeID uint32 `json:"volume_id"` Server string `json:"server"` + ServerAddress string `json:"server_address"` DiskType string `json:"disk_type"` // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1") DiskId uint32 `json:"disk_id"` // ID of the disk in Store.Locations array DataCenter string `json:"data_center"` // Data center of the server diff --git a/weed/admin/maintenance/pending_operations_test.go b/weed/admin/maintenance/pending_operations_test.go index 64bb591fb..75d511e8f 100644 --- a/weed/admin/maintenance/pending_operations_test.go +++ b/weed/admin/maintenance/pending_operations_test.go @@ -110,10 +110,10 @@ func TestPendingOperations_VolumeFiltering(t *testing.T) { // Create volume metrics metrics := []*types.VolumeHealthMetrics{ - {VolumeID: 100, Server: "node1"}, - {VolumeID: 101, Server: "node2"}, - {VolumeID: 102, Server: "node3"}, - {VolumeID: 103, Server: "node1"}, + {VolumeID: 100, Server: "node1", ServerAddress: "192.168.1.1:8080"}, + {VolumeID: 101, Server: "node2", ServerAddress: "192.168.1.2:8080"}, + {VolumeID: 102, Server: "node3", ServerAddress: "192.168.1.3:8080"}, + {VolumeID: 103, Server: "node1", ServerAddress: "192.168.1.1:8080"}, } // Add pending operations on volumes 101 and 103 diff --git a/weed/admin/topology/structs.go b/weed/admin/topology/structs.go index 103ee5abe..06903352e 100644 --- a/weed/admin/topology/structs.go +++ b/weed/admin/topology/structs.go @@ -97,6 +97,7 @@ type ActiveTopology struct { // DestinationPlan represents a planned destination for a volume/shard operation type DestinationPlan struct { TargetNode string `json:"target_node"` + TargetAddress string `json:"target_address"` TargetDisk uint32 `json:"target_disk"` TargetRack string `json:"target_rack"` TargetDC string `json:"target_dc"` diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 6d433c719..c9f4ebfd5 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -122,7 +123,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Unified sources and targets - the only way to specify locations Sources: []*worker_pb.TaskSource{ { - Node: selectedVolume.Server, + Node: selectedVolume.ServerAddress, DiskId: sourceDisk, VolumeId: selectedVolume.VolumeID, EstimatedSize: selectedVolume.Size, @@ -132,7 +133,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI }, Targets: []*worker_pb.TaskTarget{ { - Node: destinationPlan.TargetNode, + Node: destinationPlan.TargetAddress, DiskId: destinationPlan.TargetDisk, VolumeId: selectedVolume.VolumeID, EstimatedSize: destinationPlan.ExpectedSize, @@ -231,8 +232,15 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol return nil, fmt.Errorf("no suitable destination found for balance operation") } + // Get the target server address + targetAddress, err := util.ResolveServerAddress(bestDisk.NodeID, activeTopology) + if err != nil { + return nil, fmt.Errorf("failed to resolve address for target server %s: %v", bestDisk.NodeID, err) + } + return &topology.DestinationPlan{ TargetNode: bestDisk.NodeID, + TargetAddress: targetAddress, TargetDisk: bestDisk.DiskID, TargetRack: bestDisk.Rack, TargetDC: bestDisk.DataCenter, diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index c5568fe26..1beb910a3 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -183,6 +184,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations", taskID, metric.VolumeID, len(sources), len(multiPlan.Plans)) + // Convert sources + sourcesProto, err := convertTaskSourcesToProtobuf(sources, metric.VolumeID, clusterInfo.ActiveTopology) + if err != nil { + glog.Warningf("Failed to convert sources for EC task on volume %d: %v, skipping", metric.VolumeID, err) + continue + } + // Create unified sources and targets for EC task result.TypedParams = &worker_pb.TaskParams{ TaskId: taskID, // Link to ActiveTopology pending task @@ -191,7 +199,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI VolumeSize: metric.Size, // Store original volume size for tracking changes // Unified sources - all sources that will be processed/cleaned up - Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID), + Sources: sourcesProto, // Unified targets - all EC shard destinations Targets: createECTargets(multiPlan), @@ -296,8 +304,15 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V dcCount := make(map[string]int) for _, disk := range selectedDisks { + // Get the target server address + targetAddress, err := util.ResolveServerAddress(disk.NodeID, activeTopology) + if err != nil { + return nil, fmt.Errorf("failed to resolve address for target server %s: %v", disk.NodeID, err) + } + plan := &topology.DestinationPlan{ TargetNode: disk.NodeID, + TargetAddress: targetAddress, TargetDisk: disk.DiskID, TargetRack: disk.Rack, TargetDC: disk.DataCenter, @@ -358,7 +373,7 @@ func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.Task // Create targets with assigned shard IDs for i, plan := range multiPlan.Plans { target := &worker_pb.TaskTarget{ - Node: plan.TargetNode, + Node: plan.TargetAddress, DiskId: plan.TargetDisk, Rack: plan.TargetRack, DataCenter: plan.TargetDC, @@ -388,12 +403,17 @@ func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.Task } // convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource -func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource { +func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32, activeTopology *topology.ActiveTopology) ([]*worker_pb.TaskSource, error) { var protobufSources []*worker_pb.TaskSource for _, source := range sources { + serverAddress, err := util.ResolveServerAddress(source.ServerID, activeTopology) + if err != nil { + return nil, fmt.Errorf("failed to resolve address for source server %s: %v", source.ServerID, err) + } + pbSource := &worker_pb.TaskSource{ - Node: source.ServerID, + Node: serverAddress, DiskId: source.DiskID, DataCenter: source.DataCenter, Rack: source.Rack, @@ -418,7 +438,7 @@ func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID ui protobufSources = append(protobufSources, pbSource) } - return protobufSources + return protobufSources, nil } // createECTaskParams creates clean EC task parameters (destinations now in unified targets) diff --git a/weed/worker/tasks/util/address.go b/weed/worker/tasks/util/address.go new file mode 100644 index 000000000..516edb8db --- /dev/null +++ b/weed/worker/tasks/util/address.go @@ -0,0 +1,23 @@ +package util + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" +) + +// ResolveServerAddress resolves a server ID to its network address using the active topology +func ResolveServerAddress(serverID string, activeTopology *topology.ActiveTopology) (string, error) { + if activeTopology == nil { + return "", fmt.Errorf("topology not available") + } + allNodes := activeTopology.GetAllNodes() + nodeInfo, exists := allNodes[serverID] + if !exists { + return "", fmt.Errorf("server %s not found in topology", serverID) + } + if nodeInfo.Address == "" { + return "", fmt.Errorf("server %s has no address in topology", serverID) + } + return nodeInfo.Address, nil +} diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go index bd86a2742..da59a4a7f 100644 --- a/weed/worker/tasks/vacuum/detection.go +++ b/weed/worker/tasks/vacuum/detection.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -48,7 +49,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Create typed parameters for vacuum task result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo) - results = append(results, result) + if result.TypedParams != nil { + results = append(results, result) + } } else { // Debug why volume was not selected if debugCount < 5 { // Limit debug output to first 5 volumes @@ -102,6 +105,17 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum // Use DC and rack information directly from VolumeHealthMetrics sourceDC, sourceRack := metric.DataCenter, metric.Rack + // Get server address from topology (required for vacuum tasks) + if clusterInfo == nil || clusterInfo.ActiveTopology == nil { + glog.Errorf("Topology not available for vacuum task on volume %d, skipping", task.VolumeID) + return nil + } + address, err := util.ResolveServerAddress(task.Server, clusterInfo.ActiveTopology) + if err != nil { + glog.Errorf("Failed to resolve address for server %s for vacuum task on volume %d, skipping task: %v", task.Server, task.VolumeID, err) + return nil + } + // Create typed protobuf parameters with unified sources return &worker_pb.TaskParams{ TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated) @@ -112,7 +126,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum // Unified sources array Sources: []*worker_pb.TaskSource{ { - Node: task.Server, + Node: address, VolumeId: task.VolumeID, EstimatedSize: metric.Size, DataCenter: sourceDC, diff --git a/weed/worker/types/data_types.go b/weed/worker/types/data_types.go index c8a67edc7..64ba5e11c 100644 --- a/weed/worker/types/data_types.go +++ b/weed/worker/types/data_types.go @@ -18,7 +18,8 @@ type ClusterInfo struct { // VolumeHealthMetrics contains health information about a volume (simplified) type VolumeHealthMetrics struct { VolumeID uint32 - Server string + Server string // Volume server ID + ServerAddress string // Volume server address (ip:port) DiskType string // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1") DiskId uint32 // ID of the disk in Store.Locations array DataCenter string // Data center of the server