diff --git a/weed/worker/client.go b/weed/worker/client.go index 9066afdf3..0ec36e419 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -2,6 +2,7 @@ package worker import ( "context" + "errors" "fmt" "io" "sync" @@ -14,22 +15,18 @@ import ( "google.golang.org/grpc" ) +var ( + ErrAlreadyConnected = errors.New("already connected") +) + // GrpcAdminClient implements AdminClient using gRPC bidirectional streaming type GrpcAdminClient struct { adminAddress string workerID string dialOption grpc.DialOption - conn *grpc.ClientConn - client worker_pb.WorkerServiceClient - stream worker_pb.WorkerService_WorkerStreamClient - streamCtx context.Context - streamCancel context.CancelFunc - - connected bool - reconnecting bool - shouldReconnect bool - mutex sync.RWMutex + cmds chan grpcCommand + closeOnce sync.Once // Reconnection parameters maxReconnectAttempts int @@ -37,17 +34,48 @@ type GrpcAdminClient struct { maxReconnectBackoff time.Duration reconnectMultiplier float64 - // Worker registration info for re-registration after reconnection - lastWorkerInfo *types.WorkerData - // Channels for communication - outgoing chan *worker_pb.WorkerMessage - incoming chan *worker_pb.AdminMessage - responseChans map[string]chan *worker_pb.AdminMessage - responsesMutex sync.RWMutex + outgoing chan *worker_pb.WorkerMessage + incoming chan *worker_pb.AdminMessage + responseChans map[string]chan *worker_pb.AdminMessage +} + +type grpcAction string + +const ( + ActionConnect grpcAction = "connect" + ActionDisconnect grpcAction = "disconnect" + ActionReconnect grpcAction = "reconnect" + ActionStreamError grpcAction = "stream_error" + ActionRegisterWorker grpcAction = "register_worker" + ActionQueryReconnecting grpcAction = "query_reconnecting" + ActionQueryConnected grpcAction = "query_connected" + ActionQueryShouldReconnect grpcAction = "query_shouldreconnect" +) + +type registrationRequest struct { + Worker *types.WorkerData + Resp chan error // Used to send the registration result back +} - // Shutdown channel - shutdownChan chan struct{} +type grpcCommand struct { + action grpcAction + data any + resp chan error // for reporting success/failure +} + +type grpcState struct { + connected bool + reconnecting bool + shouldReconnect bool + conn *grpc.ClientConn + client worker_pb.WorkerServiceClient + stream worker_pb.WorkerService_WorkerStreamClient + streamCtx context.Context + streamCancel context.CancelFunc + lastWorkerInfo *types.WorkerData + reconnectStop chan struct{} + streamExit chan struct{} } // NewGrpcAdminClient creates a new gRPC admin client @@ -55,11 +83,10 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di // Admin uses HTTP port + 10000 as gRPC port grpcAddress := pb.ServerToGrpcAddress(adminAddress) - return &GrpcAdminClient{ + c := &GrpcAdminClient{ adminAddress: grpcAddress, workerID: workerID, dialOption: dialOption, - shouldReconnect: true, maxReconnectAttempts: 0, // 0 means infinite attempts reconnectBackoff: 1 * time.Second, maxReconnectBackoff: 30 * time.Second, @@ -67,65 +94,129 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di outgoing: make(chan *worker_pb.WorkerMessage, 100), incoming: make(chan *worker_pb.AdminMessage, 100), responseChans: make(map[string]chan *worker_pb.AdminMessage), - shutdownChan: make(chan struct{}), + cmds: make(chan grpcCommand), + } + go c.managerLoop() + return c +} + +func (c *GrpcAdminClient) managerLoop() { + state := &grpcState{shouldReconnect: true} + + for cmd := range 
c.cmds { + switch cmd.action { + case ActionConnect: + c.handleConnect(cmd, state) + case ActionDisconnect: + c.handleDisconnect(cmd, state) + case ActionReconnect: + if state.connected || state.reconnecting || !state.shouldReconnect { + cmd.resp <- ErrAlreadyConnected + continue + } + state.reconnecting = true // Manager acknowledges the attempt + err := c.reconnect(state) + state.reconnecting = false + cmd.resp <- err + case ActionStreamError: + state.connected = false + case ActionRegisterWorker: + req := cmd.data.(registrationRequest) + state.lastWorkerInfo = req.Worker + if !state.connected { + glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") + // Respond immediately with success (registration will happen later) + req.Resp <- nil + continue + } + err := c.sendRegistration(req.Worker) + req.Resp <- err + case ActionQueryConnected: + respCh := cmd.data.(chan bool) + respCh <- state.connected + case ActionQueryReconnecting: + respCh := cmd.data.(chan bool) + respCh <- state.reconnecting + case ActionQueryShouldReconnect: + respCh := cmd.data.(chan bool) + respCh <- state.shouldReconnect + } } } // Connect establishes gRPC connection to admin server with TLS detection func (c *GrpcAdminClient) Connect() error { - c.mutex.Lock() - if c.connected { - c.mutex.Unlock() - return fmt.Errorf("already connected") + resp := make(chan error) + c.cmds <- grpcCommand{ + action: ActionConnect, + resp: resp, } - // Release lock before calling attemptConnection which needs to acquire locks internally - c.mutex.Unlock() + return <-resp +} - // Always start the reconnection loop, even if initial connection fails - go c.reconnectionLoop() +func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) { + if s.connected { + cmd.resp <- fmt.Errorf("already connected") + return + } + + // Start reconnection loop immediately (async) + stop := make(chan struct{}) + s.reconnectStop = stop + go c.reconnectionLoop(stop) - // Attempt initial connection - err := c.attemptConnection() + // Attempt the initial connection + err := c.attemptConnection(s) if err != nil { glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) - return err + cmd.resp <- err + return } + cmd.resp <- nil +} - return nil +// createConnection attempts to connect using the provided dial option +func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption) + if err != nil { + return nil, fmt.Errorf("failed to connect to admin server: %w", err) + } + + glog.Infof("Connected to admin server at %s", c.adminAddress) + return conn, nil } // attemptConnection tries to establish the connection without managing the reconnection loop -func (c *GrpcAdminClient) attemptConnection() error { +func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { // Detect TLS support and create appropriate connection conn, err := c.createConnection() if err != nil { return fmt.Errorf("failed to connect to admin server: %w", err) } - c.conn = conn - c.client = worker_pb.NewWorkerServiceClient(conn) + s.conn = conn + s.client = worker_pb.NewWorkerServiceClient(conn) // Create bidirectional stream - c.streamCtx, c.streamCancel = context.WithCancel(context.Background()) - stream, err := c.client.WorkerStream(c.streamCtx) + s.streamCtx, s.streamCancel = context.WithCancel(context.Background()) + stream, err := 
s.client.WorkerStream(s.streamCtx) + glog.Infof("Worker stream created") if err != nil { - c.conn.Close() + s.conn.Close() return fmt.Errorf("failed to create worker stream: %w", err) } - - c.stream = stream - c.connected = true + s.connected = true + s.stream = stream // Always check for worker info and send registration immediately as the very first message - c.mutex.RLock() - workerInfo := c.lastWorkerInfo - c.mutex.RUnlock() - - if workerInfo != nil { + if s.lastWorkerInfo != nil { // Send registration synchronously as the very first message - if err := c.sendRegistrationSync(workerInfo); err != nil { - c.conn.Close() - c.connected = false + if err := c.sendRegistrationSync(s.lastWorkerInfo, s.stream); err != nil { + s.conn.Close() + s.connected = false return fmt.Errorf("failed to register worker: %w", err) } glog.Infof("Worker registered successfully with admin server") @@ -134,290 +225,268 @@ func (c *GrpcAdminClient) attemptConnection() error { glog.V(1).Infof("Connected to admin server, waiting for worker registration info") } - // Start stream handlers with synchronization - outgoingReady := make(chan struct{}) - incomingReady := make(chan struct{}) - - go c.handleOutgoingWithReady(outgoingReady) - go c.handleIncomingWithReady(incomingReady) - - // Wait for both handlers to be ready - <-outgoingReady - <-incomingReady + // Start stream handlers + s.streamExit = make(chan struct{}) + go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds) + go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds) glog.Infof("Connected to admin server at %s", c.adminAddress) return nil } -// createConnection attempts to connect using the provided dial option -func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption) - if err != nil { - return nil, fmt.Errorf("failed to connect to admin server: %w", err) - } - - glog.Infof("Connected to admin server at %s", c.adminAddress) - return conn, nil -} - -// Disconnect closes the gRPC connection -func (c *GrpcAdminClient) Disconnect() error { - c.mutex.Lock() - defer c.mutex.Unlock() - - if !c.connected { - return nil - } - - c.connected = false - c.shouldReconnect = false - - // Send shutdown signal to stop reconnection loop - select { - case c.shutdownChan <- struct{}{}: - default: - } - - // Send shutdown message - shutdownMsg := &worker_pb.WorkerMessage{ - WorkerId: c.workerID, - Timestamp: time.Now().Unix(), - Message: &worker_pb.WorkerMessage_Shutdown{ - Shutdown: &worker_pb.WorkerShutdown{ - WorkerId: c.workerID, - Reason: "normal shutdown", - }, - }, +// reconnect attempts to re-establish the connection +func (c *GrpcAdminClient) reconnect(s *grpcState) error { + // Clean up existing connection completely + if s.streamCancel != nil { + s.streamCancel() } - - select { - case c.outgoing <- shutdownMsg: - case <-time.After(time.Second): - glog.Warningf("Failed to send shutdown message") + if s.stream != nil { + s.stream.CloseSend() } - - // Cancel stream context - if c.streamCancel != nil { - c.streamCancel() + if s.conn != nil { + s.conn.Close() } + s.connected = false - // Close stream - if c.stream != nil { - c.stream.CloseSend() - } - - // Close connection - if c.conn != nil { - c.conn.Close() + // Attempt to re-establish connection using the same logic as initial connection + if err := c.attemptConnection(s); err != nil { + return fmt.Errorf("failed 
to reconnect: %w", err) } - // Close channels - close(c.outgoing) - close(c.incoming) - - glog.Infof("Disconnected from admin server") + // Registration is now handled in attemptConnection if worker info is available return nil } // reconnectionLoop handles automatic reconnection with exponential backoff -func (c *GrpcAdminClient) reconnectionLoop() { +func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) { backoff := c.reconnectBackoff attempts := 0 for { + waitDuration := backoff + if attempts == 0 { + waitDuration = time.Second + } select { - case <-c.shutdownChan: + case <-reconnectStop: return - default: + case <-time.After(waitDuration): } - - c.mutex.RLock() - shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting - c.mutex.RUnlock() - - if !shouldReconnect { - time.Sleep(time.Second) - continue + resp := make(chan error, 1) + c.cmds <- grpcCommand{ + action: ActionReconnect, + resp: resp, } - - c.mutex.Lock() - c.reconnecting = true - c.mutex.Unlock() - - glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1) - - // Attempt to reconnect - if err := c.reconnect(); err != nil { + err := <-resp + if err == nil { + // Successful reconnection + attempts = 0 + backoff = c.reconnectBackoff + glog.Infof("Successfully reconnected to admin server") + } else if errors.Is(err, ErrAlreadyConnected) { + attempts = 0 + backoff = c.reconnectBackoff + } else { attempts++ glog.Errorf("Reconnection attempt %d failed: %v", attempts, err) - // Reset reconnecting flag - c.mutex.Lock() - c.reconnecting = false - c.mutex.Unlock() - // Check if we should give up if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts { glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts) - c.mutex.Lock() - c.shouldReconnect = false - c.mutex.Unlock() return } - // Wait with exponential backoff - glog.Infof("Waiting %v before next reconnection attempt", backoff) - - select { - case <-c.shutdownChan: - return - case <-time.After(backoff): - } - // Increase backoff backoff = time.Duration(float64(backoff) * c.reconnectMultiplier) if backoff > c.maxReconnectBackoff { backoff = c.maxReconnectBackoff } - } else { - // Successful reconnection - attempts = 0 - backoff = c.reconnectBackoff - glog.Infof("Successfully reconnected to admin server") - - c.mutex.Lock() - c.reconnecting = false - c.mutex.Unlock() + glog.Infof("Waiting %v before next reconnection attempt", backoff) } } } -// reconnect attempts to re-establish the connection -func (c *GrpcAdminClient) reconnect() error { - // Clean up existing connection completely - c.mutex.Lock() - if c.streamCancel != nil { - c.streamCancel() - } - if c.stream != nil { - c.stream.CloseSend() - } - if c.conn != nil { - c.conn.Close() - } - c.connected = false - c.mutex.Unlock() - - // Attempt to re-establish connection using the same logic as initial connection - err := c.attemptConnection() - if err != nil { - return fmt.Errorf("failed to reconnect: %w", err) - } - - // Registration is now handled in attemptConnection if worker info is available - return nil -} - // handleOutgoing processes outgoing messages to admin -func (c *GrpcAdminClient) handleOutgoing() { - for msg := range c.outgoing { - c.mutex.RLock() - connected := c.connected - stream := c.stream - c.mutex.RUnlock() - - if !connected { - break +func handleOutgoing( + stream worker_pb.WorkerService_WorkerStreamClient, + streamExit <-chan struct{}, + outgoing <-chan *worker_pb.WorkerMessage, + cmds chan<- grpcCommand) { + 
+ msgCh := make(chan *worker_pb.WorkerMessage) + errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy + // Goroutine to handle blocking stream.Recv() and simultaneously handle exit + // signals + go func() { + for msg := range msgCh { + if err := stream.Send(msg); err != nil { + errCh <- err + return // Exit the receiver goroutine on error/EOF + } } + close(errCh) + }() - if err := stream.Send(msg); err != nil { + for msg := range outgoing { + select { + case msgCh <- msg: + case err := <-errCh: glog.Errorf("Failed to send message to admin: %v", err) - c.mutex.Lock() - c.connected = false - c.mutex.Unlock() - break + cmds <- grpcCommand{action: ActionStreamError, data: err} + return + case <-streamExit: + close(msgCh) + <-errCh + return } } } -// handleOutgoingWithReady processes outgoing messages and signals when ready -func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) { - // Signal that this handler is ready to process messages - close(ready) - - // Now process messages normally - c.handleOutgoing() -} - // handleIncoming processes incoming messages from admin -func (c *GrpcAdminClient) handleIncoming() { - glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID) +func handleIncoming( + workerID string, + stream worker_pb.WorkerService_WorkerStreamClient, + streamExit <-chan struct{}, + incoming chan<- *worker_pb.AdminMessage, + cmds chan<- grpcCommand) { + glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID) + msgCh := make(chan *worker_pb.AdminMessage) + errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy + // Goroutine to handle blocking stream.Recv() and simultaneously handle exit + // signals + go func() { + for { + msg, err := stream.Recv() + if err != nil { + errCh <- err + return // Exit the receiver goroutine on error/EOF + } + msgCh <- msg + } + }() for { - c.mutex.RLock() - connected := c.connected - stream := c.stream - c.mutex.RUnlock() - - if !connected { - glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID) - break - } + glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID) + + select { + case msg := <-msgCh: + // Message successfully received from the stream + glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message) - glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", c.workerID) - msg, err := stream.Recv() - if err != nil { + // Route message to waiting goroutines or general handler (original select logic) + select { + case incoming <- msg: + glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID) + case <-time.After(time.Second): + glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message) + } + + case err := <-errCh: + // Stream Receiver goroutine reported an error (EOF or network error) if err == io.EOF { - glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", c.workerID) + glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID) } else { - glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err) + glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) } - c.mutex.Lock() - c.connected = false - c.mutex.Unlock() - break 
- } - glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message) + // Report the failure as a command to the managerLoop (blocking) + cmds <- grpcCommand{action: ActionStreamError, data: err} - // Route message to waiting goroutines or general handler - select { - case c.incoming <- msg: - glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID) - case <-time.After(time.Second): - glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message) + // Exit the main handler loop + glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID) + return + + case <-streamExit: + // Manager closed this channel, signaling a controlled disconnection. + glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID) + return } } +} - glog.V(1).Infof("INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID) +// Connect establishes gRPC connection to admin server with TLS detection +func (c *GrpcAdminClient) Disconnect() error { + resp := make(chan error) + c.cmds <- grpcCommand{ + action: ActionDisconnect, + resp: resp, + } + err := <-resp + c.closeOnce.Do(func() { + close(c.cmds) + }) + return err } -// handleIncomingWithReady processes incoming messages and signals when ready -func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) { - // Signal that this handler is ready to process messages - close(ready) +func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { + if !s.connected { + cmd.resp <- fmt.Errorf("already disconnected") + return + } + + // Send shutdown signal to stop reconnection loop + close(s.reconnectStop) + + // Send shutdown signal to stop handlers loop + close(s.streamExit) + + s.connected = false + s.shouldReconnect = false + + // Send shutdown message + shutdownMsg := &worker_pb.WorkerMessage{ + WorkerId: c.workerID, + Timestamp: time.Now().Unix(), + Message: &worker_pb.WorkerMessage_Shutdown{ + Shutdown: &worker_pb.WorkerShutdown{ + WorkerId: c.workerID, + Reason: "normal shutdown", + }, + }, + } - // Now process messages normally - c.handleIncoming() + // Close outgoing/incoming + select { + case c.outgoing <- shutdownMsg: + case <-time.After(time.Second): + glog.Warningf("Failed to send shutdown message") + } + + // Cancel stream context + if s.streamCancel != nil { + s.streamCancel() + } + + // Close stream + if s.stream != nil { + s.stream.CloseSend() + } + + // Close connection + if s.conn != nil { + s.conn.Close() + } + + // Close channels + close(c.outgoing) + close(c.incoming) + + glog.Infof("Disconnected from admin server") + cmd.resp <- nil } // RegisterWorker registers the worker with the admin server func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error { - // Store worker info for re-registration after reconnection - c.mutex.Lock() - c.lastWorkerInfo = worker - c.mutex.Unlock() - - // If not connected, registration will happen when connection is established - if !c.connected { - glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") - return nil + respCh := make(chan error, 1) + request := registrationRequest{ + Worker: worker, + Resp: respCh, } - - return c.sendRegistration(worker) + c.cmds <- grpcCommand{ + action: ActionRegisterWorker, + data: request, + } + return <-respCh } // sendRegistration sends the 
registration message and waits for response @@ -468,7 +537,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { } // sendRegistrationSync sends the registration message synchronously -func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { +func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData, stream worker_pb.WorkerService_WorkerStreamClient) error { capabilities := make([]string, len(worker.Capabilities)) for i, cap := range worker.Capabilities { capabilities[i] = string(cap) @@ -489,7 +558,7 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { } // Send directly to stream to ensure it's the first message - if err := c.stream.Send(msg); err != nil { + if err := stream.Send(msg); err != nil { return fmt.Errorf("failed to send registration message: %w", err) } @@ -500,7 +569,7 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { // Start a goroutine to listen for the response go func() { for { - response, err := c.stream.Recv() + response, err := stream.Recv() if err != nil { errChan <- fmt.Errorf("failed to receive registration response: %w", err) return @@ -511,6 +580,8 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { return } // Continue waiting if it's not a registration response + // If stream is stuck, reconnect() will kill it, cleaning up this + // goroutine } }() @@ -535,13 +606,44 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { } } +func (c *GrpcAdminClient) IsConnected() bool { + respCh := make(chan bool, 1) + + c.cmds <- grpcCommand{ + action: ActionQueryConnected, + data: respCh, + } + + return <-respCh +} + +func (c *GrpcAdminClient) IsReconnecting() bool { + respCh := make(chan bool, 1) + + c.cmds <- grpcCommand{ + action: ActionQueryReconnecting, + data: respCh, + } + + return <-respCh +} + +func (c *GrpcAdminClient) ShouldReconnect() bool { + respCh := make(chan bool, 1) + + c.cmds <- grpcCommand{ + action: ActionQueryShouldReconnect, + data: respCh, + } + + return <-respCh +} + // SendHeartbeat sends heartbeat to admin server func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just skip the heartbeat - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -587,11 +689,9 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta // RequestTask requests a new task from admin server func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just return no task - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -677,11 +777,9 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str // CompleteTaskWithMetadata reports task completion with additional metadata func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just 
skip the completion report - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -726,11 +824,9 @@ func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, // UpdateTaskProgress updates task progress to admin server func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just skip the progress update - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -765,53 +861,13 @@ func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) er } } -// IsConnected returns whether the client is connected -func (c *GrpcAdminClient) IsConnected() bool { - c.mutex.RLock() - defer c.mutex.RUnlock() - return c.connected -} - -// IsReconnecting returns whether the client is currently attempting to reconnect -func (c *GrpcAdminClient) IsReconnecting() bool { - c.mutex.RLock() - defer c.mutex.RUnlock() - return c.reconnecting -} - -// SetReconnectionSettings allows configuration of reconnection behavior -func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.maxReconnectAttempts = maxAttempts - c.reconnectBackoff = initialBackoff - c.maxReconnectBackoff = maxBackoff - c.reconnectMultiplier = multiplier -} - -// StopReconnection stops the reconnection loop -func (c *GrpcAdminClient) StopReconnection() { - c.mutex.Lock() - defer c.mutex.Unlock() - c.shouldReconnect = false -} - -// StartReconnection starts the reconnection loop -func (c *GrpcAdminClient) StartReconnection() { - c.mutex.Lock() - defer c.mutex.Unlock() - c.shouldReconnect = true -} - // waitForConnection waits for the connection to be established or timeout func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { - c.mutex.RLock() - connected := c.connected - shouldReconnect := c.shouldReconnect - c.mutex.RUnlock() + connected := c.IsConnected() + shouldReconnect := c.ShouldReconnect() if connected { return nil @@ -833,104 +889,6 @@ func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage { return c.incoming } -// MockAdminClient provides a mock implementation for testing -type MockAdminClient struct { - workerID string - connected bool - tasks []*types.TaskInput - mutex sync.RWMutex -} - -// NewMockAdminClient creates a new mock admin client -func NewMockAdminClient() *MockAdminClient { - return &MockAdminClient{ - connected: true, - tasks: make([]*types.TaskInput, 0), - } -} - -// Connect mock implementation -func (m *MockAdminClient) Connect() error { - m.mutex.Lock() - defer m.mutex.Unlock() - m.connected = true - return nil -} - -// Disconnect mock implementation -func (m *MockAdminClient) Disconnect() error { - m.mutex.Lock() - defer m.mutex.Unlock() - m.connected = false - return nil -} - -// RegisterWorker mock implementation -func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error { - m.workerID = worker.ID - glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities) - return nil -} - -// SendHeartbeat mock implementation -func (m 
*MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { - glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d", - workerID, status.Status, status.CurrentLoad, status.MaxConcurrent) - return nil -} - -// RequestTask mock implementation -func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { - m.mutex.Lock() - defer m.mutex.Unlock() - - if len(m.tasks) > 0 { - task := m.tasks[0] - m.tasks = m.tasks[1:] - glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID) - return task, nil - } - - // No tasks available - return nil, nil -} - -// CompleteTask mock implementation -func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error { - if success { - glog.Infof("Mock: Task %s completed successfully", taskID) - } else { - glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg) - } - return nil -} - -// UpdateTaskProgress mock implementation -func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error { - glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress) - return nil -} - -// CompleteTaskWithMetadata mock implementation -func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { - glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata) - return nil -} - -// IsConnected mock implementation -func (m *MockAdminClient) IsConnected() bool { - m.mutex.RLock() - defer m.mutex.RUnlock() - return m.connected -} - -// AddMockTask adds a mock task for testing -func (m *MockAdminClient) AddMockTask(task *types.TaskInput) { - m.mutex.Lock() - defer m.mutex.Unlock() - m.tasks = append(m.tasks, task) -} - // CreateAdminClient creates an admin client with the provided dial option func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) { return NewGrpcAdminClient(adminServer, workerID, dialOption), nil diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 0763fdc2e..afc203318 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -23,20 +23,56 @@ import ( // Worker represents a maintenance worker instance type Worker struct { - id string - config *types.WorkerConfig - registry *tasks.TaskRegistry - currentTasks map[string]*types.TaskInput - adminClient AdminClient + id string + config *types.WorkerConfig + registry *tasks.TaskRegistry + cmds chan workerCommand + state *workerState + taskLogHandler *tasks.TaskLogHandler + closeOnce sync.Once +} +type workerState struct { running bool - stopChan chan struct{} - mutex sync.RWMutex + adminClient AdminClient startTime time.Time - tasksCompleted int - tasksFailed int + stopChan chan struct{} heartbeatTicker *time.Ticker requestTicker *time.Ticker - taskLogHandler *tasks.TaskLogHandler + currentTasks map[string]*types.TaskInput + tasksCompleted int + tasksFailed int +} + +type workerAction string + +const ( + ActionStart workerAction = "start" + ActionStop workerAction = "stop" + ActionGetStatus workerAction = "getstatus" + ActionGetTaskLoad workerAction = "getload" + ActionSetTask workerAction = "settask" + ActionSetAdmin workerAction = "setadmin" + ActionRemoveTask workerAction = "removetask" + ActionGetAdmin workerAction = "getadmin" + ActionIncTaskFail workerAction = "inctaskfail" + ActionIncTaskComplete workerAction = "inctaskcomplete" + ActionGetHbTick workerAction = 
"gethbtick" + ActionGetReqTick workerAction = "getreqtick" + ActionGetStopChan workerAction = "getstopchan" + ActionSetHbTick workerAction = "sethbtick" + ActionSetReqTick workerAction = "setreqtick" + ActionGetStartTime workerAction = "getstarttime" + ActionGetCompletedTasks workerAction = "getcompletedtasks" + ActionGetFailedTasks workerAction = "getfailedtasks" + ActionCancelTask workerAction = "canceltask" + // ... other worker actions like Stop, Status, etc. +) + +type statusResponse chan types.WorkerStatus +type workerCommand struct { + action workerAction + data any + resp chan error // for reporting success/failure } // AdminClient defines the interface for communicating with the admin server @@ -150,17 +186,222 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) { id: workerID, config: config, registry: registry, - currentTasks: make(map[string]*types.TaskInput), - stopChan: make(chan struct{}), - startTime: time.Now(), taskLogHandler: taskLogHandler, + cmds: make(chan workerCommand), } glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll())) - + go worker.managerLoop() return worker, nil } +func (w *Worker) managerLoop() { + w.state = &workerState{ + startTime: time.Now(), + stopChan: make(chan struct{}), + currentTasks: make(map[string]*types.TaskInput), + } + + for cmd := range w.cmds { + switch cmd.action { + case ActionStart: + w.handleStart(cmd) + case ActionStop: + w.handleStop(cmd) + case ActionGetStatus: + respCh := cmd.data.(statusResponse) + var currentTasks []types.TaskInput + for _, task := range w.state.currentTasks { + currentTasks = append(currentTasks, *task) + } + + statusStr := "active" + if len(w.state.currentTasks) >= w.config.MaxConcurrent { + statusStr = "busy" + } + + status := types.WorkerStatus{ + WorkerID: w.id, + Status: statusStr, + Capabilities: w.config.Capabilities, + MaxConcurrent: w.config.MaxConcurrent, + CurrentLoad: len(w.state.currentTasks), + LastHeartbeat: time.Now(), + CurrentTasks: currentTasks, + Uptime: time.Since(w.state.startTime), + TasksCompleted: w.state.tasksCompleted, + TasksFailed: w.state.tasksFailed, + } + respCh <- status + case ActionGetTaskLoad: + respCh := cmd.data.(chan int) + respCh <- len(w.state.currentTasks) + case ActionSetTask: + currentLoad := len(w.state.currentTasks) + if currentLoad >= w.config.MaxConcurrent { + cmd.resp <- fmt.Errorf("worker is at capacity") + } + task := cmd.data.(*types.TaskInput) + w.state.currentTasks[task.ID] = task + cmd.resp <- nil + case ActionSetAdmin: + admin := cmd.data.(AdminClient) + w.state.adminClient = admin + case ActionRemoveTask: + taskID := cmd.data.(string) + delete(w.state.currentTasks, taskID) + case ActionGetAdmin: + respCh := cmd.data.(chan AdminClient) + respCh <- w.state.adminClient + case ActionIncTaskFail: + w.state.tasksFailed++ + case ActionIncTaskComplete: + w.state.tasksCompleted++ + case ActionGetHbTick: + respCh := cmd.data.(chan *time.Ticker) + respCh <- w.state.heartbeatTicker + case ActionGetReqTick: + respCh := cmd.data.(chan *time.Ticker) + respCh <- w.state.requestTicker + case ActionSetHbTick: + w.state.heartbeatTicker = cmd.data.(*time.Ticker) + case ActionSetReqTick: + w.state.requestTicker = cmd.data.(*time.Ticker) + case ActionGetStopChan: + cmd.data.(chan chan struct{}) <- w.state.stopChan + case ActionGetStartTime: + cmd.data.(chan time.Time) <- w.state.startTime + case ActionGetCompletedTasks: + cmd.data.(chan int) <- w.state.tasksCompleted + case ActionGetFailedTasks: + cmd.data.(chan int) <- 
w.state.tasksFailed + case ActionCancelTask: + taskID := cmd.data.(string) + if task, exists := w.state.currentTasks[taskID]; exists { + glog.Infof("Cancelling task %s", task.ID) + // TODO: Implement actual task cancellation logic + } else { + glog.Warningf("Cannot cancel task %s: task not found", taskID) + } + + } + } +} + +func (w *Worker) getTaskLoad() int { + respCh := make(chan int, 1) + w.cmds <- workerCommand{ + action: ActionGetTaskLoad, + data: respCh, + resp: nil, + } + return <-respCh +} + +func (w *Worker) setTask(task *types.TaskInput) error { + resp := make(chan error) + w.cmds <- workerCommand{ + action: ActionSetTask, + data: task, + resp: resp, + } + if err := <-resp; err != nil { + glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s", + w.id, w.getTaskLoad(), w.config.MaxConcurrent, task.ID) + return err + } + newLoad := w.getTaskLoad() + + glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d", + w.id, task.ID, newLoad, w.config.MaxConcurrent) + return nil +} + +func (w *Worker) removeTask(task *types.TaskInput) int { + w.cmds <- workerCommand{ + action: ActionRemoveTask, + data: task.ID, + } + return w.getTaskLoad() +} + +func (w *Worker) getAdmin() AdminClient { + respCh := make(chan AdminClient, 1) + w.cmds <- workerCommand{ + action: ActionGetAdmin, + data: respCh, + } + return <-respCh +} + +func (w *Worker) getStopChan() chan struct{} { + respCh := make(chan chan struct{}, 1) + w.cmds <- workerCommand{ + action: ActionGetStopChan, + data: respCh, + } + return <-respCh +} + +func (w *Worker) getHbTick() *time.Ticker { + respCh := make(chan *time.Ticker, 1) + w.cmds <- workerCommand{ + action: ActionGetHbTick, + data: respCh, + } + return <-respCh +} + +func (w *Worker) getReqTick() *time.Ticker { + respCh := make(chan *time.Ticker, 1) + w.cmds <- workerCommand{ + action: ActionGetReqTick, + data: respCh, + } + return <-respCh +} + +func (w *Worker) setHbTick(tick *time.Ticker) *time.Ticker { + w.cmds <- workerCommand{ + action: ActionSetHbTick, + data: tick, + } + return w.getHbTick() +} + +func (w *Worker) setReqTick(tick *time.Ticker) *time.Ticker { + w.cmds <- workerCommand{ + action: ActionSetReqTick, + data: tick, + } + return w.getReqTick() +} + +func (w *Worker) getStartTime() time.Time { + respCh := make(chan time.Time, 1) + w.cmds <- workerCommand{ + action: ActionGetStartTime, + data: respCh, + } + return <-respCh +} +func (w *Worker) getCompletedTasks() int { + respCh := make(chan int, 1) + w.cmds <- workerCommand{ + action: ActionGetCompletedTasks, + data: respCh, + } + return <-respCh +} +func (w *Worker) getFailedTasks() int { + respCh := make(chan int, 1) + w.cmds <- workerCommand{ + action: ActionGetFailedTasks, + data: respCh, + } + return <-respCh +} + // getTaskLoggerConfig returns the task logger configuration with worker's log directory func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig { config := tasks.DefaultTaskLoggerConfig() @@ -177,21 +418,29 @@ func (w *Worker) ID() string { return w.id } -// Start starts the worker func (w *Worker) Start() error { - w.mutex.Lock() - defer w.mutex.Unlock() + resp := make(chan error) + w.cmds <- workerCommand{ + action: ActionStart, + resp: resp, + } + return <-resp +} - if w.running { - return fmt.Errorf("worker is already running") +// Start starts the worker +func (w *Worker) handleStart(cmd workerCommand) { + if w.state.running { + cmd.resp <- fmt.Errorf("worker is already running") + return } - if w.adminClient == nil { - return 
fmt.Errorf("admin client is not set") + if w.state.adminClient == nil { + cmd.resp <- fmt.Errorf("admin client is not set") + return } - w.running = true - w.startTime = time.Now() + w.state.running = true + w.state.startTime = time.Now() // Prepare worker info for registration workerInfo := &types.WorkerData{ @@ -204,7 +453,7 @@ func (w *Worker) Start() error { } // Register worker info with client first (this stores it for use during connection) - if err := w.adminClient.RegisterWorker(workerInfo); err != nil { + if err := w.state.adminClient.RegisterWorker(workerInfo); err != nil { glog.V(1).Infof("Worker info stored for registration: %v", err) // This is expected if not connected yet } @@ -214,7 +463,7 @@ func (w *Worker) Start() error { w.id, w.config.Capabilities, w.config.MaxConcurrent) // Try initial connection, but don't fail if it doesn't work immediately - if err := w.adminClient.Connect(); err != nil { + if err := w.state.adminClient.Connect(); err != nil { glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err) // Don't return error - let the reconnection loop handle it } else { @@ -230,54 +479,67 @@ func (w *Worker) Start() error { go w.messageProcessingLoop() glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id) - return nil + cmd.resp <- nil } -// Stop stops the worker func (w *Worker) Stop() error { - w.mutex.Lock() - defer w.mutex.Unlock() - - if !w.running { - return nil - } - - w.running = false - close(w.stopChan) - - // Stop tickers - if w.heartbeatTicker != nil { - w.heartbeatTicker.Stop() + resp := make(chan error) + w.cmds <- workerCommand{ + action: ActionStop, + resp: resp, } - if w.requestTicker != nil { - w.requestTicker.Stop() + if err := <-resp; err != nil { + return err } - // Wait for current tasks to complete or timeout + // Wait for tasks to finish timeout := time.NewTimer(30 * time.Second) defer timeout.Stop() - - for len(w.currentTasks) > 0 { + for w.getTaskLoad() > 0 { select { case <-timeout.C: - glog.Warningf("Worker %s stopping with %d tasks still running", w.id, len(w.currentTasks)) - break - case <-time.After(time.Second): - // Check again + glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad()) + goto end_wait + case <-time.After(100 * time.Millisecond): } } +end_wait: // Disconnect from admin server - if w.adminClient != nil { - if err := w.adminClient.Disconnect(); err != nil { + if adminClient := w.getAdmin(); adminClient != nil { + if err := adminClient.Disconnect(); err != nil { glog.Errorf("Error disconnecting from admin server: %v", err) } } + w.closeOnce.Do(func() { + close(w.cmds) + }) glog.Infof("Worker %s stopped", w.id) return nil } +// Stop stops the worker +func (w *Worker) handleStop(cmd workerCommand) { + if !w.state.running { + cmd.resp <- nil + return + } + + w.state.running = false + close(w.state.stopChan) + + // Stop tickers + if w.state.heartbeatTicker != nil { + w.state.heartbeatTicker.Stop() + } + if w.state.requestTicker != nil { + w.state.requestTicker.Stop() + } + + cmd.resp <- nil +} + // RegisterTask registers a task factory func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) { w.registry.Register(taskType, factory) @@ -290,31 +552,13 @@ func (w *Worker) GetCapabilities() []types.TaskType { // GetStatus returns the current worker status func (w *Worker) GetStatus() types.WorkerStatus { - w.mutex.RLock() - defer 
w.mutex.RUnlock() - - var currentTasks []types.TaskInput - for _, task := range w.currentTasks { - currentTasks = append(currentTasks, *task) - } - - status := "active" - if len(w.currentTasks) >= w.config.MaxConcurrent { - status = "busy" - } - - return types.WorkerStatus{ - WorkerID: w.id, - Status: status, - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - CurrentLoad: len(w.currentTasks), - LastHeartbeat: time.Now(), - CurrentTasks: currentTasks, - Uptime: time.Since(w.startTime), - TasksCompleted: w.tasksCompleted, - TasksFailed: w.tasksFailed, + respCh := make(statusResponse, 1) + w.cmds <- workerCommand{ + action: ActionGetStatus, + data: respCh, + resp: nil, } + return <-respCh } // HandleTask handles a task execution @@ -322,22 +566,10 @@ func (w *Worker) HandleTask(task *types.TaskInput) error { glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)", w.id, task.ID, task.Type, task.VolumeID) - w.mutex.Lock() - currentLoad := len(w.currentTasks) - if currentLoad >= w.config.MaxConcurrent { - w.mutex.Unlock() - glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s", - w.id, currentLoad, w.config.MaxConcurrent, task.ID) - return fmt.Errorf("worker is at capacity") + if err := w.setTask(task); err != nil { + return err } - w.currentTasks[task.ID] = task - newLoad := len(w.currentTasks) - w.mutex.Unlock() - - glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d", - w.id, task.ID, newLoad, w.config.MaxConcurrent) - // Execute task in goroutine go w.executeTask(task) @@ -366,7 +598,10 @@ func (w *Worker) SetTaskRequestInterval(interval time.Duration) { // SetAdminClient sets the admin client func (w *Worker) SetAdminClient(client AdminClient) { - w.adminClient = client + w.cmds <- workerCommand{ + action: ActionSetAdmin, + data: client, + } } // executeTask executes a task @@ -374,10 +609,7 @@ func (w *Worker) executeTask(task *types.TaskInput) { startTime := time.Now() defer func() { - w.mutex.Lock() - delete(w.currentTasks, task.ID) - currentLoad := len(w.currentTasks) - w.mutex.Unlock() + currentLoad := w.removeTask(task) duration := time.Since(startTime) glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d", @@ -388,7 +620,7 @@ func (w *Worker) executeTask(task *types.TaskInput) { w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339)) // Report task start to admin server - if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil { + if err := w.getAdmin().UpdateTaskProgress(task.ID, 0.0); err != nil { glog.V(1).Infof("Failed to report task start to admin: %v", err) } @@ -461,7 +693,7 @@ func (w *Worker) executeTask(task *types.TaskInput) { taskInstance.SetProgressCallback(func(progress float64, stage string) { // Report progress updates to admin server glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage) - if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil { + if err := w.getAdmin().UpdateTaskProgress(task.ID, progress); err != nil { glog.V(1).Infof("Failed to report task progress to admin: %v", err) } if fileLogger != nil { @@ -481,7 +713,9 @@ func (w *Worker) executeTask(task *types.TaskInput) { // Report completion if err != nil { w.completeTask(task.ID, false, err.Error()) - w.tasksFailed++ + w.cmds <- workerCommand{ + action: ActionIncTaskFail, + } glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err) if fileLogger 
!= nil { fileLogger.LogStatus("failed", err.Error()) @@ -489,18 +723,21 @@ func (w *Worker) executeTask(task *types.TaskInput) { } } else { w.completeTask(task.ID, true, "") - w.tasksCompleted++ + w.cmds <- workerCommand{ + action: ActionIncTaskComplete, + } glog.Infof("Worker %s completed task %s successfully", w.id, task.ID) if fileLogger != nil { fileLogger.Info("Task %s completed successfully", task.ID) } } + return } // completeTask reports task completion to admin server func (w *Worker) completeTask(taskID string, success bool, errorMsg string) { - if w.adminClient != nil { - if err := w.adminClient.CompleteTask(taskID, success, errorMsg); err != nil { + if w.getAdmin() != nil { + if err := w.getAdmin().CompleteTask(taskID, success, errorMsg); err != nil { glog.Errorf("Failed to report task completion: %v", err) } } @@ -508,14 +745,14 @@ func (w *Worker) completeTask(taskID string, success bool, errorMsg string) { // heartbeatLoop sends periodic heartbeats to the admin server func (w *Worker) heartbeatLoop() { - w.heartbeatTicker = time.NewTicker(w.config.HeartbeatInterval) - defer w.heartbeatTicker.Stop() - + defer w.setHbTick(time.NewTicker(w.config.HeartbeatInterval)).Stop() + ticker := w.getHbTick() + stopChan := w.getStopChan() for { select { - case <-w.stopChan: + case <-stopChan: return - case <-w.heartbeatTicker.C: + case <-ticker.C: w.sendHeartbeat() } } @@ -523,14 +760,14 @@ func (w *Worker) heartbeatLoop() { // taskRequestLoop periodically requests new tasks from the admin server func (w *Worker) taskRequestLoop() { - w.requestTicker = time.NewTicker(w.config.TaskRequestInterval) - defer w.requestTicker.Stop() - + defer w.setReqTick(time.NewTicker(w.config.TaskRequestInterval)).Stop() + ticker := w.getReqTick() + stopChan := w.getStopChan() for { select { - case <-w.stopChan: + case <-stopChan: return - case <-w.requestTicker.C: + case <-ticker.C: w.requestTasks() } } @@ -538,13 +775,13 @@ func (w *Worker) taskRequestLoop() { // sendHeartbeat sends heartbeat to admin server func (w *Worker) sendHeartbeat() { - if w.adminClient != nil { - if err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{ + if w.getAdmin() != nil { + if err := w.getAdmin().SendHeartbeat(w.id, &types.WorkerStatus{ WorkerID: w.id, Status: "active", Capabilities: w.config.Capabilities, MaxConcurrent: w.config.MaxConcurrent, - CurrentLoad: len(w.currentTasks), + CurrentLoad: w.getTaskLoad(), LastHeartbeat: time.Now(), }); err != nil { glog.Warningf("Failed to send heartbeat: %v", err) @@ -554,9 +791,7 @@ func (w *Worker) sendHeartbeat() { // requestTasks requests new tasks from the admin server func (w *Worker) requestTasks() { - w.mutex.RLock() - currentLoad := len(w.currentTasks) - w.mutex.RUnlock() + currentLoad := w.getTaskLoad() if currentLoad >= w.config.MaxConcurrent { glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)", @@ -564,11 +799,11 @@ func (w *Worker) requestTasks() { return // Already at capacity } - if w.adminClient != nil { + if w.getAdmin() != nil { glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)", w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities) - task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities) + task, err := w.getAdmin().RequestTask(w.id, w.config.Capabilities) if err != nil { glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err) return @@ -591,18 +826,6 @@ func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry { return 
w.registry } -// GetCurrentTasks returns the current tasks -func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput { - w.mutex.RLock() - defer w.mutex.RUnlock() - - tasks := make(map[string]*types.TaskInput) - for id, task := range w.currentTasks { - tasks[id] = task - } - return tasks -} - // registerWorker registers the worker with the admin server func (w *Worker) registerWorker() { workerInfo := &types.WorkerData{ @@ -614,7 +837,7 @@ func (w *Worker) registerWorker() { LastHeartbeat: time.Now(), } - if err := w.adminClient.RegisterWorker(workerInfo); err != nil { + if err := w.getAdmin().RegisterWorker(workerInfo); err != nil { glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err) } else { glog.Infof("Worker %s registered successfully with admin server", w.id) @@ -627,15 +850,15 @@ func (w *Worker) connectionMonitorLoop() { defer ticker.Stop() lastConnectionStatus := false - + stopChan := w.getStopChan() for { select { - case <-w.stopChan: + case <-stopChan: glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id) return case <-ticker.C: // Monitor connection status and log changes - currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected() + currentConnectionStatus := w.getAdmin() != nil && w.getAdmin().IsConnected() if currentConnectionStatus != lastConnectionStatus { if currentConnectionStatus { @@ -662,19 +885,17 @@ func (w *Worker) GetConfig() *types.WorkerConfig { // GetPerformanceMetrics returns performance metrics func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance { - w.mutex.RLock() - defer w.mutex.RUnlock() - uptime := time.Since(w.startTime) + uptime := time.Since(w.getStartTime()) var successRate float64 - totalTasks := w.tasksCompleted + w.tasksFailed + totalTasks := w.getCompletedTasks() + w.getFailedTasks() if totalTasks > 0 { - successRate = float64(w.tasksCompleted) / float64(totalTasks) * 100 + successRate = float64(w.getCompletedTasks()) / float64(totalTasks) * 100 } return &types.WorkerPerformance{ - TasksCompleted: w.tasksCompleted, - TasksFailed: w.tasksFailed, + TasksCompleted: w.getCompletedTasks(), + TasksFailed: w.getFailedTasks(), AverageTaskTime: 0, // Would need to track this Uptime: uptime, SuccessRate: successRate, @@ -686,7 +907,7 @@ func (w *Worker) messageProcessingLoop() { glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id) // Get access to the incoming message channel from gRPC client - grpcClient, ok := w.adminClient.(*GrpcAdminClient) + grpcClient, ok := w.getAdmin().(*GrpcAdminClient) if !ok { glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id) return @@ -694,10 +915,10 @@ func (w *Worker) messageProcessingLoop() { incomingChan := grpcClient.GetIncomingChannel() glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id) - + stopChan := w.getStopChan() for { select { - case <-w.stopChan: + case <-stopChan: glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id) return case message := <-incomingChan: @@ -773,7 +994,7 @@ func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) { }, } - grpcClient, ok := w.adminClient.(*GrpcAdminClient) + grpcClient, ok := w.getAdmin().(*GrpcAdminClient) if !ok { glog.Errorf("Cannot send task log response: admin client is not gRPC client") return @@ -791,14 +1012,10 @@ func (w *Worker) handleTaskLogRequest(request 
*worker_pb.TaskLogRequest) { func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) { glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId) - w.mutex.Lock() - defer w.mutex.Unlock() - - if task, exists := w.currentTasks[cancellation.TaskId]; exists { - // TODO: Implement task cancellation logic - glog.Infof("Cancelling task %s", task.ID) - } else { - glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId) + w.cmds <- workerCommand{ + action: ActionCancelTask, + data: cancellation.TaskId, + resp: nil, } }
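
Note on the pattern used throughout this diff: state handling on both the client and the worker moves from mutex-guarded fields to a single manager goroutine that owns all mutable state and serializes access through a command channel. As a reference point, here is a minimal, self-contained sketch of that pattern; the names (manager, command, actionAddTask, actionGetLoad) are illustrative only and are not the types introduced by this patch.

package main

import (
    "errors"
    "fmt"
)

type action string

const (
    actionAddTask action = "add_task"
    actionGetLoad action = "get_load"
)

type command struct {
    action action
    data   any        // payload for mutations, or a reply channel for queries
    resp   chan error // result channel for mutating commands
}

type manager struct {
    cmds chan command
}

func newManager(maxConcurrent int) *manager {
    m := &manager{cmds: make(chan command)}
    go func() {
        tasks := map[string]struct{}{} // state owned exclusively by this goroutine
        for cmd := range m.cmds {
            switch cmd.action {
            case actionAddTask:
                id := cmd.data.(string)
                if len(tasks) >= maxConcurrent {
                    cmd.resp <- errors.New("at capacity")
                    continue // guard: do not fall through and accept the task anyway
                }
                tasks[id] = struct{}{}
                cmd.resp <- nil
            case actionGetLoad:
                cmd.data.(chan int) <- len(tasks) // queries reply on their own channel
            }
        }
    }()
    return m
}

func (m *manager) AddTask(id string) error {
    resp := make(chan error, 1)
    m.cmds <- command{action: actionAddTask, data: id, resp: resp}
    return <-resp
}

func (m *manager) Load() int {
    resp := make(chan int, 1)
    m.cmds <- command{action: actionGetLoad, data: resp}
    return <-resp
}

func main() {
    m := newManager(1)
    fmt.Println(m.AddTask("a"), m.Load()) // <nil> 1
    fmt.Println(m.AddTask("b"))           // at capacity
}

Callers never touch the state directly: every mutation reports its result on resp, and every query carries its own reply channel in data, which is the same convention grpcCommand and workerCommand follow in the patch above.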
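The new handleOutgoing and handleIncoming both wrap the blocking stream call in a small pump goroutine so the main loop can select over messages, stream errors, and the manager's exit signal at the same time. Below is a hedged, standalone sketch of that technique; the receiver interface and fakeStream type are assumptions for the sketch, standing in for worker_pb.WorkerService_WorkerStreamClient.

package main

import (
    "fmt"
    "io"
    "time"
)

type receiver interface {
    Recv() (string, error)
}

// pump forwards messages from a blocking Recv loop onto out, until the stream
// fails or the exit channel is closed.
func pump(r receiver, exit <-chan struct{}, out chan<- string) {
    msgCh := make(chan string)
    errCh := make(chan error, 1) // buffered so the pump goroutine can finish even after the main loop returns
    go func() {
        for {
            msg, err := r.Recv()
            if err != nil {
                errCh <- err
                return
            }
            msgCh <- msg
        }
    }()
    for {
        select {
        case msg := <-msgCh:
            select {
            case out <- msg:
            case <-time.After(time.Second):
                // drop the message if the consumer is too slow, mirroring the handler above
            }
        case err := <-errCh:
            if err == io.EOF {
                fmt.Println("stream closed")
            } else {
                fmt.Println("receive error:", err)
            }
            return
        case <-exit:
            return
        }
    }
}

type fakeStream struct{ msgs []string }

func (f *fakeStream) Recv() (string, error) {
    if len(f.msgs) == 0 {
        return "", io.EOF
    }
    m := f.msgs[0]
    f.msgs = f.msgs[1:]
    return m, nil
}

func main() {
    out := make(chan string, 4)
    pump(&fakeStream{msgs: []string{"hello", "world"}}, nil, out)
    close(out)
    for m := range out {
        fmt.Println(m)
    }
}

The one-element buffer on errCh is the load-bearing detail: it lets the pump goroutine deposit its final error and exit even if the main loop has already returned on the exit signal.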
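For completeness, the retry delay in reconnectionLoop is a plain capped exponential backoff. A tiny sketch follows, using the 1s initial delay and 30s cap from NewGrpcAdminClient and an assumed 1.5x multiplier (the configured reconnectMultiplier value sits outside the hunks shown here).

package main

import (
    "fmt"
    "time"
)

func main() {
    backoff := 1 * time.Second
    maxBackoff := 30 * time.Second
    multiplier := 1.5

    for attempt := 1; attempt <= 12; attempt++ {
        fmt.Printf("attempt %2d: wait %v\n", attempt, backoff)
        backoff = time.Duration(float64(backoff) * multiplier)
        if backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
}

In the loop above, a successful reconnect (or an ErrAlreadyConnected response from the manager) resets the attempt count and the delay back to the initial backoff.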