From f06ddd05cc148e2fc9ea60566ca4f6346e1f2762 Mon Sep 17 00:00:00 2001 From: Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com> Date: Fri, 24 Oct 2025 03:09:46 +0300 Subject: [PATCH] Improve-worker (#7367) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ♻️ refactor(worker): remove goto * ♻️ refactor(worker): let manager loop exit by itself * ♻️ refactor(worker): fix race condition when closing worker CloseSend is not safe to call when another goroutine concurrently calls Send. streamCancel already handles proper stream closure. Also, streamExit signal should be called AFTER sending shutdownMsg Now the worker has no race condition if stopped during any moment (hopefully, tested with -race flag) * 🐛 fix(task_logger): deadlock in log closure * 🐛 fix(balance): fix balance task Removes the outdated "UnloadVolume" step as it is handled by "DeleteVolume". #7346 --- weed/worker/client.go | 21 +++++---------------- weed/worker/tasks/balance/balance_task.go | 11 ++--------- weed/worker/tasks/task_logger.go | 2 +- weed/worker/worker.go | 14 ++++---------- 4 files changed, 12 insertions(+), 36 deletions(-) diff --git a/weed/worker/client.go b/weed/worker/client.go index 0ec36e419..613d69987 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -26,7 +25,6 @@ type GrpcAdminClient struct { dialOption grpc.DialOption cmds chan grpcCommand - closeOnce sync.Once // Reconnection parameters maxReconnectAttempts int @@ -103,12 +101,14 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di func (c *GrpcAdminClient) managerLoop() { state := &grpcState{shouldReconnect: true} +out: for cmd := range c.cmds { switch cmd.action { case ActionConnect: c.handleConnect(cmd, state) case ActionDisconnect: c.handleDisconnect(cmd, state) + break out case ActionReconnect: if state.connected || state.reconnecting || !state.shouldReconnect { cmd.resp <- ErrAlreadyConnected @@ -240,9 +240,6 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error { if s.streamCancel != nil { s.streamCancel() } - if s.stream != nil { - s.stream.CloseSend() - } if s.conn != nil { s.conn.Close() } @@ -412,9 +409,6 @@ func (c *GrpcAdminClient) Disconnect() error { resp: resp, } err := <-resp - c.closeOnce.Do(func() { - close(c.cmds) - }) return err } @@ -427,9 +421,6 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { // Send shutdown signal to stop reconnection loop close(s.reconnectStop) - // Send shutdown signal to stop handlers loop - close(s.streamExit) - s.connected = false s.shouldReconnect = false @@ -452,16 +443,14 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { glog.Warningf("Failed to send shutdown message") } + // Send shutdown signal to stop handlers loop + close(s.streamExit) + // Cancel stream context if s.streamCancel != nil { s.streamCancel() } - // Close stream - if s.stream != nil { - s.stream.CloseSend() - } - // Close connection if s.conn != nil { s.conn.Close() diff --git a/weed/worker/tasks/balance/balance_task.go b/weed/worker/tasks/balance/balance_task.go index 8daafde97..e36885add 100644 --- a/weed/worker/tasks/balance/balance_task.go +++ b/weed/worker/tasks/balance/balance_task.go @@ -106,15 +106,8 @@ func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) glog.Warningf("Tail operation failed (may be normal): %v", err) } - // Step 5: Unmount from source - t.ReportProgress(85.0) - t.GetLogger().Info("Unmounting volume from source server") - if err := t.unmountVolume(sourceServer, volumeId); err != nil { - return fmt.Errorf("failed to unmount volume from source: %v", err) - } - - // Step 6: Delete from source - t.ReportProgress(95.0) + // Step 5: Delete from source + t.ReportProgress(90.0) t.GetLogger().Info("Deleting volume from source server") if err := t.deleteVolume(sourceServer, volumeId); err != nil { return fmt.Errorf("failed to delete volume from source: %v", err) diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go index 430513184..cc65c6d7b 100644 --- a/weed/worker/tasks/task_logger.go +++ b/weed/worker/tasks/task_logger.go @@ -232,6 +232,7 @@ func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[ // Close closes the logger and finalizes metadata func (l *FileTaskLogger) Close() error { + l.Info("Task logger closed for %s", l.taskID) l.mutex.Lock() defer l.mutex.Unlock() @@ -260,7 +261,6 @@ func (l *FileTaskLogger) Close() error { } l.closed = true - l.Info("Task logger closed for %s", l.taskID) return nil } diff --git a/weed/worker/worker.go b/weed/worker/worker.go index afc203318..bbd1f4662 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -7,7 +7,6 @@ import ( "os" "path/filepath" "strings" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -29,7 +28,6 @@ type Worker struct { cmds chan workerCommand state *workerState taskLogHandler *tasks.TaskLogHandler - closeOnce sync.Once } type workerState struct { running bool @@ -201,13 +199,14 @@ func (w *Worker) managerLoop() { stopChan: make(chan struct{}), currentTasks: make(map[string]*types.TaskInput), } - +out: for cmd := range w.cmds { switch cmd.action { case ActionStart: w.handleStart(cmd) case ActionStop: w.handleStop(cmd) + break out case ActionGetStatus: respCh := cmd.data.(statusResponse) var currentTasks []types.TaskInput @@ -495,15 +494,15 @@ func (w *Worker) Stop() error { // Wait for tasks to finish timeout := time.NewTimer(30 * time.Second) defer timeout.Stop() +out: for w.getTaskLoad() > 0 { select { case <-timeout.C: glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad()) - goto end_wait + break out case <-time.After(100 * time.Millisecond): } } -end_wait: // Disconnect from admin server if adminClient := w.getAdmin(); adminClient != nil { @@ -511,10 +510,6 @@ end_wait: glog.Errorf("Error disconnecting from admin server: %v", err) } } - - w.closeOnce.Do(func() { - close(w.cmds) - }) glog.Infof("Worker %s stopped", w.id) return nil } @@ -731,7 +726,6 @@ func (w *Worker) executeTask(task *types.TaskInput) { fileLogger.Info("Task %s completed successfully", task.ID) } } - return } // completeTask reports task completion to admin server