diff --git a/weed/admin/maintenance/maintenance_worker.go b/weed/admin/maintenance/maintenance_worker.go index 5bcd02777..f0baab715 100644 --- a/weed/admin/maintenance/maintenance_worker.go +++ b/weed/admin/maintenance/maintenance_worker.go @@ -10,6 +10,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" + + // Import task packages to trigger their auto-registration + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) // MaintenanceWorkerService manages maintenance task execution diff --git a/weed/worker/client.go b/weed/worker/client.go index 60b33fb31..a54661d77 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -80,6 +80,21 @@ func (c *GrpcAdminClient) Connect() error { return fmt.Errorf("already connected") } + // Always start the reconnection loop, even if initial connection fails + go c.reconnectionLoop() + + // Attempt initial connection + err := c.attemptConnection() + if err != nil { + glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) + return err + } + + return nil +} + +// attemptConnection tries to establish the connection without managing the reconnection loop +func (c *GrpcAdminClient) attemptConnection() error { // Detect TLS support and create appropriate connection conn, err := c.createConnection() if err != nil { @@ -100,10 +115,9 @@ func (c *GrpcAdminClient) Connect() error { c.stream = stream c.connected = true - // Start stream handlers and reconnection loop + // Start stream handlers go c.handleOutgoing() go c.handleIncoming() - go c.reconnectionLoop() glog.Infof("Connected to admin server at %s", c.adminAddress) return nil @@ -268,39 +282,15 @@ func (c *GrpcAdminClient) reconnect() error { if c.conn != nil { c.conn.Close() } + c.connected = false c.mutex.Unlock() - // Create new connection - conn, err := c.createConnection() - if err != nil { - return fmt.Errorf("failed to create connection: %w", err) - } - - client := worker_pb.NewWorkerServiceClient(conn) - - // Create new stream - streamCtx, streamCancel := context.WithCancel(context.Background()) - stream, err := client.WorkerStream(streamCtx) + // Attempt to re-establish connection using the same logic as initial connection + err := c.attemptConnection() if err != nil { - conn.Close() - streamCancel() - return fmt.Errorf("failed to create stream: %w", err) + return fmt.Errorf("failed to reconnect: %w", err) } - // Update client state - c.mutex.Lock() - c.conn = conn - c.client = client - c.stream = stream - c.streamCtx = streamCtx - c.streamCancel = streamCancel - c.connected = true - c.mutex.Unlock() - - // Restart stream handlers - go c.handleOutgoing() - go c.handleIncoming() - // Re-register worker if we have previous registration info c.mutex.RLock() workerInfo := c.lastWorkerInfo @@ -308,10 +298,9 @@ func (c *GrpcAdminClient) reconnect() error { if workerInfo != nil { glog.Infof("Re-registering worker after reconnection...") - if err := c.sendRegistration(workerInfo); err != nil { - glog.Errorf("Failed to re-register worker: %v", err) + if err := c.RegisterWorker(workerInfo); err != nil { + glog.Warningf("Failed to re-register worker after reconnection: %v", err) // Don't fail the reconnection because of registration failure - // The registration will be retried on next heartbeat or operation } } @@ -438,6 +427,17 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error { // SendHeartbeat sends heartbeat to admin server func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { if !c.connected { + // If we're currently reconnecting, don't wait - just skip the heartbeat + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping heartbeat during reconnection") + return nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(10 * time.Second); err != nil { return fmt.Errorf("not connected to admin server: %w", err) @@ -477,6 +477,17 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta // RequestTask requests a new task from admin server func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) { if !c.connected { + // If we're currently reconnecting, don't wait - just return no task + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping task request during reconnection") + return nil, nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(5 * time.Second); err != nil { return nil, fmt.Errorf("not connected to admin server: %w", err) @@ -543,6 +554,17 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task // CompleteTask reports task completion to admin server func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error { if !c.connected { + // If we're currently reconnecting, don't wait - just skip the completion report + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID) + return nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(5 * time.Second); err != nil { return fmt.Errorf("not connected to admin server: %w", err) @@ -574,6 +596,17 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str // UpdateTaskProgress updates task progress to admin server func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { if !c.connected { + // If we're currently reconnecting, don't wait - just skip the progress update + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID) + return nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(5 * time.Second); err != nil { return fmt.Errorf("not connected to admin server: %w", err)