diff --git a/weed/worker/client.go b/weed/worker/client.go index 27633ecdb..3eb892e86 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -356,35 +356,41 @@ func handleOutgoing( close(errCh) }() + // Helper function to handle stream errors and cleanup + handleStreamError := func(err error) { + if err != nil { + glog.Errorf("Failed to send message to admin: %v", err) + cmds <- grpcCommand{action: ActionStreamError, data: err} + } + } + + // Helper function to cleanup resources + cleanup := func() { + close(msgCh) + <-errCh + } + for { select { case <-streamExit: - close(msgCh) - <-errCh + cleanup() return case err := <-errCh: - if err != nil { - glog.Errorf("Failed to send message to admin: %v", err) - cmds <- grpcCommand{action: ActionStreamError, data: err} - } + handleStreamError(err) return case msg, ok := <-outgoing: if !ok { - close(msgCh) - <-errCh + cleanup() return } select { case msgCh <- msg: + // Message queued successfully case <-streamExit: - close(msgCh) - <-errCh + cleanup() return case err := <-errCh: - if err != nil { - glog.Errorf("Failed to send message to admin: %v", err) - cmds <- grpcCommand{action: ActionStreamError, data: err} - } + handleStreamError(err) return } }