
Fix issue #7880: Tasks use Volume IDs instead of ip:port (#7881)

* Fix issue #7880: Tasks use Volume IDs instead of ip:port

When volume servers are registered with custom IDs, tasks were attempting
to connect using the ID instead of the actual ip:port address, causing
connection failures.

Modified task detection logic in balance, erasure coding, and vacuum tasks
to resolve volume server IDs to their actual ip:port addresses using
ActiveTopology information.

* Use server addresses directly instead of translating from IDs

Modified VolumeHealthMetrics to include ServerAddress field populated
directly from topology DataNodeInfo.Address. Updated task detection
logic to use addresses directly without runtime lookups.

Changes:
- Added ServerAddress field to VolumeHealthMetrics
- Updated maintenance scanner to populate ServerAddress
- Modified task detection to use ServerAddress for Node fields
- Updated DestinationPlan to include TargetAddress
- Removed runtime address lookups in favor of direct address usage
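
A minimal sketch of the direct-address approach described above (the package and wrapper function are hypothetical, but the struct fields match the diffs below):

```go
package example // hypothetical wrapper, not part of this PR

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// taskSourceFor builds a task source from a scanned volume metric.
// The Node field now carries metric.ServerAddress (ip:port) rather than
// metric.Server (the registered server ID), so workers can always connect.
func taskSourceFor(metric *types.VolumeHealthMetrics) *worker_pb.TaskSource {
	return &worker_pb.TaskSource{
		Node:     metric.ServerAddress, // populated by the maintenance scanner from topology
		VolumeId: metric.VolumeID,
	}
}
```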

* Address PR comments: add ServerAddress field, improve error handling

- Add missing ServerAddress field to VolumeHealthMetrics struct
- Add warning in vacuum detection when server not found in topology
- Improve error handling in erasure coding to abort task if sources missing
- Make vacuum task stricter by skipping if server not found in topology

* Refactor: Extract common address resolution logic into shared utility

- Created weed/worker/tasks/util/address.go with ResolveServerAddress function
- Updated balance, erasure_coding, and vacuum detection to use the shared utility
- Removed code duplication and improved maintainability
- Consistent error handling across all task types

* Fix critical issues in task address resolution

- Vacuum: Require topology availability and fail if server not found (no fallback to ID)
- Ensure all task types consistently fail early when topology is incomplete
- Prevent creation of tasks that would fail due to missing server addresses

* Address additional PR feedback

- Add validation for empty addresses in ResolveServerAddress
- Remove redundant serverAddress variable in vacuum detection
- Improve robustness of address resolution

* Improve error logging in vacuum detection

- Include actual error details in log message for better diagnostics
- Make error messages consistent with other task types
Committed by Chris Lu (via GitHub) in commit c260e6a22e.
Changed files (number of changed lines):

1. weed/admin/maintenance/maintenance_scanner.go (2)
2. weed/admin/maintenance/maintenance_types.go (1)
3. weed/admin/maintenance/pending_operations_test.go (8)
4. weed/admin/topology/structs.go (1)
5. weed/worker/tasks/balance/detection.go (12)
6. weed/worker/tasks/erasure_coding/detection.go (30)
7. weed/worker/tasks/util/address.go (23)
8. weed/worker/tasks/vacuum/detection.go (18)
9. weed/worker/types/data_types.go (3)
weed/admin/maintenance/maintenance_scanner.go

@@ -115,6 +115,7 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
 	metric := &VolumeHealthMetrics{
 		VolumeID:      volInfo.Id,
 		Server:        node.Id,
+		ServerAddress: node.Address,
 		DiskType:      diskType,       // Track which disk this volume is on
 		DiskId:        volInfo.DiskId, // Use disk ID from volume info
 		DataCenter:    dc.Id,          // Data center from current loop
@@ -207,6 +208,7 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric
 	simplified = append(simplified, &types.VolumeHealthMetrics{
 		VolumeID:      metric.VolumeID,
 		Server:        metric.Server,
+		ServerAddress: metric.ServerAddress,
 		DiskType:      metric.DiskType,
 		DiskId:        metric.DiskId,
 		DataCenter:    metric.DataCenter,

weed/admin/maintenance/maintenance_types.go

@@ -362,6 +362,7 @@ type TaskDetectionResult struct {
 type VolumeHealthMetrics struct {
 	VolumeID      uint32 `json:"volume_id"`
 	Server        string `json:"server"`
+	ServerAddress string `json:"server_address"`
 	DiskType      string `json:"disk_type"`   // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1")
 	DiskId        uint32 `json:"disk_id"`     // ID of the disk in Store.Locations array
 	DataCenter    string `json:"data_center"` // Data center of the server

weed/admin/maintenance/pending_operations_test.go

@@ -110,10 +110,10 @@ func TestPendingOperations_VolumeFiltering(t *testing.T) {
 	// Create volume metrics
 	metrics := []*types.VolumeHealthMetrics{
-		{VolumeID: 100, Server: "node1"},
-		{VolumeID: 101, Server: "node2"},
-		{VolumeID: 102, Server: "node3"},
-		{VolumeID: 103, Server: "node1"},
+		{VolumeID: 100, Server: "node1", ServerAddress: "192.168.1.1:8080"},
+		{VolumeID: 101, Server: "node2", ServerAddress: "192.168.1.2:8080"},
+		{VolumeID: 102, Server: "node3", ServerAddress: "192.168.1.3:8080"},
+		{VolumeID: 103, Server: "node1", ServerAddress: "192.168.1.1:8080"},
 	}

 	// Add pending operations on volumes 101 and 103

weed/admin/topology/structs.go

@@ -97,6 +97,7 @@ type ActiveTopology struct {
 // DestinationPlan represents a planned destination for a volume/shard operation
 type DestinationPlan struct {
 	TargetNode    string `json:"target_node"`
+	TargetAddress string `json:"target_address"`
 	TargetDisk    uint32 `json:"target_disk"`
 	TargetRack    string `json:"target_rack"`
 	TargetDC      string `json:"target_dc"`

weed/worker/tasks/balance/detection.go

@@ -8,6 +8,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
 )
@@ -122,7 +123,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 		// Unified sources and targets - the only way to specify locations
 		Sources: []*worker_pb.TaskSource{
 			{
-				Node:          selectedVolume.Server,
+				Node:          selectedVolume.ServerAddress,
 				DiskId:        sourceDisk,
 				VolumeId:      selectedVolume.VolumeID,
 				EstimatedSize: selectedVolume.Size,
@@ -132,7 +133,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 		},
 		Targets: []*worker_pb.TaskTarget{
 			{
-				Node:          destinationPlan.TargetNode,
+				Node:          destinationPlan.TargetAddress,
 				DiskId:        destinationPlan.TargetDisk,
 				VolumeId:      selectedVolume.VolumeID,
 				EstimatedSize: destinationPlan.ExpectedSize,
@@ -231,8 +232,15 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol
 		return nil, fmt.Errorf("no suitable destination found for balance operation")
 	}

+	// Get the target server address
+	targetAddress, err := util.ResolveServerAddress(bestDisk.NodeID, activeTopology)
+	if err != nil {
+		return nil, fmt.Errorf("failed to resolve address for target server %s: %v", bestDisk.NodeID, err)
+	}
+
 	return &topology.DestinationPlan{
 		TargetNode:    bestDisk.NodeID,
+		TargetAddress: targetAddress,
 		TargetDisk:    bestDisk.DiskID,
 		TargetRack:    bestDisk.Rack,
 		TargetDC:      bestDisk.DataCenter,

weed/worker/tasks/erasure_coding/detection.go

@@ -11,6 +11,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
 )
@@ -183,6 +184,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 		glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
 			taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))

+		// Convert sources
+		sourcesProto, err := convertTaskSourcesToProtobuf(sources, metric.VolumeID, clusterInfo.ActiveTopology)
+		if err != nil {
+			glog.Warningf("Failed to convert sources for EC task on volume %d: %v, skipping", metric.VolumeID, err)
+			continue
+		}
+
 		// Create unified sources and targets for EC task
 		result.TypedParams = &worker_pb.TaskParams{
 			TaskId: taskID, // Link to ActiveTopology pending task
@@ -191,7 +199,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 			VolumeSize: metric.Size, // Store original volume size for tracking changes

 			// Unified sources - all sources that will be processed/cleaned up
-			Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID),
+			Sources: sourcesProto,

 			// Unified targets - all EC shard destinations
 			Targets: createECTargets(multiPlan),
@@ -296,8 +304,15 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 	dcCount := make(map[string]int)

 	for _, disk := range selectedDisks {
+		// Get the target server address
+		targetAddress, err := util.ResolveServerAddress(disk.NodeID, activeTopology)
+		if err != nil {
+			return nil, fmt.Errorf("failed to resolve address for target server %s: %v", disk.NodeID, err)
+		}
+
 		plan := &topology.DestinationPlan{
 			TargetNode:    disk.NodeID,
+			TargetAddress: targetAddress,
 			TargetDisk:    disk.DiskID,
 			TargetRack:    disk.Rack,
 			TargetDC:      disk.DataCenter,
@@ -358,7 +373,7 @@ func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.Task
 	// Create targets with assigned shard IDs
 	for i, plan := range multiPlan.Plans {
 		target := &worker_pb.TaskTarget{
-			Node:       plan.TargetNode,
+			Node:       plan.TargetAddress,
 			DiskId:     plan.TargetDisk,
 			Rack:       plan.TargetRack,
 			DataCenter: plan.TargetDC,
@@ -388,12 +403,17 @@ func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.Task
 }

 // convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource
-func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource {
+func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32, activeTopology *topology.ActiveTopology) ([]*worker_pb.TaskSource, error) {
 	var protobufSources []*worker_pb.TaskSource
 	for _, source := range sources {
+		serverAddress, err := util.ResolveServerAddress(source.ServerID, activeTopology)
+		if err != nil {
+			return nil, fmt.Errorf("failed to resolve address for source server %s: %v", source.ServerID, err)
+		}
+
 		pbSource := &worker_pb.TaskSource{
-			Node:       source.ServerID,
+			Node:       serverAddress,
 			DiskId:     source.DiskID,
 			DataCenter: source.DataCenter,
 			Rack:       source.Rack,
@@ -418,7 +438,7 @@ func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID ui
 		protobufSources = append(protobufSources, pbSource)
 	}

-	return protobufSources
+	return protobufSources, nil
 }

 // createECTaskParams creates clean EC task parameters (destinations now in unified targets)

weed/worker/tasks/util/address.go (new file)

@@ -0,0 +1,23 @@
+package util
+
+import (
+	"fmt"
+
+	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
+)
+
+// ResolveServerAddress resolves a server ID to its network address using the active topology
+func ResolveServerAddress(serverID string, activeTopology *topology.ActiveTopology) (string, error) {
+	if activeTopology == nil {
+		return "", fmt.Errorf("topology not available")
+	}
+	allNodes := activeTopology.GetAllNodes()
+	nodeInfo, exists := allNodes[serverID]
+	if !exists {
+		return "", fmt.Errorf("server %s not found in topology", serverID)
+	}
+	if nodeInfo.Address == "" {
+		return "", fmt.Errorf("server %s has no address in topology", serverID)
+	}
+	return nodeInfo.Address, nil
+}
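
For context, a minimal usage sketch of this helper, mirroring the balance and EC call sites above (the wrapper function and its inputs are illustrative):

```go
package example // hypothetical, for illustration only

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
)

// planFor resolves a destination server ID to its ip:port before building the
// plan, so task targets carry a connectable address instead of a bare ID.
func planFor(active *topology.ActiveTopology, nodeID string, diskID uint32) (*topology.DestinationPlan, error) {
	addr, err := util.ResolveServerAddress(nodeID, active)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve address for target server %s: %v", nodeID, err)
	}
	return &topology.DestinationPlan{
		TargetNode:    nodeID, // keep the server ID for reference
		TargetAddress: addr,   // ip:port used by workers to connect
		TargetDisk:    diskID,
	}, nil
}
```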

weed/worker/tasks/vacuum/detection.go

@@ -7,6 +7,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
 )
@@ -48,7 +49,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 			// Create typed parameters for vacuum task
 			result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo)
-			results = append(results, result)
+			if result.TypedParams != nil {
+				results = append(results, result)
+			}
 		} else {
 			// Debug why volume was not selected
 			if debugCount < 5 { // Limit debug output to first 5 volumes
@@ -102,6 +105,17 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
 	// Use DC and rack information directly from VolumeHealthMetrics
 	sourceDC, sourceRack := metric.DataCenter, metric.Rack
+	// Get server address from topology (required for vacuum tasks)
+	if clusterInfo == nil || clusterInfo.ActiveTopology == nil {
+		glog.Errorf("Topology not available for vacuum task on volume %d, skipping", task.VolumeID)
+		return nil
+	}
+
+	address, err := util.ResolveServerAddress(task.Server, clusterInfo.ActiveTopology)
+	if err != nil {
+		glog.Errorf("Failed to resolve address for server %s for vacuum task on volume %d, skipping task: %v", task.Server, task.VolumeID, err)
+		return nil
+	}

 	// Create typed protobuf parameters with unified sources
 	return &worker_pb.TaskParams{
 		TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated)
@@ -112,7 +126,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
 		// Unified sources array
 		Sources: []*worker_pb.TaskSource{
 			{
-				Node:          task.Server,
+				Node:          address,
 				VolumeId:      task.VolumeID,
 				EstimatedSize: metric.Size,
 				DataCenter:    sourceDC,

weed/worker/types/data_types.go

@@ -18,7 +18,8 @@ type ClusterInfo struct {
 // VolumeHealthMetrics contains health information about a volume (simplified)
 type VolumeHealthMetrics struct {
 	VolumeID      uint32
-	Server        string
+	Server        string // Volume server ID
+	ServerAddress string // Volume server address (ip:port)
 	DiskType      string // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1")
 	DiskId        uint32 // ID of the disk in Store.Locations array
 	DataCenter    string // Data center of the server
