From d1f0c404023a6ad5b4b9b80b85ef41948fc0c48e Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 30 May 2024 09:49:08 -0700 Subject: [PATCH] remove per-message debug logs --- weed/mq/broker/broker_grpc_sub.go | 4 ++-- weed/mq/broker/broker_grpc_sub_follow.go | 2 +- weed/mq/sub_coordinator/inflight_message_tracker.go | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index d1131892b..286812a9b 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -112,7 +112,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence) currentLastOffset := imt.GetOldestAckedTimestamp() - fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset) + // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset) if subscribeFollowMeStream != nil && currentLastOffset > lastOffset { if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ Message: &mq_pb.SubscribeFollowMeRequest_Ack{ @@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs break } lastOffset = currentLastOffset - fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset) + // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset) } } if lastOffset > 0 { diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index cfea6f7c7..f7f4ac7e9 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -42,7 +42,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub // Process the received message if ackMessage := req.GetAck(); ackMessage != nil { lastOffset = ackMessage.TsNs - println("sub follower got offset", lastOffset) + // println("sub follower got offset", lastOffset) } else if closeMessage := req.GetClose(); closeMessage != nil { glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) return nil diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index f8effef95..f1c46e06b 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -1,7 +1,6 @@ package sub_coordinator import ( - "fmt" "sort" "sync" ) @@ -22,7 +21,7 @@ func NewInflightMessageTracker(capacity int) *InflightMessageTracker { // EnflightMessage tracks the message with the key and timestamp. // These messages are sent to the consumer group instances and waiting for ack. func (imt *InflightMessageTracker) EnflightMessage(key []byte, tsNs int64) { - fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs) + // fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs) imt.mu.Lock() defer imt.mu.Unlock() imt.messages[string(key)] = tsNs @@ -53,7 +52,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64) // AcknowledgeMessage acknowledges the message with the key and timestamp. func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool { - fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs) + // fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs) imt.mu.Lock() defer imt.mu.Unlock() timestamp, exists := imt.messages[string(key)]