diff --git a/weed/worker/tasks/ec_vacuum/detection.go b/weed/worker/tasks/ec_vacuum/detection.go index 60427bab2..a9e3f5ccb 100644 --- a/weed/worker/tasks/ec_vacuum/detection.go +++ b/weed/worker/tasks/ec_vacuum/detection.go @@ -88,20 +88,20 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo, VacuumParams: &worker_pb.VacuumTaskParams{ GarbageThreshold: deletionRatio, ForceVacuum: false, - BatchSize: 1000, // Default batch size - WorkingDir: "/data/ec_vacuum", // Default base directory - worker may use BaseWorkingDir/ec_vacuum instead - VerifyChecksum: true, // Enable checksum verification for safety + BatchSize: 1000, // Default batch size + WorkingDir: "/data/ec_vacuum", // Default base directory - worker may use BaseWorkingDir/ec_vacuum instead + VerifyChecksum: true, // Enable checksum verification for safety }, }, } result := &wtypes.TaskDetectionResult{ - TaskID: taskID, - TaskType: wtypes.TaskType("ec_vacuum"), - VolumeID: volumeID, - Server: ecInfo.PrimaryNode, - Collection: ecInfo.Collection, - Priority: wtypes.TaskPriorityLow, // EC vacuum is not urgent + TaskID: taskID, + TaskType: wtypes.TaskType("ec_vacuum"), + VolumeID: volumeID, + Server: ecInfo.PrimaryNode, + Collection: ecInfo.Collection, + Priority: wtypes.TaskPriorityLow, // EC vacuum is not urgent Reason: fmt.Sprintf("EC volume needs vacuum: deletion_ratio=%.1f%% (>%.1f%%), age=%.1fh (>%.1fh), size=%.1fMB (>%dMB)", deletionRatio*100, ecVacuumConfig.DeletionThreshold*100, ecInfo.Age.Hours(), (time.Duration(ecVacuumConfig.MinVolumeAgeSeconds) * time.Second).Hours(), @@ -255,13 +255,18 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol ecVolumeInfo.ShardNodes[serverAddr] = erasure_coding.ShardBits(0) } - // Add all shards on this disk for this volume - for i := 0; i < len(ecShardInfo.ShardSizes); i++ { - ecVolumeInfo.ShardNodes[serverAddr] = ecVolumeInfo.ShardNodes[serverAddr].AddShardId(erasure_coding.ShardId(i)) + // Add shards based on actual EcIndexBits, not ShardSizes length + ecIndexBits := ecShardInfo.EcIndexBits + actualShards := make([]int, 0) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if (ecIndexBits & (1 << uint(i))) != 0 { + ecVolumeInfo.ShardNodes[serverAddr] = ecVolumeInfo.ShardNodes[serverAddr].AddShardId(erasure_coding.ShardId(i)) + actualShards = append(actualShards, i) + } } - glog.V(3).Infof("EC volume %d: found %d shards on server %s", - volumeID, len(ecShardInfo.ShardSizes), node.Id) + glog.V(2).Infof("EC volume %d: found shards %v on server %s (EcIndexBits=0x%x)", + volumeID, actualShards, node.Id, ecIndexBits) } } } @@ -271,12 +276,19 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol // Log shard distribution summary for volumeID, ecInfo := range ecVolumes { - totalShards := 0 - for _, shardBits := range ecInfo.ShardNodes { - totalShards += shardBits.ShardIdCount() + shardDistribution := make(map[string][]int) + for serverAddr, shardBits := range ecInfo.ShardNodes { + shards := make([]int, 0) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if shardBits.HasShardId(erasure_coding.ShardId(i)) { + shards = append(shards, i) + } + } + if len(shards) > 0 { + shardDistribution[string(serverAddr)] = shards + } } - glog.V(2).Infof("EC volume %d: total shards=%d across %d servers", - volumeID, totalShards, len(ecInfo.ShardNodes)) + glog.V(1).Infof("EC volume %d shard distribution: %+v", volumeID, shardDistribution) } } @@ -305,15 +317,26 @@ func shouldVacuumEcVolume(ecInfo *EcVolumeInfo, config *Config, now time.Time) b return false } - // Check if we have enough shards for vacuum operation - totalShards := 0 + // Check if we have all required data shards (0-9) for vacuum operation + availableDataShards := make(map[int]bool) for _, shardBits := range ecInfo.ShardNodes { - totalShards += shardBits.ShardIdCount() + for i := 0; i < erasure_coding.DataShardsCount; i++ { + if shardBits.HasShardId(erasure_coding.ShardId(i)) { + availableDataShards[i] = true + } + } + } + + missingDataShards := make([]int, 0) + for i := 0; i < erasure_coding.DataShardsCount; i++ { + if !availableDataShards[i] { + missingDataShards = append(missingDataShards, i) + } } - if totalShards < erasure_coding.DataShardsCount { - glog.V(3).Infof("EC volume %d insufficient shards for vacuum: have=%d, need=%d", - ecInfo.VolumeID, totalShards, erasure_coding.DataShardsCount) + if len(missingDataShards) > 0 { + glog.V(1).Infof("EC volume %d incomplete for vacuum: missing data shards %v (need shards 0-%d)", + ecInfo.VolumeID, missingDataShards, erasure_coding.DataShardsCount-1) return false } diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 630e557d4..e24143058 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -136,10 +136,16 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { } } + // Validate we found a target node + if targetNode == "" || maxShardCount == 0 { + return "", fmt.Errorf("no valid target node found: sourceNodes=%d, maxShardCount=%d", len(t.sourceNodes), maxShardCount) + } + t.LogInfo("Selected target node for shard collection", map[string]interface{}{ - "target_node": targetNode, - "existing_bits": existingEcIndexBits, - "shard_count": maxShardCount, + "target_node": targetNode, + "existing_bits": existingEcIndexBits, + "shard_count": maxShardCount, + "existing_shards": existingEcIndexBits.ShardIds(), }) // Copy missing shards to target node @@ -196,6 +202,41 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { existingEcIndexBits = existingEcIndexBits.Plus(needToCopyBits) } + // Validate that we have all required data shards (0-9) for decoding + missingDataShards := make([]int, 0) + for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ { + if !existingEcIndexBits.HasShardId(erasure_coding.ShardId(shardId)) { + missingDataShards = append(missingDataShards, shardId) + } + } + + if len(missingDataShards) > 0 { + // Log all available shards across all source nodes for debugging + allAvailableShards := make(map[int][]string) + for node, shardBits := range t.sourceNodes { + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if shardBits.HasShardId(erasure_coding.ShardId(shardId)) { + allAvailableShards[shardId] = append(allAvailableShards[shardId], string(node)) + } + } + } + + t.LogInfo("ERROR: Missing required data shards for decoding", map[string]interface{}{ + "volume_id": t.volumeID, + "missing_shards": missingDataShards, + "collected_shards": existingEcIndexBits.ShardIds(), + "all_available_shards": allAvailableShards, + }) + + return "", fmt.Errorf("missing required data shards %v for EC volume %d decoding", missingDataShards, t.volumeID) + } + + t.LogInfo("Successfully collected all required data shards", map[string]interface{}{ + "volume_id": t.volumeID, + "target_node": targetNode, + "collected_shards": existingEcIndexBits.ShardIds(), + }) + return targetNode, nil } diff --git a/weed/worker/tasks/ec_vacuum/register.go b/weed/worker/tasks/ec_vacuum/register.go index ef6aaefe0..b284d4127 100644 --- a/weed/worker/tasks/ec_vacuum/register.go +++ b/weed/worker/tasks/ec_vacuum/register.go @@ -49,11 +49,76 @@ func RegisterEcVacuumTask() { } // Parse source nodes from task parameters + glog.Infof("Creating EC vacuum task for volume %d with %d sources", params.VolumeId, len(params.Sources)) + + // Log raw source data for debugging + for i, source := range params.Sources { + glog.Infof("Raw source %d: node=%s, shardIds=%v", i, source.Node, source.ShardIds) + } + sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits) - // For now, we'll collect source nodes during execution since the exact - // EC shard distribution is determined dynamically during detection - // The task will discover and collect all shards during execution + // Populate source nodes from the task parameters + for _, source := range params.Sources { + if source.Node == "" { + continue + } + + serverAddr := pb.ServerAddress(source.Node) + var shardBits erasure_coding.ShardBits + + // Convert shard IDs to ShardBits + for _, shardId := range source.ShardIds { + if shardId < erasure_coding.TotalShardsCount { + shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + } + + if shardBits.ShardIdCount() > 0 { + sourceNodes[serverAddr] = shardBits + } + } + + // Verify we have source nodes + if len(sourceNodes) == 0 { + return nil, fmt.Errorf("no valid source nodes found for EC vacuum task: sources=%d", len(params.Sources)) + } + + // Log detailed shard distribution for debugging + shardDistribution := make(map[string][]int) + for serverAddr, shardBits := range sourceNodes { + shardDistribution[string(serverAddr)] = make([]int, 0) + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if shardBits.HasShardId(erasure_coding.ShardId(shardId)) { + shardDistribution[string(serverAddr)] = append(shardDistribution[string(serverAddr)], shardId) + } + } + } + + // Validate that we have all required data shards + allShards := make(map[int]bool) + for _, shardBits := range sourceNodes { + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if shardBits.HasShardId(erasure_coding.ShardId(i)) { + allShards[i] = true + } + } + } + + missingShards := make([]int, 0) + for i := 0; i < erasure_coding.DataShardsCount; i++ { + if !allShards[i] { + missingShards = append(missingShards, i) + } + } + + if len(missingShards) > 0 { + glog.Warningf("EC vacuum task for volume %d has missing data shards %v - this should not happen! Distribution: %+v", + params.VolumeId, missingShards, shardDistribution) + } else { + glog.Infof("EC vacuum task created for volume %d with complete data shards. Distribution: %+v", + params.VolumeId, shardDistribution) + } return NewEcVacuumTask( fmt.Sprintf("ec_vacuum-%d", params.VolumeId),