avoid hanging

add-ec-vacuum
chrislu, 4 months ago
parent commit 03c0b37086
Changed files:
  1. weed/admin/maintenance/maintenance_queue.go (61 lines changed)
  2. weed/server/volume_grpc_batch_delete.go (53 lines changed)
  3. weed/server/volume_server_handlers_write.go (4 lines changed)
  4. weed/storage/erasure_coding/ec_volume_delete.go (1 line changed)
  5. weed/storage/store_ec_delete.go (32 lines changed)

weed/admin/maintenance/maintenance_queue.go (61 lines changed)

@@ -438,6 +438,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			taskID, task.Type, task.WorkerID, duration, task.VolumeID)
 	}
 
+	// CRITICAL FIX: Remove completed/failed tasks from pending queue to prevent infinite loops
+	// This must happen for both successful completion and permanent failure (not retries)
+	if task.Status == TaskStatusCompleted || (task.Status == TaskStatusFailed && task.RetryCount >= task.MaxRetries) {
+		// Remove from pending tasks to prevent stuck scheduling loops
+		for i, pendingTask := range mq.pendingTasks {
+			if pendingTask.ID == taskID {
+				mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
+				glog.V(2).Infof("Removed completed/failed task %s from pending queue", taskID)
+				break
+			}
+		}
+	}
+
 	// Update worker
 	if task.WorkerID != "" {
 		if worker, exists := mq.workers[task.WorkerID]; exists {
@@ -461,6 +474,10 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			go mq.cleanupCompletedTasks()
 		}
 	}
+
+	// ADDITIONAL SAFETY: Clean up stale tasks from pending queue
+	// This ensures no completed/failed tasks remain in the pending queue
+	go mq.cleanupStalePendingTasks()
 }
 
 // UpdateTaskProgress updates the progress of a running task
@@ -744,6 +761,33 @@ func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
 	return removed
 }
 
+// cleanupStalePendingTasks removes completed/failed tasks from the pending queue
+// This prevents infinite loops caused by stale tasks that should not be scheduled
+func (mq *MaintenanceQueue) cleanupStalePendingTasks() {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	removed := 0
+	newPendingTasks := make([]*MaintenanceTask, 0, len(mq.pendingTasks))
+
+	for _, task := range mq.pendingTasks {
+		// Keep only tasks that should legitimately be in the pending queue
+		if task.Status == TaskStatusPending {
+			newPendingTasks = append(newPendingTasks, task)
+		} else {
+			// Remove stale tasks (completed, failed, assigned, in-progress)
+			glog.V(2).Infof("Removing stale task %s (status: %s) from pending queue", task.ID, task.Status)
+			removed++
+		}
+	}
+
+	mq.pendingTasks = newPendingTasks
+
+	if removed > 0 {
+		glog.Infof("Cleaned up %d stale tasks from pending queue", removed)
+	}
+}
+
 // RemoveStaleWorkers removes workers that haven't sent heartbeat recently
 func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
 	mq.mutex.Lock()
@@ -843,7 +887,22 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi
 // canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
 func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
-	glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
+	glog.V(2).Infof("Checking if task %s (type: %s, status: %s) can be scheduled", task.ID, task.Type, task.Status)
+
+	// CRITICAL SAFETY CHECK: Never schedule completed or permanently failed tasks
+	// This prevents infinite loops from stale tasks in pending queue
+	if task.Status == TaskStatusCompleted {
+		glog.Errorf("SAFETY GUARD: Task %s is already completed but still in pending queue - this should not happen!", task.ID)
+		return false
+	}
+	if task.Status == TaskStatusFailed && task.RetryCount >= task.MaxRetries {
+		glog.Errorf("SAFETY GUARD: Task %s has permanently failed but still in pending queue - this should not happen!", task.ID)
+		return false
+	}
+	if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
+		glog.V(2).Infof("Task %s is already assigned/in-progress (status: %s) - skipping", task.ID, task.Status)
+		return false
+	}
 
 	// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
 	// Use fallback logic directly for now

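For reference, the scheduling fix above boils down to filtering the pending slice by task status. Below is a minimal standalone sketch of that rebuild-and-filter idea, using simplified hypothetical Task/TaskStatus types rather than the real MaintenanceQueue structures; it is an illustration, not code from this commit.

package main

import "fmt"

// Simplified, hypothetical stand-ins for the maintenance queue types.
type TaskStatus string

const (
	StatusPending   TaskStatus = "pending"
	StatusCompleted TaskStatus = "completed"
	StatusFailed    TaskStatus = "failed"
)

type Task struct {
	ID         string
	Status     TaskStatus
	RetryCount int
	MaxRetries int
}

// keepPending rebuilds the pending slice, dropping anything that is no longer
// schedulable, mirroring the rebuild-and-swap approach in cleanupStalePendingTasks.
func keepPending(pending []*Task) []*Task {
	kept := make([]*Task, 0, len(pending))
	for _, t := range pending {
		if t.Status == StatusPending {
			kept = append(kept, t)
			continue
		}
		// Completed or permanently failed tasks would otherwise be
		// re-examined forever by the scheduler loop.
		fmt.Printf("dropping stale task %s (status %s)\n", t.ID, t.Status)
	}
	return kept
}

func main() {
	pending := []*Task{
		{ID: "a", Status: StatusPending},
		{ID: "b", Status: StatusCompleted},
		{ID: "c", Status: StatusFailed, RetryCount: 3, MaxRetries: 3},
	}
	pending = keepPending(pending)
	fmt.Println("remaining:", len(pending)) // remaining: 1
}

Rebuilding into a fresh slice keeps the scan linear and avoids index bookkeeping while deleting in place, which is the same trade-off cleanupStalePendingTasks makes.
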
weed/server/volume_grpc_batch_delete.go (53 lines changed)

@@ -5,9 +5,11 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -29,6 +31,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 		n := new(needle.Needle)
 		volumeId, _ := needle.NewVolumeId(vid)
 		ecVolume, isEcVolume := vs.store.FindEcVolume(volumeId)
+		var cookie types.Cookie
+		glog.Errorf("🔥 BATCH DELETE: fid=%s, volumeId=%d, isEcVolume=%t, SkipCookieCheck=%t", fid, volumeId, isEcVolume, req.SkipCookieCheck)
 		if req.SkipCookieCheck {
 			n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie)
 			if err != nil {
@@ -40,7 +44,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 			}
 		} else {
 			n.ParsePath(id_cookie)
-			cookie := n.Cookie
+			cookie = n.Cookie
 			if !isEcVolume {
 				if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
 					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
@@ -100,18 +104,43 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 				)
 			}
 		} else {
-			if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, n.Cookie); err != nil {
-				resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
-					FileId: fid,
-					Status: http.StatusInternalServerError,
-					Error:  err.Error()},
-				)
+			if req.SkipCookieCheck {
+				// When skipping cookie check, use the direct gRPC deletion path that bypasses cookie validation
+				glog.Errorf("🎯 SKIP COOKIE DELETE: volume %d, needle %d, using direct DeleteNeedleFromEcx", ecVolume.VolumeId, n.Id)
+				err = ecVolume.DeleteNeedleFromEcx(n.Id)
+				var size int64 = 0
+				if err == nil {
+					// Return a reasonable size for success status
+					size = int64(n.Size)
+				}
+				if err != nil {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusInternalServerError,
+						Error:  err.Error()},
+					)
+				} else {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusAccepted,
+						Size:   uint32(size)},
+					)
+				}
 			} else {
-				resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
-					FileId: fid,
-					Status: http.StatusAccepted,
-					Size:   uint32(size)},
-				)
+				// Cookie check enabled, use the cookie validation path
+				if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie); err != nil {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusInternalServerError,
+						Error:  err.Error()},
+					)
+				} else {
+					resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+						FileId: fid,
+						Status: http.StatusAccepted,
+						Size:   uint32(size)},
+					)
+				}
			}
 		}
 	}

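The net effect of this file's change is two distinct EC delete paths: with SkipCookieCheck set, the server deletes by needle id alone via ecVolume.DeleteNeedleFromEcx; otherwise it calls DeleteEcShardNeedle with the cookie parsed from the file id and the delete fails on a mismatch. A hedged sketch of a gRPC caller follows, assuming an already-dialed volume_server_pb.VolumeServerClient; the helper name deleteViaBatch and the connection setup are not part of this commit.

package example

import (
	"context"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

// deleteViaBatch issues one BatchDelete RPC. Maintenance-style callers that
// only know needle ids (not the original cookies) would set trusted=true;
// normal per-file deletes should leave it false so the cookie check applies.
func deleteViaBatch(ctx context.Context, client volume_server_pb.VolumeServerClient, fileIds []string, trusted bool) error {
	resp, err := client.BatchDelete(ctx, &volume_server_pb.BatchDeleteRequest{
		FileIds:         fileIds,
		SkipCookieCheck: trusted,
	})
	if err != nil {
		return err
	}
	for _, r := range resp.Results {
		if r.Error != "" {
			return fmt.Errorf("delete %s: %s", r.FileId, r.Error)
		}
	}
	return nil
}
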
weed/server/volume_server_handlers_write.go (4 lines changed)

@@ -88,10 +88,10 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
 	ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
-	glog.Infof("🔍 DELETE REQUEST: volume %d, needle %d, hasEcVolume=%t", volumeId, n.Id, hasEcVolume)
+	glog.Errorf("🔍 DELETE REQUEST: volume %d, needle %d, hasEcVolume=%t, cookie from needle=%x", volumeId, n.Id, hasEcVolume, cookie)
 	if hasEcVolume {
-		glog.Infof("🎯 ROUTING TO EC DELETION: volume %d, needle %d", volumeId, n.Id)
+		glog.Errorf("🎯 ROUTING TO EC DELETION: volume %d, needle %d, passing cookie=%x", volumeId, n.Id, cookie)
 		count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
 		writeDeleteResult(err, count, w, r)
 		return

weed/storage/erasure_coding/ec_volume_delete.go (1 line changed)

@@ -27,6 +27,7 @@ var (
 func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
+	glog.Errorf("🔥 DELETE NEEDLE FROM ECX: volume %d generation %d needle %d", ev.VolumeId, ev.Generation, needleId)
 	_, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
 	if err != nil {

weed/storage/store_ec_delete.go (32 lines changed)

@@ -16,21 +16,51 @@ import (
 func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
-	glog.Infof("🚀 EC DELETE SHARD NEEDLE: starting deletion for needle %d volume %d", n.Id, ecVolume.VolumeId)
+	// VERSION CHECK - Should see this in logs if new binary is loaded
+	glog.Errorf("⭐ VERSION 2024-08-18-06:51 EC DELETE STARTING: needle %d volume %d", n.Id, ecVolume.VolumeId)
+	glog.Errorf("🚀 EC DELETE SHARD NEEDLE: starting deletion for needle %d volume %d", n.Id, ecVolume.VolumeId)
+
+	// Early validation checks - using ERROR level to ensure they appear
+	if ecVolume == nil {
+		glog.Errorf("❌ EC DELETE: ecVolume is nil for needle %d", n.Id)
+		return 0, fmt.Errorf("ecVolume is nil")
+	}
+	if n == nil {
+		glog.Errorf("❌ EC DELETE: needle is nil")
+		return 0, fmt.Errorf("needle is nil")
+	}
+
+	glog.Errorf("🔍 EC DELETE DEBUG: Validated inputs - needle %d, volume %d, generation %d", n.Id, ecVolume.VolumeId, ecVolume.Generation)
+
+	defer func() {
+		if r := recover(); r != nil {
+			glog.Errorf("❌ EC DELETE PANIC: needle %d volume %d - %v", n.Id, ecVolume.VolumeId, r)
+		}
+	}()
+
+	glog.Errorf("🔍 EC DELETE DEBUG: About to call ReadEcShardNeedle for needle %d volume %d", n.Id, ecVolume.VolumeId)
 	count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
+	glog.Errorf("🔍 EC DELETE DEBUG: ReadEcShardNeedle returned count=%d, err=%v", count, err)
 	if err != nil {
+		glog.Errorf("❌ EC DELETE: Failed to read needle %d from volume %d: %v", n.Id, ecVolume.VolumeId, err)
 		return 0, err
 	}
+	glog.Infof("✅ EC DELETE: Successfully read needle %d, count=%d", n.Id, count)
+
+	glog.Infof("🔍 EC DELETE DEBUG: Checking cookie for needle %d (expected=%x, actual=%x)", n.Id, cookie, n.Cookie)
 	if cookie != n.Cookie {
+		glog.Errorf("❌ EC DELETE: Cookie mismatch for needle %d (expected=%x, actual=%x)", n.Id, cookie, n.Cookie)
 		return 0, fmt.Errorf("unexpected cookie %x", cookie)
 	}
+	glog.Infof("✅ EC DELETE: Cookie validation passed for needle %d", n.Id)
+
+	glog.Infof("🔍 EC DELETE DEBUG: Deleting needle %d from remote EC shards", n.Id)
 	if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
+		glog.Errorf("❌ EC DELETE: Failed to delete needle %d from remote EC shards: %v", n.Id, err)
 		return 0, err
 	}
+	glog.Infof("✅ EC DELETE: Successfully deleted needle %d from remote EC shards", n.Id)
 
 	// Record the deletion locally in the .ecj journal file
 	glog.Infof("🔍 EC DELETION: Recording needle %d in volume %d generation %d",

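The cookie compared in DeleteEcShardNeedle is the random value carried in the tail of every SeaweedFS file id, so only a caller holding the original file id can pass the check; that is exactly what the SkipCookieCheck branch in BatchDelete bypasses. Below is a rough sketch of the parse-and-compare gate, with a hypothetical parseIdCookie helper standing in for needle.ParseNeedleIdCookie; the assumption that the last 8 hex characters encode the cookie is mine, not stated in this diff.

package example

import (
	"fmt"
	"strconv"
)

// parseIdCookie is a hypothetical helper: it treats the trailing 8 hex
// characters of a file key as the 4-byte cookie and the rest as the needle id.
func parseIdCookie(keyCookie string) (needleId uint64, cookie uint32, err error) {
	if len(keyCookie) <= 8 {
		return 0, 0, fmt.Errorf("key %q too short to contain a cookie", keyCookie)
	}
	id, err := strconv.ParseUint(keyCookie[:len(keyCookie)-8], 16, 64)
	if err != nil {
		return 0, 0, err
	}
	c, err := strconv.ParseUint(keyCookie[len(keyCookie)-8:], 16, 32)
	if err != nil {
		return 0, 0, err
	}
	return id, uint32(c), nil
}

// checkCookie mirrors the guard in DeleteEcShardNeedle: the cookie stored with
// the needle must equal the cookie supplied by the caller, or the delete aborts.
func checkCookie(stored, supplied uint32) error {
	if stored != supplied {
		return fmt.Errorf("unexpected cookie %x", supplied)
	}
	return nil
}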