From ff14dfa0eee2f3b3689452ffc78368cb288d53b0 Mon Sep 17 00:00:00 2001
From: chrislu <chris.lu@gmail.com>
Date: Thu, 30 May 2024 09:15:50 -0700
Subject: [PATCH] debug

---
 weed/mq/broker/broker_grpc_sub.go        | 6 +++++-
 weed/mq/broker/broker_grpc_sub_follow.go | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 68a80398d..d1131892b 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -66,7 +66,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		} else {
 			defer func() {
 				println("closing SubscribeFollowMe connection", follower)
-				followerGrpcConnection.Close()
+				subscribeFollowMeStream.CloseSend()
+				// followerGrpcConnection.Close()
 			}()
 			followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
 			if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
@@ -94,6 +95,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 			ack, err := stream.Recv()
 			if err != nil {
 				if err == io.EOF {
+					// the client has called CloseSend(). This is to ack the close.
 					stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
 						Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
 							IsEndOfStream: true,
@@ -127,6 +129,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 			}
 		}
 		if lastOffset > 0 {
+			glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
 			if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
 				glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
 			}
@@ -204,6 +207,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
 		return
 	}
 	if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
+		glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
 		startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
 		return
 	}
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index 351b904c2..cfea6f7c7 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -42,7 +42,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
 		// Process the received message
 		if ackMessage := req.GetAck(); ackMessage != nil {
 			lastOffset = ackMessage.TsNs
-			// println("offset", lastOffset)
+			println("sub follower got offset", lastOffset)
 		} else if closeMessage := req.GetClose(); closeMessage != nil {
 			glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
 			return nil