From 03c0b370861f1142343143058b2a1b3797790007 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 18 Aug 2025 07:44:24 -0700
Subject: [PATCH] avoid hanging

---
 weed/admin/maintenance/maintenance_queue.go | 61 ++++++++++++++++++-
 weed/server/volume_grpc_batch_delete.go     | 53 ++++++++++++----
 weed/server/volume_server_handlers_write.go |  4 +-
 .../erasure_coding/ec_volume_delete.go      |  1 +
 weed/storage/store_ec_delete.go             | 32 +++++++++-
 5 files changed, 135 insertions(+), 16 deletions(-)

diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
index 825b5eaea..2c27d1e4c 100644
--- a/weed/admin/maintenance/maintenance_queue.go
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -438,6 +438,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			taskID, task.Type, task.WorkerID, duration, task.VolumeID)
 	}
 
+	// CRITICAL FIX: Remove completed/failed tasks from pending queue to prevent infinite loops
+	// This must happen for both successful completion and permanent failure (not retries)
+	if task.Status == TaskStatusCompleted || (task.Status == TaskStatusFailed && task.RetryCount >= task.MaxRetries) {
+		// Remove from pending tasks to prevent stuck scheduling loops
+		for i, pendingTask := range mq.pendingTasks {
+			if pendingTask.ID == taskID {
+				mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
+				glog.V(2).Infof("Removed completed/failed task %s from pending queue", taskID)
+				break
+			}
+		}
+	}
+
 	// Update worker
 	if task.WorkerID != "" {
 		if worker, exists := mq.workers[task.WorkerID]; exists {
@@ -461,6 +474,10 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			go mq.cleanupCompletedTasks()
 		}
 	}
+
+	// ADDITIONAL SAFETY: Clean up stale tasks from pending queue
+	// This ensures no completed/failed tasks remain in the pending queue
+	go mq.cleanupStalePendingTasks()
 }
 
 // UpdateTaskProgress updates the progress of a running task
@@ -744,6 +761,33 @@ func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
 	return removed
 }
 
+// cleanupStalePendingTasks removes completed/failed tasks from the pending queue
+// This prevents infinite loops caused by stale tasks that should not be scheduled
+func (mq *MaintenanceQueue) cleanupStalePendingTasks() {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	removed := 0
+	newPendingTasks := make([]*MaintenanceTask, 0, len(mq.pendingTasks))
+
+	for _, task := range mq.pendingTasks {
+		// Keep only tasks that should legitimately be in the pending queue
+		if task.Status == TaskStatusPending {
+			newPendingTasks = append(newPendingTasks, task)
+		} else {
+			// Remove stale tasks (completed, failed, assigned, in-progress)
+			glog.V(2).Infof("Removing stale task %s (status: %s) from pending queue", task.ID, task.Status)
+			removed++
+		}
+	}
+
+	mq.pendingTasks = newPendingTasks
+
+	if removed > 0 {
+		glog.Infof("Cleaned up %d stale tasks from pending queue", removed)
+	}
+}
+
 // RemoveStaleWorkers removes workers that haven't sent heartbeat recently
 func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
 	mq.mutex.Lock()
@@ -843,7 +887,22 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi
 
 // canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
 func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
-	glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
+	glog.V(2).Infof("Checking if task %s (type: %s, status: %s) can be scheduled", task.ID, task.Type, task.Status)
+
+	// CRITICAL SAFETY CHECK: Never schedule completed or permanently failed tasks
+	// This prevents infinite loops from stale tasks in pending queue
+	if task.Status == TaskStatusCompleted {
+		glog.Errorf("SAFETY GUARD: Task %s is already completed but still in pending queue - this should not happen!", task.ID)
+		return false
+	}
+	if task.Status == TaskStatusFailed && task.RetryCount >= task.MaxRetries {
+		glog.Errorf("SAFETY GUARD: Task %s has permanently failed but still in pending queue - this should not happen!", task.ID)
+		return false
+	}
+	if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
+		glog.V(2).Infof("Task %s is already assigned/in-progress (status: %s) - skipping", task.ID, task.Status)
+		return false
+	}
 
 	// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
 	// Use fallback logic directly for now
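The scheduling fix above has three layers of defense: CompleteTask now drops a finished task from the pending queue immediately, cleanupStalePendingTasks sweeps out anything stale that slips through, and canScheduleTaskNow refuses to schedule any task that is no longer genuinely pending. The invariant reduces to the self-contained sketch below; Task and TaskStatus are simplified stand-ins for the real maintenance types, not the actual API.

package main

import "fmt"

type TaskStatus string

const (
	TaskStatusPending    TaskStatus = "pending"
	TaskStatusAssigned   TaskStatus = "assigned"
	TaskStatusInProgress TaskStatus = "in_progress"
	TaskStatusCompleted  TaskStatus = "completed"
	TaskStatusFailed     TaskStatus = "failed"
)

type Task struct {
	ID         string
	Status     TaskStatus
	RetryCount int
	MaxRetries int
}

// canScheduleNow mirrors the safety guards added to canScheduleTaskNow:
// a task is schedulable only if it is still genuinely pending.
func canScheduleNow(t *Task) bool {
	if t.Status == TaskStatusCompleted {
		return false // already done; should have left the queue
	}
	if t.Status == TaskStatusFailed && t.RetryCount >= t.MaxRetries {
		return false // permanently failed; retries are exhausted
	}
	if t.Status == TaskStatusAssigned || t.Status == TaskStatusInProgress {
		return false // a worker already owns it
	}
	return true
}

// cleanupStalePending mirrors cleanupStalePendingTasks: rebuild the slice,
// keeping only tasks whose status is still pending.
func cleanupStalePending(pending []*Task) []*Task {
	kept := make([]*Task, 0, len(pending))
	for _, t := range pending {
		if t.Status == TaskStatusPending {
			kept = append(kept, t)
		}
	}
	return kept
}

func main() {
	pending := []*Task{
		{ID: "a", Status: TaskStatusPending},
		{ID: "b", Status: TaskStatusCompleted},
		{ID: "c", Status: TaskStatusFailed, RetryCount: 3, MaxRetries: 3},
	}
	pending = cleanupStalePending(pending)
	fmt.Println(len(pending), canScheduleNow(pending[0])) // 1 true
}

Rebuilding the slice, rather than deleting elements while iterating, keeps the sweep linear and avoids index bookkeeping; the patch uses the same pattern.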
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index db67ae9f5..29b58d808 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -5,9 +5,11 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -29,6 +31,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 		n := new(needle.Needle)
 		volumeId, _ := needle.NewVolumeId(vid)
 		ecVolume, isEcVolume := vs.store.FindEcVolume(volumeId)
+		var cookie types.Cookie
+		glog.Errorf("🔥 BATCH DELETE: fid=%s, volumeId=%d, isEcVolume=%t, SkipCookieCheck=%t", fid, volumeId, isEcVolume, req.SkipCookieCheck)
 		if req.SkipCookieCheck {
 			n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie)
 			if err != nil {
@@ -40,7 +44,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 			}
 		} else {
 			n.ParsePath(id_cookie)
-			cookie := n.Cookie
+			cookie = n.Cookie
 			if !isEcVolume {
 				if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
 					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
@@ -100,18 +104,43 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 				)
 			}
 		} else {
-			if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, n.Cookie); err != nil {
-				resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
-					FileId: fid,
-					Status: http.StatusInternalServerError,
-					Error:  err.Error()},
-				)
+			if req.SkipCookieCheck {
+				// When skipping cookie check, use the direct gRPC deletion path that bypasses cookie validation
+				glog.Errorf("🎯 SKIP COOKIE DELETE: volume %d, needle %d, using direct DeleteNeedleFromEcx", ecVolume.VolumeId, n.Id)
+				err = ecVolume.DeleteNeedleFromEcx(n.Id)
+				var size int64 = 0
+				if err == nil {
+					// Return a reasonable size for success status
+					size = int64(n.Size)
+				}
+				if err != nil {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusInternalServerError,
+						Error:  err.Error()},
+					)
+				} else {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusAccepted,
+						Size:   uint32(size)},
+					)
+				}
 			} else {
-				resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
-					FileId: fid,
-					Status: http.StatusAccepted,
-					Size:   uint32(size)},
-				)
+				// Cookie check enabled, use the cookie validation path
+				if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie); err != nil {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusInternalServerError,
+						Error:  err.Error()},
+					)
+				} else {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusAccepted,
+						Size:   uint32(size)},
+					)
+				}
 			}
 		}
 	}
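The new EC branch splits deletion into two paths: when SkipCookieCheck is set, the server marks the needle deleted directly via DeleteNeedleFromEcx and never reads the stored cookie; otherwise the cookie parsed from the file id must match the one stored with the needle. A minimal sketch of that dispatch follows; readStoredCookie and markDeleted are hypothetical stand-ins for the ReadEcShardNeedle lookup and the ecx update, not the real signatures.

package main

import (
	"errors"
	"fmt"
)

// Cookie is a stand-in for types.Cookie, the 32-bit value stored with each
// needle to guard against stale or forged file ids.
type Cookie uint32

// deleteEcNeedle sketches the branch added above: trusted internal callers
// (e.g. maintenance workers) only know the needle id, so with
// skipCookieCheck they bypass the read-and-compare step entirely.
func deleteEcNeedle(skipCookieCheck bool, requestCookie Cookie,
	readStoredCookie func() (Cookie, error), markDeleted func() error) error {
	if skipCookieCheck {
		return markDeleted() // direct path: no read, no cookie comparison
	}
	stored, err := readStoredCookie()
	if err != nil {
		return err
	}
	if stored != requestCookie {
		return errors.New("unexpected cookie")
	}
	return markDeleted()
}

func main() {
	stored := Cookie(0xdeadbeef)
	read := func() (Cookie, error) { return stored, nil }
	del := func() error { return nil }

	fmt.Println(deleteEcNeedle(true, 0, read, del))           // <nil>: cookie never checked
	fmt.Println(deleteEcNeedle(false, 0xdeadbeef, read, del)) // <nil>: cookie matches
	fmt.Println(deleteEcNeedle(false, 0x1234, read, del))     // unexpected cookie
}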
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index acf3cafb8..26ea36cb9 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -88,10 +88,10 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
 
 	ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
 
-	glog.Infof("🔍 DELETE REQUEST: volume %d, needle %d, hasEcVolume=%t", volumeId, n.Id, hasEcVolume)
+	glog.Errorf("🔍 DELETE REQUEST: volume %d, needle %d, hasEcVolume=%t, cookie from needle=%x", volumeId, n.Id, hasEcVolume, cookie)
 
 	if hasEcVolume {
-		glog.Infof("🎯 ROUTING TO EC DELETION: volume %d, needle %d", volumeId, n.Id)
+		glog.Errorf("🎯 ROUTING TO EC DELETION: volume %d, needle %d, passing cookie=%x", volumeId, n.Id, cookie)
 		count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
 		writeDeleteResult(err, count, w, r)
 		return
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 309fce720..7df45d9f1 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -27,6 +27,7 @@ var (
 )
 
 func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
+	glog.Errorf("🔥 DELETE NEEDLE FROM ECX: volume %d generation %d needle %d", ev.VolumeId, ev.Generation, needleId)
 	_, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
 
 	if err != nil {
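DeleteNeedleFromEcx never touches the shard data itself: it locates the needle in the sorted .ecx index and flags it as deleted, leaving space reclamation for later. Conceptually the lookup-and-mark is a binary search over fixed-size sorted records, roughly as sketched below; the in-memory entry type is an illustrative assumption, not the on-disk .ecx layout.

package main

import (
	"fmt"
	"sort"
)

type NeedleId uint64

// ecxEntry is an illustrative stand-in for one fixed-size record of the
// .ecx index; the real file stores needle id, offset, and size.
type ecxEntry struct {
	Id      NeedleId
	Deleted bool
}

// markNeedleDeleted sketches what SearchNeedleFromSortedIndex plus the
// MarkNeedleDeleted callback accomplish: binary-search the sorted entries
// for the needle id and flag the hit as deleted.
func markNeedleDeleted(index []ecxEntry, id NeedleId) error {
	i := sort.Search(len(index), func(i int) bool { return index[i].Id >= id })
	if i == len(index) || index[i].Id != id {
		return fmt.Errorf("needle %d not found in sorted index", id)
	}
	index[i].Deleted = true
	return nil
}

func main() {
	index := []ecxEntry{{Id: 3}, {Id: 7}, {Id: 42}}
	fmt.Println(markNeedleDeleted(index, 7))  // <nil>
	fmt.Println(index[1].Deleted)             // true
	fmt.Println(markNeedleDeleted(index, 99)) // needle 99 not found in sorted index
}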
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index 00f86df7a..8f0fc9dd0 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -16,21 +16,51 @@ import (
 
 func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
-	glog.Infof("🚀 EC DELETE SHARD NEEDLE: starting deletion for needle %d volume %d", n.Id, ecVolume.VolumeId)
+	// VERSION CHECK - Should see this in logs if new binary is loaded
+	glog.Errorf("⭐ VERSION 2024-08-18-06:51 EC DELETE STARTING: needle %d volume %d", n.Id, ecVolume.VolumeId)
+	glog.Errorf("🚀 EC DELETE SHARD NEEDLE: starting deletion for needle %d volume %d", n.Id, ecVolume.VolumeId)
+
+	// Early validation checks - using ERROR level to ensure they appear
+	if ecVolume == nil {
+		glog.Errorf("❌ EC DELETE: ecVolume is nil for needle %d", n.Id)
+		return 0, fmt.Errorf("ecVolume is nil")
+	}
+	if n == nil {
+		glog.Errorf("❌ EC DELETE: needle is nil")
+		return 0, fmt.Errorf("needle is nil")
+	}
+
+	glog.Errorf("🔍 EC DELETE DEBUG: Validated inputs - needle %d, volume %d, generation %d", n.Id, ecVolume.VolumeId, ecVolume.Generation)
+
+	defer func() {
+		if r := recover(); r != nil {
+			glog.Errorf("❌ EC DELETE PANIC: needle %d volume %d - %v", n.Id, ecVolume.VolumeId, r)
+		}
+	}()
+
+	glog.Errorf("🔍 EC DELETE DEBUG: About to call ReadEcShardNeedle for needle %d volume %d", n.Id, ecVolume.VolumeId)
 	count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
+	glog.Errorf("🔍 EC DELETE DEBUG: ReadEcShardNeedle returned count=%d, err=%v", count, err)
 	if err != nil {
+		glog.Errorf("❌ EC DELETE: Failed to read needle %d from volume %d: %v", n.Id, ecVolume.VolumeId, err)
 		return 0, err
 	}
+	glog.Infof("✅ EC DELETE: Successfully read needle %d, count=%d", n.Id, count)
 
+	glog.Infof("🔍 EC DELETE DEBUG: Checking cookie for needle %d (expected=%x, actual=%x)", n.Id, cookie, n.Cookie)
 	if cookie != n.Cookie {
+		glog.Errorf("❌ EC DELETE: Cookie mismatch for needle %d (expected=%x, actual=%x)", n.Id, cookie, n.Cookie)
 		return 0, fmt.Errorf("unexpected cookie %x", cookie)
 	}
+	glog.Infof("✅ EC DELETE: Cookie validation passed for needle %d", n.Id)
 
+	glog.Infof("🔍 EC DELETE DEBUG: Deleting needle %d from remote EC shards", n.Id)
 	if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
+		glog.Errorf("❌ EC DELETE: Failed to delete needle %d from remote EC shards: %v", n.Id, err)
 		return 0, err
 	}
+	glog.Infof("✅ EC DELETE: Successfully deleted needle %d from remote EC shards", n.Id)
 
 	// Record the deletion locally in the .ecj journal file
 	glog.Infof("🔍 EC DELETION: Recording needle %d in volume %d generation %d",
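Taken together, the instrumented flow is: validate inputs, read the needle back to recover its stored cookie, compare cookies, delete from at least one remote EC shard, then record the deletion in the local .ecj journal, with a recover guard so a panic in any step is at least logged instead of silently killing the call. The sketch below condenses that shape; the step functions are placeholders, and unlike the patch, whose recover only logs, it also converts the panic into an error via a named return so the caller is never left hanging.

package main

import "fmt"

// deleteFlow condenses the instrumented DeleteEcShardNeedle into its three
// steps. The deferred recover turns a panic anywhere in the chain into an
// ordinary error on the named return value.
func deleteFlow(readAndCheckCookie, deleteRemote, journalLocal func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("ec delete panicked: %v", r)
		}
	}()
	if err = readAndCheckCookie(); err != nil {
		return err
	}
	if err = deleteRemote(); err != nil {
		return err
	}
	return journalLocal()
}

func main() {
	ok := func() error { return nil }
	boom := func() error { panic("shard went away mid-delete") }

	fmt.Println(deleteFlow(ok, ok, ok))   // <nil>
	fmt.Println(deleteFlow(ok, boom, ok)) // ec delete panicked: shard went away mid-delete
}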