From 618235bd725e0faf570f10f27669c384c448e942 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 25 Jun 2025 17:31:28 -0700 Subject: [PATCH] fix error logging to reduce noise --- weed/mq/broker/broker_grpc_pub.go | 58 +++++++++++++++-- weed/mq/broker/broker_grpc_pub_follow.go | 18 ++++-- weed/mq/client/pub_client/scheduler.go | 82 ++++++++++++++++++++---- weed/mq/topic/local_partition.go | 52 +++++++++++++-- 4 files changed, 180 insertions(+), 30 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f31dc7eff..df5ac1ba8 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -3,15 +3,19 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/peer" "io" "math/rand" "net" + "strings" "sync/atomic" "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" ) // PUB @@ -126,7 +130,12 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + // Use appropriate logging level based on error type + if isConnectionError(err) { + glog.V(1).Infof("topic %v partition %v publish stream from %s connection error (client disconnected): %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + } else { + glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + } break } @@ -145,7 +154,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } } - glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) + glog.V(1).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } @@ -164,3 +173,40 @@ func findClientAddress(ctx context.Context) string { } return pr.Addr.String() } + +// isConnectionError checks if an error is a connection-level error +func isConnectionError(err error) bool { + if err == nil { + return false + } + + // Check gRPC status codes for connection errors + if grpcStatus, ok := status.FromError(err); ok { + switch grpcStatus.Code() { + case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown: + return true + } + } + + // Check for common connection error patterns + errStr := strings.ToLower(err.Error()) + connectionErrorPatterns := []string{ + "eof", + "error reading server preface", + "connection refused", + "connection reset", + "broken pipe", + "no such host", + "network is unreachable", + "context deadline exceeded", + "context canceled", + } + + for _, pattern := range connectionErrorPatterns { + if strings.Contains(errStr, pattern) { + return true + } + } + + return false +} diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 291f1ef62..36814e3fb 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -2,13 +2,14 @@ package broker import ( "fmt" + "io" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" - "io" - "time" ) type memBuffer struct { @@ -43,7 +44,12 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi err = nil break } - glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + // Use appropriate logging level based on error type + if isConnectionError(err) { + glog.V(1).Infof("topic %v partition %v publish follower stream connection error (leader disconnected): %v", initMessage.Topic, initMessage.Partition, err) + } else { + glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + } break } @@ -62,10 +68,10 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi } // println("ack", string(dataMessage.Key), dataMessage.TsNs) } else if closeMessage := req.GetClose(); closeMessage != nil { - glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + glog.V(1).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) break } else if flushMessage := req.GetFlush(); flushMessage != nil { - glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) + glog.V(1).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) lastFlushTsNs = flushMessage.TsNs @@ -124,7 +130,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf)) } - glog.V(0).Infof("shut down follower for %v %v", t, p) + glog.V(1).Infof("shut down follower for %v %v", t, p) return err } diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index a768fa7f8..7269390f0 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -3,6 +3,13 @@ package pub_client import ( "context" "fmt" + "log" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -11,11 +18,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "log" - "sort" - "sync" - "sync/atomic" - "time" ) type EachPartitionError struct { @@ -39,7 +41,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } - log.Printf("start scheduler thread for topic %s", p.config.Topic) + glog.V(0).Infof("start scheduler thread for topic %s", p.config.Topic) generation := 0 var errChan chan EachPartitionError @@ -66,7 +68,12 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { for { select { case eachErr := <-errChan: - glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) + // Use appropriate logging level based on error type + if isConnectionError(eachErr.Err) { + glog.V(1).Infof("gen %d connection error for topic %s partition %v (will retry): %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) + } else { + glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) + } if eachErr.generation < generation { continue } @@ -114,7 +121,12 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. go func(job *EachPartitionPublishJob) { defer job.wg.Done() if err := p.doPublishToPartition(job); err != nil { - log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) + // Use appropriate logging level based on error type + if isConnectionError(err) { + glog.V(1).Infof("connection error publishing to %s partition %v (will retry): %v", p.config.Topic, job.Partition, err) + } else { + log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) + } errChan <- EachPartitionError{assignment, err, generation} } }(job) @@ -128,7 +140,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { - log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) + glog.V(1).Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption) if err != nil { @@ -176,20 +188,25 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Unknown && e.Message() == "EOF" { - log.Printf("publish to %s EOF", publishClient.Broker) + glog.V(1).Infof("publish stream to %s closed (EOF)", publishClient.Broker) return } publishClient.Err = err - log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) + // Use appropriate logging level based on error type + if isConnectionError(err) { + glog.V(1).Infof("connection error receiving from %s (will retry): %v", publishClient.Broker, err) + } else { + log.Printf("publish receive error from %s: %v", publishClient.Broker, err) + } return } if ackResp.Error != "" { publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) - log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) + log.Printf("publish ack error from %s: %v", publishClient.Broker, ackResp.Error) return } if ackResp.AckSequence > 0 { - log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) + glog.V(2).Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { return @@ -222,7 +239,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro } } - log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) + glog.V(1).Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) return nil } @@ -296,3 +313,40 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr) } + +// isConnectionError checks if an error is a connection-level error that is handled automatically +func isConnectionError(err error) bool { + if err == nil { + return false + } + + // Check gRPC status codes for connection errors + if grpcStatus, ok := status.FromError(err); ok { + switch grpcStatus.Code() { + case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown: + return true + } + } + + // Check for common connection error patterns + errStr := strings.ToLower(err.Error()) + connectionErrorPatterns := []string{ + "eof", + "error reading server preface", + "connection refused", + "connection reset", + "broken pipe", + "no such host", + "network is unreachable", + "context deadline exceeded", + "context canceled", + } + + for _, pattern := range connectionErrorPatterns { + if strings.Contains(errStr, pattern) { + return true + } + } + + return false +} diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index d1433775a..e3a67ed3f 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -3,6 +3,11 @@ package topic import ( "context" "fmt" + "strings" + "sync" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -10,9 +15,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "sync" - "sync/atomic" - "time" ) type LocalPartition struct { @@ -182,7 +184,12 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower) return } - glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err) + // Use appropriate logging level based on error type + if isConnectionError(err) { + glog.V(1).Infof("local partition %v follower %s connection error (will retry): %v", p.Partition, p.Follower, err) + } else { + glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err) + } return } atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) @@ -242,3 +249,40 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { // println("notifying", p.Follower, "flushed at", flushTsNs) } } + +// isConnectionError checks if an error is a connection-level error +func isConnectionError(err error) bool { + if err == nil { + return false + } + + // Check gRPC status codes for connection errors + if grpcStatus, ok := status.FromError(err); ok { + switch grpcStatus.Code() { + case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown: + return true + } + } + + // Check for common connection error patterns + errStr := strings.ToLower(err.Error()) + connectionErrorPatterns := []string{ + "eof", + "error reading server preface", + "connection refused", + "connection reset", + "broken pipe", + "no such host", + "network is unreachable", + "context deadline exceeded", + "context canceled", + } + + for _, pattern := range connectionErrorPatterns { + if strings.Contains(errStr, pattern) { + return true + } + } + + return false +}