diff --git a/weed/worker/client.go b/weed/worker/client.go index 0ec36e419..613d69987 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -26,7 +25,6 @@ type GrpcAdminClient struct { dialOption grpc.DialOption cmds chan grpcCommand - closeOnce sync.Once // Reconnection parameters maxReconnectAttempts int @@ -103,12 +101,14 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di func (c *GrpcAdminClient) managerLoop() { state := &grpcState{shouldReconnect: true} +out: for cmd := range c.cmds { switch cmd.action { case ActionConnect: c.handleConnect(cmd, state) case ActionDisconnect: c.handleDisconnect(cmd, state) + break out case ActionReconnect: if state.connected || state.reconnecting || !state.shouldReconnect { cmd.resp <- ErrAlreadyConnected @@ -240,9 +240,6 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error { if s.streamCancel != nil { s.streamCancel() } - if s.stream != nil { - s.stream.CloseSend() - } if s.conn != nil { s.conn.Close() } @@ -412,9 +409,6 @@ func (c *GrpcAdminClient) Disconnect() error { resp: resp, } err := <-resp - c.closeOnce.Do(func() { - close(c.cmds) - }) return err } @@ -427,9 +421,6 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { // Send shutdown signal to stop reconnection loop close(s.reconnectStop) - // Send shutdown signal to stop handlers loop - close(s.streamExit) - s.connected = false s.shouldReconnect = false @@ -452,16 +443,14 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { glog.Warningf("Failed to send shutdown message") } + // Send shutdown signal to stop handlers loop + close(s.streamExit) + // Cancel stream context if s.streamCancel != nil { s.streamCancel() } - // Close stream - if s.stream != nil { - s.stream.CloseSend() - } - // Close connection if s.conn != nil { s.conn.Close() diff --git a/weed/worker/tasks/balance/balance_task.go b/weed/worker/tasks/balance/balance_task.go index 8daafde97..e36885add 100644 --- a/weed/worker/tasks/balance/balance_task.go +++ b/weed/worker/tasks/balance/balance_task.go @@ -106,15 +106,8 @@ func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) glog.Warningf("Tail operation failed (may be normal): %v", err) } - // Step 5: Unmount from source - t.ReportProgress(85.0) - t.GetLogger().Info("Unmounting volume from source server") - if err := t.unmountVolume(sourceServer, volumeId); err != nil { - return fmt.Errorf("failed to unmount volume from source: %v", err) - } - - // Step 6: Delete from source - t.ReportProgress(95.0) + // Step 5: Delete from source + t.ReportProgress(90.0) t.GetLogger().Info("Deleting volume from source server") if err := t.deleteVolume(sourceServer, volumeId); err != nil { return fmt.Errorf("failed to delete volume from source: %v", err) diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go index 430513184..cc65c6d7b 100644 --- a/weed/worker/tasks/task_logger.go +++ b/weed/worker/tasks/task_logger.go @@ -232,6 +232,7 @@ func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[ // Close closes the logger and finalizes metadata func (l *FileTaskLogger) Close() error { + l.Info("Task logger closed for %s", l.taskID) l.mutex.Lock() defer l.mutex.Unlock() @@ -260,7 +261,6 @@ func (l *FileTaskLogger) Close() error { } l.closed = true - l.Info("Task logger closed for %s", l.taskID) return nil } diff --git a/weed/worker/worker.go b/weed/worker/worker.go index afc203318..bbd1f4662 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -7,7 +7,6 @@ import ( "os" "path/filepath" "strings" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -29,7 +28,6 @@ type Worker struct { cmds chan workerCommand state *workerState taskLogHandler *tasks.TaskLogHandler - closeOnce sync.Once } type workerState struct { running bool @@ -201,13 +199,14 @@ func (w *Worker) managerLoop() { stopChan: make(chan struct{}), currentTasks: make(map[string]*types.TaskInput), } - +out: for cmd := range w.cmds { switch cmd.action { case ActionStart: w.handleStart(cmd) case ActionStop: w.handleStop(cmd) + break out case ActionGetStatus: respCh := cmd.data.(statusResponse) var currentTasks []types.TaskInput @@ -495,15 +494,15 @@ func (w *Worker) Stop() error { // Wait for tasks to finish timeout := time.NewTimer(30 * time.Second) defer timeout.Stop() +out: for w.getTaskLoad() > 0 { select { case <-timeout.C: glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad()) - goto end_wait + break out case <-time.After(100 * time.Millisecond): } } -end_wait: // Disconnect from admin server if adminClient := w.getAdmin(); adminClient != nil { @@ -511,10 +510,6 @@ end_wait: glog.Errorf("Error disconnecting from admin server: %v", err) } } - - w.closeOnce.Do(func() { - close(w.cmds) - }) glog.Infof("Worker %s stopped", w.id) return nil } @@ -731,7 +726,6 @@ func (w *Worker) executeTask(task *types.TaskInput) { fileLogger.Info("Task %s completed successfully", task.ID) } } - return } // completeTask reports task completion to admin server