diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index 069c19929..ae7d8fa37 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" @@ -134,7 +135,21 @@ func (s *Server) Close() error { if s.ln != nil { _ = s.ln.Close() } - s.wg.Wait() + + // Wait for goroutines to finish with a timeout to prevent hanging + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + // Normal shutdown + case <-time.After(2 * time.Second): + // Timeout - force shutdown + glog.Warningf("Server shutdown timed out after 2 seconds, forcing close") + } // Close the handler (important for SeaweedMQ mode) if s.handler != nil {