From 679eaaf8f633d2827746fbac3c7b1d330daaa5bb Mon Sep 17 00:00:00 2001 From: Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com> Date: Mon, 27 Oct 2025 20:43:50 +0200 Subject: [PATCH 1/4] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(worker):=20fo?= =?UTF-8?q?llow=20CSP=20model?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- weed/command/worker.go | 83 +----- weed/worker/client.go | 634 ++++++++++++----------------------------- weed/worker/worker.go | 632 +++++++++++++++------------------------- 3 files changed, 416 insertions(+), 933 deletions(-) diff --git a/weed/command/worker.go b/weed/command/worker.go index 6e592f73f..6d5a01a3e 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -1,9 +1,9 @@ package command import ( + "fmt" "os" "os/signal" - "path/filepath" "strings" "syscall" "time" @@ -69,43 +69,6 @@ func runWorker(cmd *Command, args []string) bool { glog.Fatalf("No valid capabilities specified") return false } - - // Set working directory and create task-specific subdirectories - var baseWorkingDir string - if *workerWorkingDir != "" { - glog.Infof("Setting working directory to: %s", *workerWorkingDir) - if err := os.Chdir(*workerWorkingDir); err != nil { - glog.Fatalf("Failed to change working directory: %v", err) - return false - } - wd, err := os.Getwd() - if err != nil { - glog.Fatalf("Failed to get working directory: %v", err) - return false - } - baseWorkingDir = wd - glog.Infof("Current working directory: %s", baseWorkingDir) - } else { - // Use default working directory when not specified - wd, err := os.Getwd() - if err != nil { - glog.Fatalf("Failed to get current working directory: %v", err) - return false - } - baseWorkingDir = wd - glog.Infof("Using current working directory: %s", baseWorkingDir) - } - - // Create task-specific subdirectories - for _, capability := range capabilities { - taskDir := filepath.Join(baseWorkingDir, string(capability)) - if err := os.MkdirAll(taskDir, 0755); err != nil { - glog.Fatalf("Failed to create task directory %s: %v", taskDir, err) - return false - } - glog.Infof("Created task directory: %s", taskDir) - } - // Create gRPC dial option using TLS configuration grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") @@ -116,45 +79,30 @@ func runWorker(cmd *Command, args []string) bool { MaxConcurrent: *workerMaxConcurrent, HeartbeatInterval: *workerHeartbeatInterval, TaskRequestInterval: *workerTaskRequestInterval, - BaseWorkingDir: baseWorkingDir, + BaseWorkingDir: *workerWorkingDir, GrpcDialOption: grpcDialOption, } - // Create worker instance - workerInstance, err := worker.NewWorker(config) - if err != nil { - glog.Fatalf("Failed to create worker: %v", err) - return false - } - adminClient, err := worker.CreateAdminClient(*workerAdminServer, workerInstance.ID(), grpcDialOption) - if err != nil { - glog.Fatalf("Failed to create admin client: %v", err) + if err := RunWorkerFromConfig(config); err != nil { + glog.Fatalf("Worker failed to run: %v", err) return false } - // Set admin client - workerInstance.SetAdminClient(adminClient) + glog.Infof("Worker stopped gracefully.") + return true +} - // Set working directory - if *workerWorkingDir != "" { - glog.Infof("Setting working directory to: %s", *workerWorkingDir) - if err := os.Chdir(*workerWorkingDir); err != nil { - glog.Fatalf("Failed to change working directory: %v", err) - return false - } - wd, err := os.Getwd() - if err != nil { - glog.Fatalf("Failed to get working directory: %v", err) - return false - } - glog.Infof("Current working directory: %s", wd) +func RunWorkerFromConfig(config *types.WorkerConfig) error { + // Create worker instance + workerInstance, err := worker.NewWorkerWithDefaults(config) + if err != nil { + return fmt.Errorf("Failed to create worker: %v", err) } // Start the worker err = workerInstance.Start() if err != nil { - glog.Errorf("Failed to start worker: %v", err) - return false + return fmt.Errorf("Failed to start worker: %v", err) } // Set up signal handling @@ -171,11 +119,10 @@ func runWorker(cmd *Command, args []string) bool { // Gracefully stop the worker err = workerInstance.Stop() if err != nil { - glog.Errorf("Error stopping worker: %v", err) + return fmt.Errorf("Error stopping worker: %v", err) } glog.Infof("Worker stopped") - - return true + return nil } // parseCapabilities converts comma-separated capability string to task types diff --git a/weed/worker/client.go b/weed/worker/client.go index 613d69987..0e1e11270 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -2,7 +2,6 @@ package worker import ( "context" - "errors" "fmt" "io" "time" @@ -14,17 +13,13 @@ import ( "google.golang.org/grpc" ) -var ( - ErrAlreadyConnected = errors.New("already connected") -) - // GrpcAdminClient implements AdminClient using gRPC bidirectional streaming type GrpcAdminClient struct { adminAddress string workerID string dialOption grpc.DialOption - cmds chan grpcCommand + comms clientChannels // Reconnection parameters maxReconnectAttempts int @@ -33,47 +28,25 @@ type GrpcAdminClient struct { reconnectMultiplier float64 // Channels for communication - outgoing chan *worker_pb.WorkerMessage - incoming chan *worker_pb.AdminMessage - responseChans map[string]chan *worker_pb.AdminMessage -} + outgoing chan *worker_pb.WorkerMessage + incoming chan *worker_pb.AdminMessage -type grpcAction string - -const ( - ActionConnect grpcAction = "connect" - ActionDisconnect grpcAction = "disconnect" - ActionReconnect grpcAction = "reconnect" - ActionStreamError grpcAction = "stream_error" - ActionRegisterWorker grpcAction = "register_worker" - ActionQueryReconnecting grpcAction = "query_reconnecting" - ActionQueryConnected grpcAction = "query_connected" - ActionQueryShouldReconnect grpcAction = "query_shouldreconnect" -) - -type registrationRequest struct { - Worker *types.WorkerData - Resp chan error // Used to send the registration result back + state grpcState +} +type connectionEvent struct { + connected bool + err error } -type grpcCommand struct { - action grpcAction - data any - resp chan error // for reporting success/failure +type clientChannels struct { + stop chan struct{} + connectionEvents chan connectionEvent + streamErrors chan error } type grpcState struct { - connected bool - reconnecting bool - shouldReconnect bool - conn *grpc.ClientConn - client worker_pb.WorkerServiceClient - stream worker_pb.WorkerService_WorkerStreamClient - streamCtx context.Context - streamCancel context.CancelFunc - lastWorkerInfo *types.WorkerData - reconnectStop chan struct{} - streamExit chan struct{} + started bool + lastWorkerInfo *types.WorkerData } // NewGrpcAdminClient creates a new gRPC admin client @@ -91,88 +64,87 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di reconnectMultiplier: 1.5, outgoing: make(chan *worker_pb.WorkerMessage, 100), incoming: make(chan *worker_pb.AdminMessage, 100), - responseChans: make(map[string]chan *worker_pb.AdminMessage), - cmds: make(chan grpcCommand), + state: grpcState{started: false}, } - go c.managerLoop() return c } -func (c *GrpcAdminClient) managerLoop() { - state := &grpcState{shouldReconnect: true} - -out: - for cmd := range c.cmds { - switch cmd.action { - case ActionConnect: - c.handleConnect(cmd, state) - case ActionDisconnect: - c.handleDisconnect(cmd, state) - break out - case ActionReconnect: - if state.connected || state.reconnecting || !state.shouldReconnect { - cmd.resp <- ErrAlreadyConnected - continue - } - state.reconnecting = true // Manager acknowledges the attempt - err := c.reconnect(state) - state.reconnecting = false - cmd.resp <- err - case ActionStreamError: - state.connected = false - case ActionRegisterWorker: - req := cmd.data.(registrationRequest) - state.lastWorkerInfo = req.Worker - if !state.connected { - glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") - // Respond immediately with success (registration will happen later) - req.Resp <- nil - continue - } - err := c.sendRegistration(req.Worker) - req.Resp <- err - case ActionQueryConnected: - respCh := cmd.data.(chan bool) - respCh <- state.connected - case ActionQueryReconnecting: - respCh := cmd.data.(chan bool) - respCh <- state.reconnecting - case ActionQueryShouldReconnect: - respCh := cmd.data.(chan bool) - respCh <- state.shouldReconnect - } +// Connect establishes gRPC connection to admin server with TLS detection +func (c *GrpcAdminClient) Connect(workerInfo *types.WorkerData) error { + if c.state.started { + return fmt.Errorf("already started") + + } + // Register worker info with client first (this stores it for use during connection) + if err := c.RegisterWorker(workerInfo); err != nil { + glog.V(1).Infof("Worker info stored for registration: %v", err) + // This is expected if not connected yet } -} -// Connect establishes gRPC connection to admin server with TLS detection -func (c *GrpcAdminClient) Connect() error { - resp := make(chan error) - c.cmds <- grpcCommand{ - action: ActionConnect, - resp: resp, + c.state.started = true + c.comms.stop = make(chan struct{}) + c.comms.connectionEvents = make(chan connectionEvent) + c.comms.streamErrors = make(chan error, 2) + go c.connectionProcess() + + // Attempt the initial connection + event := <-c.comms.connectionEvents + if event.err != nil { + glog.V(1).Infof("Initial connection failed, will retry: %v", event.err) + return event.err + } else { + return nil } - return <-resp } -func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) { - if s.connected { - cmd.resp <- fmt.Errorf("already connected") - return - } +func (c *GrpcAdminClient) GetEvents() chan connectionEvent { + return c.comms.connectionEvents +} - // Start reconnection loop immediately (async) - stop := make(chan struct{}) - s.reconnectStop = stop - go c.reconnectionLoop(stop) +func (c *GrpcAdminClient) connectionProcess() { + var ( + conn *grpc.ClientConn + ) - // Attempt the initial connection - err := c.attemptConnection(s) + // Initial connection attempt + conn, err := c.tryConnect() if err != nil { - glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) - cmd.resp <- err - return + glog.Warningf("Initial connection failed: %v", err) + c.comms.connectionEvents <- connectionEvent{connected: false, err: err} + } else { + c.comms.connectionEvents <- connectionEvent{connected: true} } - cmd.resp <- nil + + for { + select { + case <-c.comms.stop: + c.comms.connectionEvents <- connectionEvent{connected: false} + if conn != nil { + <-c.comms.streamErrors + <-c.comms.streamErrors + conn.Close() + } + return + case err := <-c.comms.streamErrors: + <-c.comms.streamErrors // now both incomingProcess and outgoingProcess + // have been cleaned up + glog.Warningf("Stream error: %v, reconnecting...", err) + if conn != nil { + conn.Close() + conn = nil + } + c.comms.connectionEvents <- connectionEvent{connected: false, err: err} + conn, err = c.tryConnectWithBackoff() + if err != nil { + glog.Errorf("Reconnection failed: %v", err) + } else { + c.comms.connectionEvents <- connectionEvent{connected: true} + } + + } + + } + } // createConnection attempts to connect using the provided dial option @@ -185,297 +157,160 @@ func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) { return nil, fmt.Errorf("failed to connect to admin server: %w", err) } - glog.Infof("Connected to admin server at %s", c.adminAddress) return conn, nil } -// attemptConnection tries to establish the connection without managing the reconnection loop -func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { - // Detect TLS support and create appropriate connection +func (c *GrpcAdminClient) tryConnect() (*grpc.ClientConn, error) { + glog.Infof("Connecting to admin server at %s", c.adminAddress) + conn, err := c.createConnection() if err != nil { - return fmt.Errorf("failed to connect to admin server: %w", err) + return nil, fmt.Errorf("connection failed: %w", err) } - s.conn = conn - s.client = worker_pb.NewWorkerServiceClient(conn) - - // Create bidirectional stream - s.streamCtx, s.streamCancel = context.WithCancel(context.Background()) - stream, err := s.client.WorkerStream(s.streamCtx) - glog.Infof("Worker stream created") + stream, err := worker_pb.NewWorkerServiceClient(conn).WorkerStream(context.Background()) if err != nil { - s.conn.Close() - return fmt.Errorf("failed to create worker stream: %w", err) - } - s.connected = true - s.stream = stream - - // Always check for worker info and send registration immediately as the very first message - if s.lastWorkerInfo != nil { - // Send registration synchronously as the very first message - if err := c.sendRegistrationSync(s.lastWorkerInfo, s.stream); err != nil { - s.conn.Close() - s.connected = false - return fmt.Errorf("failed to register worker: %w", err) + conn.Close() + return nil, fmt.Errorf("stream creation failed: %w", err) + } + + if c.state.lastWorkerInfo != nil { + if err := c.sendRegistrationSync(c.state.lastWorkerInfo, stream); err != nil { + conn.Close() + return nil, fmt.Errorf("registration failed: %w", err) } - glog.Infof("Worker registered successfully with admin server") - } else { - // No worker info yet - stream will wait for registration - glog.V(1).Infof("Connected to admin server, waiting for worker registration info") + glog.Infof("Worker registered successfully") } // Start stream handlers - s.streamExit = make(chan struct{}) - go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds) - go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds) + go c.outgoingProcess(stream) + go c.incomingProcess(stream) glog.Infof("Connected to admin server at %s", c.adminAddress) - return nil -} - -// reconnect attempts to re-establish the connection -func (c *GrpcAdminClient) reconnect(s *grpcState) error { - // Clean up existing connection completely - if s.streamCancel != nil { - s.streamCancel() - } - if s.conn != nil { - s.conn.Close() - } - s.connected = false - - // Attempt to re-establish connection using the same logic as initial connection - if err := c.attemptConnection(s); err != nil { - return fmt.Errorf("failed to reconnect: %w", err) - } - - // Registration is now handled in attemptConnection if worker info is available - return nil + return conn, nil } -// reconnectionLoop handles automatic reconnection with exponential backoff -func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) { - backoff := c.reconnectBackoff +func (c *GrpcAdminClient) tryConnectWithBackoff() (*grpc.ClientConn, error) { + backoff := time.Second attempts := 0 - for { - waitDuration := backoff - if attempts == 0 { - waitDuration = time.Second + if conn, err := c.tryConnect(); err == nil { + return conn, nil } - select { - case <-reconnectStop: - return - case <-time.After(waitDuration): + + attempts++ + if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts { + return nil, fmt.Errorf("max reconnection attempts reached") } - resp := make(chan error, 1) - c.cmds <- grpcCommand{ - action: ActionReconnect, - resp: resp, + + // Exponential backoff + backoff = time.Duration(float64(backoff) * c.reconnectMultiplier) + if backoff > c.maxReconnectBackoff { + backoff = c.maxReconnectBackoff } - err := <-resp - if err == nil { - // Successful reconnection - attempts = 0 - backoff = c.reconnectBackoff - glog.Infof("Successfully reconnected to admin server") - } else if errors.Is(err, ErrAlreadyConnected) { - attempts = 0 - backoff = c.reconnectBackoff - } else { - attempts++ - glog.Errorf("Reconnection attempt %d failed: %v", attempts, err) - - // Check if we should give up - if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts { - glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts) - return - } - // Increase backoff - backoff = time.Duration(float64(backoff) * c.reconnectMultiplier) - if backoff > c.maxReconnectBackoff { - backoff = c.maxReconnectBackoff - } - glog.Infof("Waiting %v before next reconnection attempt", backoff) + glog.Infof("Reconnection failed, retrying in %v", backoff) + select { + case <-c.comms.stop: + return nil, fmt.Errorf("cancelled") + case <-time.After(backoff): } } } // handleOutgoing processes outgoing messages to admin -func handleOutgoing( - stream worker_pb.WorkerService_WorkerStreamClient, - streamExit <-chan struct{}, - outgoing <-chan *worker_pb.WorkerMessage, - cmds chan<- grpcCommand) { - - msgCh := make(chan *worker_pb.WorkerMessage) - errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy - // Goroutine to handle blocking stream.Recv() and simultaneously handle exit - // signals - go func() { - for msg := range msgCh { - if err := stream.Send(msg); err != nil { - errCh <- err - return // Exit the receiver goroutine on error/EOF - } - } - close(errCh) - }() +func (c *GrpcAdminClient) outgoingProcess(stream worker_pb.WorkerService_WorkerStreamClient) { - for msg := range outgoing { + for { select { - case msgCh <- msg: - case err := <-errCh: - glog.Errorf("Failed to send message to admin: %v", err) - cmds <- grpcCommand{action: ActionStreamError, data: err} - return - case <-streamExit: - close(msgCh) - <-errCh + case <-c.comms.stop: + // Send shutdown message + shutdownMsg := &worker_pb.WorkerMessage{ + WorkerId: c.workerID, + Timestamp: time.Now().Unix(), + Message: &worker_pb.WorkerMessage_Shutdown{ + Shutdown: &worker_pb.WorkerShutdown{ + WorkerId: c.workerID, + Reason: "normal shutdown", + }, + }, + } + stream.Send(shutdownMsg) + close(c.outgoing) + c.comms.streamErrors <- nil return + case msg := <-c.outgoing: + if err := stream.Send(msg); err != nil { + glog.Errorf("Failed to send message: %v", err) + c.comms.streamErrors <- err + return + } } } } // handleIncoming processes incoming messages from admin -func handleIncoming( - workerID string, - stream worker_pb.WorkerService_WorkerStreamClient, - streamExit <-chan struct{}, - incoming chan<- *worker_pb.AdminMessage, - cmds chan<- grpcCommand) { +func (c *GrpcAdminClient) incomingProcess(stream worker_pb.WorkerService_WorkerStreamClient) { + workerID := c.state.lastWorkerInfo.ID glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID) - msgCh := make(chan *worker_pb.AdminMessage) - errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy - // Goroutine to handle blocking stream.Recv() and simultaneously handle exit - // signals - go func() { - for { - msg, err := stream.Recv() - if err != nil { - errCh <- err - return // Exit the receiver goroutine on error/EOF - } - msgCh <- msg - } - }() for { glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID) select { - case msg := <-msgCh: - // Message successfully received from the stream + case <-c.comms.stop: + close(c.incoming) + glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID) + c.comms.streamErrors <- nil + return + default: + msg, err := stream.Recv() + if err != nil { + if err == io.EOF { + glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID) + } else { + glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) + } + c.comms.streamErrors <- err + return // Exit the receiver goroutine on error/EOF + } glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message) - - // Route message to waiting goroutines or general handler (original select logic) select { - case incoming <- msg: + case c.incoming <- msg: glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID) case <-time.After(time.Second): glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message) } - case err := <-errCh: - // Stream Receiver goroutine reported an error (EOF or network error) - if err == io.EOF { - glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID) - } else { - glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) - } - - // Report the failure as a command to the managerLoop (blocking) - cmds <- grpcCommand{action: ActionStreamError, data: err} - - // Exit the main handler loop - glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID) - return - - case <-streamExit: - // Manager closed this channel, signaling a controlled disconnection. - glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID) - return } } } // Connect establishes gRPC connection to admin server with TLS detection func (c *GrpcAdminClient) Disconnect() error { - resp := make(chan error) - c.cmds <- grpcCommand{ - action: ActionDisconnect, - resp: resp, - } - err := <-resp - return err -} - -func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { - if !s.connected { - cmd.resp <- fmt.Errorf("already disconnected") - return - } - - // Send shutdown signal to stop reconnection loop - close(s.reconnectStop) - - s.connected = false - s.shouldReconnect = false - - // Send shutdown message - shutdownMsg := &worker_pb.WorkerMessage{ - WorkerId: c.workerID, - Timestamp: time.Now().Unix(), - Message: &worker_pb.WorkerMessage_Shutdown{ - Shutdown: &worker_pb.WorkerShutdown{ - WorkerId: c.workerID, - Reason: "normal shutdown", - }, - }, - } - - // Close outgoing/incoming - select { - case c.outgoing <- shutdownMsg: - case <-time.After(time.Second): - glog.Warningf("Failed to send shutdown message") - } - - // Send shutdown signal to stop handlers loop - close(s.streamExit) - - // Cancel stream context - if s.streamCancel != nil { - s.streamCancel() - } - - // Close connection - if s.conn != nil { - s.conn.Close() + if !c.state.started { + glog.Errorf("already disconnected") + return nil } + c.state.started = false - // Close channels - close(c.outgoing) - close(c.incoming) + // Send shutdown signal to stop connection Process + close(c.comms.stop) glog.Infof("Disconnected from admin server") - cmd.resp <- nil + return nil } // RegisterWorker registers the worker with the admin server func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error { - respCh := make(chan error, 1) - request := registrationRequest{ - Worker: worker, - Resp: respCh, - } - c.cmds <- grpcCommand{ - action: ActionRegisterWorker, - data: request, + c.state.lastWorkerInfo = worker + if !c.state.started { + glog.V(1).Infof("Not started yet, worker info stored for registration upon connection") + // Respond immediately with success (registration will happen later) + return nil } - return <-respCh + err := c.sendRegistration(worker) + return err } // sendRegistration sends the registration message and waits for response @@ -595,56 +430,8 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData, stream } } -func (c *GrpcAdminClient) IsConnected() bool { - respCh := make(chan bool, 1) - - c.cmds <- grpcCommand{ - action: ActionQueryConnected, - data: respCh, - } - - return <-respCh -} - -func (c *GrpcAdminClient) IsReconnecting() bool { - respCh := make(chan bool, 1) - - c.cmds <- grpcCommand{ - action: ActionQueryReconnecting, - data: respCh, - } - - return <-respCh -} - -func (c *GrpcAdminClient) ShouldReconnect() bool { - respCh := make(chan bool, 1) - - c.cmds <- grpcCommand{ - action: ActionQueryShouldReconnect, - data: respCh, - } - - return <-respCh -} - // SendHeartbeat sends heartbeat to admin server func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { - if !c.IsConnected() { - // If we're currently reconnecting, don't wait - just skip the heartbeat - reconnecting := c.IsReconnecting() - - if reconnecting { - // Don't treat as an error - reconnection is in progress - glog.V(2).Infof("Skipping heartbeat during reconnection") - return nil - } - - // Wait for reconnection for a short time - if err := c.waitForConnection(10 * time.Second); err != nil { - return fmt.Errorf("not connected to admin server: %w", err) - } - } taskIds := make([]string, len(status.CurrentTasks)) for i, task := range status.CurrentTasks { @@ -678,21 +465,6 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta // RequestTask requests a new task from admin server func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { - if !c.IsConnected() { - // If we're currently reconnecting, don't wait - just return no task - reconnecting := c.IsReconnecting() - - if reconnecting { - // Don't treat as an error - reconnection is in progress - glog.V(2).Infof("RECONNECTING: Worker %s skipping task request during reconnection", workerID) - return nil, nil - } - - // Wait for reconnection for a short time - if err := c.waitForConnection(5 * time.Second); err != nil { - return nil, fmt.Errorf("not connected to admin server: %w", err) - } - } caps := make([]string, len(capabilities)) for i, cap := range capabilities { @@ -766,22 +538,6 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str // CompleteTaskWithMetadata reports task completion with additional metadata func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { - if !c.IsConnected() { - // If we're currently reconnecting, don't wait - just skip the completion report - reconnecting := c.IsReconnecting() - - if reconnecting { - // Don't treat as an error - reconnection is in progress - glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID) - return nil - } - - // Wait for reconnection for a short time - if err := c.waitForConnection(5 * time.Second); err != nil { - return fmt.Errorf("not connected to admin server: %w", err) - } - } - taskComplete := &worker_pb.TaskComplete{ TaskId: taskID, WorkerId: c.workerID, @@ -813,22 +569,6 @@ func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, // UpdateTaskProgress updates task progress to admin server func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { - if !c.IsConnected() { - // If we're currently reconnecting, don't wait - just skip the progress update - reconnecting := c.IsReconnecting() - - if reconnecting { - // Don't treat as an error - reconnection is in progress - glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID) - return nil - } - - // Wait for reconnection for a short time - if err := c.waitForConnection(5 * time.Second); err != nil { - return fmt.Errorf("not connected to admin server: %w", err) - } - } - msg := &worker_pb.WorkerMessage{ WorkerId: c.workerID, Timestamp: time.Now().Unix(), @@ -850,37 +590,15 @@ func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) er } } -// waitForConnection waits for the connection to be established or timeout -func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error { - deadline := time.Now().Add(timeout) - - for time.Now().Before(deadline) { - connected := c.IsConnected() - shouldReconnect := c.ShouldReconnect() - - if connected { - return nil - } - - if !shouldReconnect { - return fmt.Errorf("reconnection is disabled") - } - - time.Sleep(100 * time.Millisecond) - } - - return fmt.Errorf("timeout waiting for connection") -} - // GetIncomingChannel returns the incoming message channel for message processing // This allows the worker to process admin messages directly func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage { return c.incoming } -// CreateAdminClient creates an admin client with the provided dial option -func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) { - return NewGrpcAdminClient(adminServer, workerID, dialOption), nil +// NewAdminClient creates an admin client with the provided dial option +func NewAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) AdminClient { + return NewGrpcAdminClient(adminServer, workerID, dialOption) } // getServerFromParams extracts server address from unified sources diff --git a/weed/worker/worker.go b/weed/worker/worker.go index bbd1f4662..2fa57da5b 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -25,57 +25,44 @@ type Worker struct { id string config *types.WorkerConfig registry *tasks.TaskRegistry - cmds chan workerCommand - state *workerState + client AdminClient taskLogHandler *tasks.TaskLogHandler + comms workerChannels + state workerState +} +type workerChannels struct { + stop chan struct{} + connectionEvents chan connectionEvent + taskReqs chan taskRequest + taskCompl chan taskCompletion + metricsQuery chan chan metricsResponse + loadQuery chan chan int +} + +type metricsResponse struct { + success, failure int } type workerState struct { - running bool - adminClient AdminClient - startTime time.Time - stopChan chan struct{} - heartbeatTicker *time.Ticker - requestTicker *time.Ticker - currentTasks map[string]*types.TaskInput - tasksCompleted int - tasksFailed int + running bool + startTime time.Time +} +type taskRequest struct { + task *types.TaskInput + resp chan taskResponse } -type workerAction string - -const ( - ActionStart workerAction = "start" - ActionStop workerAction = "stop" - ActionGetStatus workerAction = "getstatus" - ActionGetTaskLoad workerAction = "getload" - ActionSetTask workerAction = "settask" - ActionSetAdmin workerAction = "setadmin" - ActionRemoveTask workerAction = "removetask" - ActionGetAdmin workerAction = "getadmin" - ActionIncTaskFail workerAction = "inctaskfail" - ActionIncTaskComplete workerAction = "inctaskcomplete" - ActionGetHbTick workerAction = "gethbtick" - ActionGetReqTick workerAction = "getreqtick" - ActionGetStopChan workerAction = "getstopchan" - ActionSetHbTick workerAction = "sethbtick" - ActionSetReqTick workerAction = "setreqtick" - ActionGetStartTime workerAction = "getstarttime" - ActionGetCompletedTasks workerAction = "getcompletedtasks" - ActionGetFailedTasks workerAction = "getfailedtasks" - ActionCancelTask workerAction = "canceltask" - // ... other worker actions like Stop, Status, etc. -) +type taskResponse struct { + accepted bool + reason error +} -type statusResponse chan types.WorkerStatus -type workerCommand struct { - action workerAction - data any - resp chan error // for reporting success/failure +type taskCompletion struct { + success bool } // AdminClient defines the interface for communicating with the admin server type AdminClient interface { - Connect() error + Connect(workerInfo *types.WorkerData) error Disconnect() error RegisterWorker(worker *types.WorkerData) error SendHeartbeat(workerID string, status *types.WorkerStatus) error @@ -83,7 +70,7 @@ type AdminClient interface { CompleteTask(taskID string, success bool, errorMsg string) error CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error UpdateTaskProgress(taskID string, progress float64) error - IsConnected() bool + GetEvents() chan connectionEvent } // GenerateOrLoadWorkerID generates a unique worker ID or loads existing one from working directory @@ -157,290 +144,108 @@ func GenerateOrLoadWorkerID(workingDir string) (string, error) { return workerID, nil } -// NewWorker creates a new worker instance -func NewWorker(config *types.WorkerConfig) (*Worker, error) { +func setWorkingDir(workingDir string) (string, error) { + // Set working directory and create task-specific subdirectories + var baseWorkingDir string + if workingDir != "" { + glog.Infof("Setting working directory to: %s", workingDir) + if err := os.Chdir(workingDir); err != nil { + return "", fmt.Errorf("failed to change working directory: %v", err) + } + wd, err := os.Getwd() + if err != nil { + return "", fmt.Errorf("failed to get working directory: %v", err) + } + baseWorkingDir = wd + glog.Infof("Current working directory: %s", baseWorkingDir) + } else { + // Use default working directory when not specified + wd, err := os.Getwd() + if err != nil { + return "", fmt.Errorf("failed to get current working directory: %v", err) + } + baseWorkingDir = wd + glog.Infof("Using current working directory: %s", baseWorkingDir) + } + return baseWorkingDir, nil +} + +func makeDirectories(capabilities []types.TaskType, baseWorkingDir string) error { + // Create task-specific subdirectories + for _, capability := range capabilities { + taskDir := filepath.Join(baseWorkingDir, string(capability)) + if err := os.MkdirAll(taskDir, 0755); err != nil { + return fmt.Errorf("failed to create task directory %s: %v", taskDir, err) + } + glog.Infof("Created task directory: %s", taskDir) + } + return nil +} + +func NewWorkerWithDefaults(config *types.WorkerConfig) (*Worker, error) { if config == nil { config = types.DefaultWorkerConfig() } + baseWorkingDir, err := setWorkingDir(config.BaseWorkingDir) + if err != nil { + return nil, err + } + config.BaseWorkingDir = baseWorkingDir + if err := makeDirectories(config.Capabilities, config.BaseWorkingDir); err != nil { + return nil, err + } // Generate or load persistent worker ID - workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir) + workerID, err := GenerateOrLoadWorkerID(baseWorkingDir) if err != nil { return nil, fmt.Errorf("failed to generate or load worker ID: %w", err) } - + client := NewAdminClient(config.AdminServer, workerID, config.GrpcDialOption) // Use the global unified registry that already has all tasks registered registry := tasks.GetGlobalTaskRegistry() // Initialize task log handler - logDir := filepath.Join(config.BaseWorkingDir, "task_logs") + logDir := filepath.Join(baseWorkingDir, "task_logs") // Ensure the base task log directory exists to avoid errors when admin requests logs if err := os.MkdirAll(logDir, 0755); err != nil { - glog.Warningf("Failed to create task log base directory %s: %v", logDir, err) + return nil, fmt.Errorf("failed to create task log base directory %s: %v", logDir, err) } taskLogHandler := tasks.NewTaskLogHandler(logDir) + return NewWorker(workerID, config, registry, client, taskLogHandler), nil +} + +// NewWorker creates a new worker instance +func NewWorker(workerID string, config *types.WorkerConfig, registry *tasks.TaskRegistry, client AdminClient, taskLogHandler *tasks.TaskLogHandler) *Worker { worker := &Worker{ id: workerID, config: config, registry: registry, + client: client, taskLogHandler: taskLogHandler, - cmds: make(chan workerCommand), } glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll())) - go worker.managerLoop() - return worker, nil -} - -func (w *Worker) managerLoop() { - w.state = &workerState{ - startTime: time.Now(), - stopChan: make(chan struct{}), - currentTasks: make(map[string]*types.TaskInput), - } -out: - for cmd := range w.cmds { - switch cmd.action { - case ActionStart: - w.handleStart(cmd) - case ActionStop: - w.handleStop(cmd) - break out - case ActionGetStatus: - respCh := cmd.data.(statusResponse) - var currentTasks []types.TaskInput - for _, task := range w.state.currentTasks { - currentTasks = append(currentTasks, *task) - } - - statusStr := "active" - if len(w.state.currentTasks) >= w.config.MaxConcurrent { - statusStr = "busy" - } - - status := types.WorkerStatus{ - WorkerID: w.id, - Status: statusStr, - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - CurrentLoad: len(w.state.currentTasks), - LastHeartbeat: time.Now(), - CurrentTasks: currentTasks, - Uptime: time.Since(w.state.startTime), - TasksCompleted: w.state.tasksCompleted, - TasksFailed: w.state.tasksFailed, - } - respCh <- status - case ActionGetTaskLoad: - respCh := cmd.data.(chan int) - respCh <- len(w.state.currentTasks) - case ActionSetTask: - currentLoad := len(w.state.currentTasks) - if currentLoad >= w.config.MaxConcurrent { - cmd.resp <- fmt.Errorf("worker is at capacity") - } - task := cmd.data.(*types.TaskInput) - w.state.currentTasks[task.ID] = task - cmd.resp <- nil - case ActionSetAdmin: - admin := cmd.data.(AdminClient) - w.state.adminClient = admin - case ActionRemoveTask: - taskID := cmd.data.(string) - delete(w.state.currentTasks, taskID) - case ActionGetAdmin: - respCh := cmd.data.(chan AdminClient) - respCh <- w.state.adminClient - case ActionIncTaskFail: - w.state.tasksFailed++ - case ActionIncTaskComplete: - w.state.tasksCompleted++ - case ActionGetHbTick: - respCh := cmd.data.(chan *time.Ticker) - respCh <- w.state.heartbeatTicker - case ActionGetReqTick: - respCh := cmd.data.(chan *time.Ticker) - respCh <- w.state.requestTicker - case ActionSetHbTick: - w.state.heartbeatTicker = cmd.data.(*time.Ticker) - case ActionSetReqTick: - w.state.requestTicker = cmd.data.(*time.Ticker) - case ActionGetStopChan: - cmd.data.(chan chan struct{}) <- w.state.stopChan - case ActionGetStartTime: - cmd.data.(chan time.Time) <- w.state.startTime - case ActionGetCompletedTasks: - cmd.data.(chan int) <- w.state.tasksCompleted - case ActionGetFailedTasks: - cmd.data.(chan int) <- w.state.tasksFailed - case ActionCancelTask: - taskID := cmd.data.(string) - if task, exists := w.state.currentTasks[taskID]; exists { - glog.Infof("Cancelling task %s", task.ID) - // TODO: Implement actual task cancellation logic - } else { - glog.Warningf("Cannot cancel task %s: task not found", taskID) - } - - } - } -} - -func (w *Worker) getTaskLoad() int { - respCh := make(chan int, 1) - w.cmds <- workerCommand{ - action: ActionGetTaskLoad, - data: respCh, - resp: nil, - } - return <-respCh -} - -func (w *Worker) setTask(task *types.TaskInput) error { - resp := make(chan error) - w.cmds <- workerCommand{ - action: ActionSetTask, - data: task, - resp: resp, - } - if err := <-resp; err != nil { - glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s", - w.id, w.getTaskLoad(), w.config.MaxConcurrent, task.ID) - return err - } - newLoad := w.getTaskLoad() - - glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d", - w.id, task.ID, newLoad, w.config.MaxConcurrent) - return nil -} - -func (w *Worker) removeTask(task *types.TaskInput) int { - w.cmds <- workerCommand{ - action: ActionRemoveTask, - data: task.ID, - } - return w.getTaskLoad() -} - -func (w *Worker) getAdmin() AdminClient { - respCh := make(chan AdminClient, 1) - w.cmds <- workerCommand{ - action: ActionGetAdmin, - data: respCh, - } - return <-respCh -} - -func (w *Worker) getStopChan() chan struct{} { - respCh := make(chan chan struct{}, 1) - w.cmds <- workerCommand{ - action: ActionGetStopChan, - data: respCh, - } - return <-respCh -} - -func (w *Worker) getHbTick() *time.Ticker { - respCh := make(chan *time.Ticker, 1) - w.cmds <- workerCommand{ - action: ActionGetHbTick, - data: respCh, - } - return <-respCh -} - -func (w *Worker) getReqTick() *time.Ticker { - respCh := make(chan *time.Ticker, 1) - w.cmds <- workerCommand{ - action: ActionGetReqTick, - data: respCh, - } - return <-respCh -} - -func (w *Worker) setHbTick(tick *time.Ticker) *time.Ticker { - w.cmds <- workerCommand{ - action: ActionSetHbTick, - data: tick, - } - return w.getHbTick() -} - -func (w *Worker) setReqTick(tick *time.Ticker) *time.Ticker { - w.cmds <- workerCommand{ - action: ActionSetReqTick, - data: tick, - } - return w.getReqTick() -} - -func (w *Worker) getStartTime() time.Time { - respCh := make(chan time.Time, 1) - w.cmds <- workerCommand{ - action: ActionGetStartTime, - data: respCh, - } - return <-respCh -} -func (w *Worker) getCompletedTasks() int { - respCh := make(chan int, 1) - w.cmds <- workerCommand{ - action: ActionGetCompletedTasks, - data: respCh, - } - return <-respCh -} -func (w *Worker) getFailedTasks() int { - respCh := make(chan int, 1) - w.cmds <- workerCommand{ - action: ActionGetFailedTasks, - data: respCh, - } - return <-respCh -} - -// getTaskLoggerConfig returns the task logger configuration with worker's log directory -func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig { - config := tasks.DefaultTaskLoggerConfig() - - // Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty) - logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs") - config.BaseLogDir = logDir - - return config -} - -// ID returns the worker ID -func (w *Worker) ID() string { - return w.id + return worker } func (w *Worker) Start() error { - resp := make(chan error) - w.cmds <- workerCommand{ - action: ActionStart, - resp: resp, - } - return <-resp -} - -// Start starts the worker -func (w *Worker) handleStart(cmd workerCommand) { if w.state.running { - cmd.resp <- fmt.Errorf("worker is already running") - return + return fmt.Errorf("worker is already running") } - if w.state.adminClient == nil { - cmd.resp <- fmt.Errorf("admin client is not set") - return + if w.getAdmin() == nil { + return fmt.Errorf("admin client is not set") } w.state.running = true w.state.startTime = time.Now() + w.comms.stop = make(chan struct{}) + w.comms.taskReqs = make(chan taskRequest) + w.comms.taskCompl = make(chan taskCompletion) + w.comms.loadQuery = make(chan chan int) + // Prepare worker info for registration workerInfo := &types.WorkerData{ ID: w.id, @@ -451,58 +256,46 @@ func (w *Worker) handleStart(cmd workerCommand) { LastHeartbeat: time.Now(), } - // Register worker info with client first (this stores it for use during connection) - if err := w.state.adminClient.RegisterWorker(workerInfo); err != nil { - glog.V(1).Infof("Worker info stored for registration: %v", err) - // This is expected if not connected yet - } - // Start connection attempt (will register immediately if successful) glog.Infof("WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d", w.id, w.config.Capabilities, w.config.MaxConcurrent) // Try initial connection, but don't fail if it doesn't work immediately - if err := w.state.adminClient.Connect(); err != nil { + if err := w.getAdmin().Connect(workerInfo); err != nil { glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err) // Don't return error - let the reconnection loop handle it } else { glog.Infof("INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id) } + w.comms.connectionEvents = w.getAdmin().GetEvents() // Start worker loops regardless of initial connection status // They will handle connection failures gracefully glog.V(1).Infof("STARTING LOOPS: Worker %s starting background loops", w.id) go w.heartbeatLoop() go w.taskRequestLoop() - go w.connectionMonitorLoop() + go w.connectionMonitorProcess() go w.messageProcessingLoop() + go w.taskProcess() glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id) - cmd.resp <- nil + return nil +} + +func (w *Worker) getTaskLoad() int { + loadCh := make(chan int) + w.comms.loadQuery <- loadCh + return <-loadCh } func (w *Worker) Stop() error { - resp := make(chan error) - w.cmds <- workerCommand{ - action: ActionStop, - resp: resp, - } - if err := <-resp; err != nil { - return err + if !w.state.running { + return nil } - // Wait for tasks to finish - timeout := time.NewTimer(30 * time.Second) - defer timeout.Stop() -out: - for w.getTaskLoad() > 0 { - select { - case <-timeout.C: - glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad()) - break out - case <-time.After(100 * time.Millisecond): - } - } + w.state.running = false + + close(w.comms.stop) // Disconnect from admin server if adminClient := w.getAdmin(); adminClient != nil { @@ -514,25 +307,85 @@ out: return nil } -// Stop stops the worker -func (w *Worker) handleStop(cmd workerCommand) { - if !w.state.running { - cmd.resp <- nil - return - } +// Task Process owns ALL task state +func (w *Worker) taskProcess() { + var currentLoad int + var success int + var failure int + var maxConcurrent = w.config.MaxConcurrent + doneCh := make(chan chan int) - w.state.running = false - close(w.state.stopChan) + for { + select { + case <-w.comms.stop: + if currentLoad > 0 { + glog.Warningf("Worker %s stopping with %d tasks still running", w.id, currentLoad) + } + return - // Stop tickers - if w.state.heartbeatTicker != nil { - w.state.heartbeatTicker.Stop() - } - if w.state.requestTicker != nil { - w.state.requestTicker.Stop() + case req := <-w.comms.taskReqs: + if currentLoad >= maxConcurrent { + req.resp <- taskResponse{ + accepted: false, + reason: fmt.Errorf("worker is at capacity"), + } + glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s", + w.id, currentLoad, maxConcurrent, req.task.ID) + continue + } + + // Accept task and update our owned state + currentLoad++ + req.resp <- taskResponse{accepted: true} + + glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d", + w.id, req.task.ID, currentLoad, maxConcurrent) + + // Execute task and manage our own load count + go w.executeTask(req.task, doneCh) + case loadCh := <-doneCh: + currentLoad-- + loadCh <- currentLoad + case compl := <-w.comms.taskCompl: + if compl.success { + success++ + } else { + failure++ + } + case resp := <-w.comms.metricsQuery: + resp <- metricsResponse{success: success, failure: failure} + case resp := <-w.comms.loadQuery: + resp <- currentLoad + } } +} + +func (w *Worker) getAdmin() AdminClient { + return w.client +} + +func (w *Worker) getStopChan() <-chan struct{} { + return w.comms.stop +} - cmd.resp <- nil +func (w *Worker) getStartTime() time.Time { + return w.state.startTime +} + +// getTaskLoggerConfig returns the task logger configuration with worker's log directory +func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig { + config := tasks.DefaultTaskLoggerConfig() + + // Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty) + logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs") + config.BaseLogDir = logDir + + return config +} + +// ID returns the worker ID +func (w *Worker) ID() string { + return w.id } // RegisterTask registers a task factory @@ -545,28 +398,19 @@ func (w *Worker) GetCapabilities() []types.TaskType { return w.config.Capabilities } -// GetStatus returns the current worker status -func (w *Worker) GetStatus() types.WorkerStatus { - respCh := make(statusResponse, 1) - w.cmds <- workerCommand{ - action: ActionGetStatus, - data: respCh, - resp: nil, - } - return <-respCh -} - // HandleTask handles a task execution func (w *Worker) HandleTask(task *types.TaskInput) error { glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)", w.id, task.ID, task.Type, task.VolumeID) - - if err := w.setTask(task); err != nil { - return err + resp := make(chan taskResponse) + if w.comms.taskReqs == nil { + return fmt.Errorf("worker is shutting down") + } + w.comms.taskReqs <- taskRequest{task: task, resp: resp} + result := <-resp + if !result.accepted { + return result.reason } - - // Execute task in goroutine - go w.executeTask(task) return nil } @@ -593,18 +437,17 @@ func (w *Worker) SetTaskRequestInterval(interval time.Duration) { // SetAdminClient sets the admin client func (w *Worker) SetAdminClient(client AdminClient) { - w.cmds <- workerCommand{ - action: ActionSetAdmin, - data: client, - } + w.client = client } // executeTask executes a task -func (w *Worker) executeTask(task *types.TaskInput) { +func (w *Worker) executeTask(task *types.TaskInput, done chan<- chan int) { startTime := time.Now() defer func() { - currentLoad := w.removeTask(task) + currentLoadCh := make(chan int) + done <- currentLoadCh + currentLoad := <-currentLoadCh duration := time.Since(startTime) glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d", @@ -708,9 +551,7 @@ func (w *Worker) executeTask(task *types.TaskInput) { // Report completion if err != nil { w.completeTask(task.ID, false, err.Error()) - w.cmds <- workerCommand{ - action: ActionIncTaskFail, - } + w.comms.taskCompl <- taskCompletion{success: false} glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err) if fileLogger != nil { fileLogger.LogStatus("failed", err.Error()) @@ -718,9 +559,7 @@ func (w *Worker) executeTask(task *types.TaskInput) { } } else { w.completeTask(task.ID, true, "") - w.cmds <- workerCommand{ - action: ActionIncTaskComplete, - } + w.comms.taskCompl <- taskCompletion{success: true} glog.Infof("Worker %s completed task %s successfully", w.id, task.ID) if fileLogger != nil { fileLogger.Info("Task %s completed successfully", task.ID) @@ -739,8 +578,8 @@ func (w *Worker) completeTask(taskID string, success bool, errorMsg string) { // heartbeatLoop sends periodic heartbeats to the admin server func (w *Worker) heartbeatLoop() { - defer w.setHbTick(time.NewTicker(w.config.HeartbeatInterval)).Stop() - ticker := w.getHbTick() + ticker := time.NewTicker(w.config.HeartbeatInterval) + defer ticker.Stop() stopChan := w.getStopChan() for { select { @@ -754,8 +593,8 @@ func (w *Worker) heartbeatLoop() { // taskRequestLoop periodically requests new tasks from the admin server func (w *Worker) taskRequestLoop() { - defer w.setReqTick(time.NewTicker(w.config.TaskRequestInterval)).Stop() - ticker := w.getReqTick() + ticker := time.NewTicker(w.config.TaskRequestInterval) + defer ticker.Stop() stopChan := w.getStopChan() for { select { @@ -820,49 +659,25 @@ func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry { return w.registry } -// registerWorker registers the worker with the admin server -func (w *Worker) registerWorker() { - workerInfo := &types.WorkerData{ - ID: w.id, - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - Status: "active", - CurrentLoad: 0, - LastHeartbeat: time.Now(), - } - - if err := w.getAdmin().RegisterWorker(workerInfo); err != nil { - glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err) - } else { - glog.Infof("Worker %s registered successfully with admin server", w.id) - } -} - -// connectionMonitorLoop monitors connection status -func (w *Worker) connectionMonitorLoop() { - ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds - defer ticker.Stop() - - lastConnectionStatus := false +// connectionMonitorProcess monitors connection status +func (w *Worker) connectionMonitorProcess() { + var connected bool stopChan := w.getStopChan() for { select { case <-stopChan: glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id) return - case <-ticker.C: - // Monitor connection status and log changes - currentConnectionStatus := w.getAdmin() != nil && w.getAdmin().IsConnected() - - if currentConnectionStatus != lastConnectionStatus { - if currentConnectionStatus { + case event := <-w.comms.connectionEvents: + if event.connected != connected { + if event.connected { glog.Infof("CONNECTION RESTORED: Worker %s connection status changed: connected", w.id) } else { glog.Warningf("CONNECTION LOST: Worker %s connection status changed: disconnected", w.id) } - lastConnectionStatus = currentConnectionStatus + connected = event.connected } else { - if currentConnectionStatus { + if event.connected { glog.V(3).Infof("CONNECTION OK: Worker %s connection status: connected", w.id) } else { glog.V(1).Infof("CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id) @@ -880,16 +695,22 @@ func (w *Worker) GetConfig() *types.WorkerConfig { // GetPerformanceMetrics returns performance metrics func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance { + metricsCh := make(chan metricsResponse) + w.comms.metricsQuery <- metricsCh + metrics := <-metricsCh + success := metrics.success + failure := metrics.failure + uptime := time.Since(w.getStartTime()) var successRate float64 - totalTasks := w.getCompletedTasks() + w.getFailedTasks() + totalTasks := success + failure if totalTasks > 0 { - successRate = float64(w.getCompletedTasks()) / float64(totalTasks) * 100 + successRate = float64(failure) / float64(totalTasks) * 100 } return &types.WorkerPerformance{ - TasksCompleted: w.getCompletedTasks(), - TasksFailed: w.getFailedTasks(), + TasksCompleted: success, + TasksFailed: failure, AverageTaskTime: 0, // Would need to track this Uptime: uptime, SuccessRate: successRate, @@ -1006,11 +827,8 @@ func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) { func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) { glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId) - w.cmds <- workerCommand{ - action: ActionCancelTask, - data: cancellation.TaskId, - resp: nil, - } + // TODO: To implement task cancellation, each task type should define how + // a task can be cancelled. } // handleAdminShutdown processes admin shutdown notifications From 586c1313508db78c26e63d8eeddfbc92fe5c8b2e Mon Sep 17 00:00:00 2001 From: Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com> Date: Mon, 27 Oct 2025 20:59:46 +0200 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=90=9B=20fix(worker):=20calculate=20s?= =?UTF-8?q?uccess=20rate=20correctly?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- weed/worker/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 2fa57da5b..8610d4b55 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -705,7 +705,7 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance { var successRate float64 totalTasks := success + failure if totalTasks > 0 { - successRate = float64(failure) / float64(totalTasks) * 100 + successRate = float64(success) / float64(totalTasks) * 100 } return &types.WorkerPerformance{ From f3428f3868a106f98c3d40252066fa69c9c83fdf Mon Sep 17 00:00:00 2001 From: Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com> Date: Mon, 27 Oct 2025 21:08:58 +0200 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=90=9B=20fix(client):=20reconnect=20i?= =?UTF-8?q?f=20initial=20attempt=20failed=20for=20any=20reason?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- weed/worker/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weed/worker/client.go b/weed/worker/client.go index 0e1e11270..73e7347a8 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -111,6 +111,8 @@ func (c *GrpcAdminClient) connectionProcess() { if err != nil { glog.Warningf("Initial connection failed: %v", err) c.comms.connectionEvents <- connectionEvent{connected: false, err: err} + c.comms.streamErrors <- err + c.comms.streamErrors <- err } else { c.comms.connectionEvents <- connectionEvent{connected: true} } From d981da92512fc985a2d429fa7e910df94d59eea4 Mon Sep 17 00:00:00 2001 From: Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com> Date: Mon, 27 Oct 2025 21:17:42 +0200 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=90=9B=20fix(worker):=20initialize=20?= =?UTF-8?q?metrics=20query=20channel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- weed/worker/worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 8610d4b55..18a9d8509 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -245,6 +245,7 @@ func (w *Worker) Start() error { w.comms.taskReqs = make(chan taskRequest) w.comms.taskCompl = make(chan taskCompletion) w.comms.loadQuery = make(chan chan int) + w.comms.metricsQuery = make(chan chan metricsResponse) // Prepare worker info for registration workerInfo := &types.WorkerData{