From 0129a7bf9b58cc2fd68e7cc4f297da44fcab6fb8 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 25 Feb 2025 01:15:30 -0800 Subject: [PATCH] use sessionId --- weed/mq/agent/agent_grpc_subscribe.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go index 5706a3774..0e9adc720 100644 --- a/weed/mq/agent/agent_grpc_subscribe.go +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -14,15 +14,16 @@ import ( func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error { // the first message is the subscribe request // it should only contain the session id - m, err := stream.Recv() + initMessage, err := stream.Recv() if err != nil { return err } + sessionId := SessionId(initMessage.SessionId) a.subscribersLock.RLock() - subscriberEntry, found := a.subscribers[SessionId(m.SessionId)] + subscriberEntry, found := a.subscribers[sessionId] a.subscribersLock.RUnlock() if !found { - return fmt.Errorf("subscribe session id %d not found", m.SessionId) + return fmt.Errorf("subscribe session id %d not found", sessionId) } defer func() { subscriberEntry.lastActiveTsNs = time.Now().UnixNano() @@ -34,6 +35,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA record := &schema_pb.RecordValue{} err := proto.Unmarshal(m.Data.Value, record) if err != nil { + glog.V(0).Infof("unmarshal record value: %v", err) if lastErr == nil { lastErr = err } @@ -44,6 +46,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA Value: record, TsNs: m.Data.TsNs, }); sendErr != nil { + glog.V(0).Infof("send record: %v", sendErr) if lastErr == nil { lastErr = sendErr } @@ -53,7 +56,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA go func() { subErr := subscriberEntry.entry.Subscribe() if subErr != nil { - glog.V(0).Infof("subscriber %d subscribe: %v", m.SessionId, subErr) + glog.V(0).Infof("subscriber %d subscribe: %v", sessionId, subErr) if lastErr == nil { lastErr = subErr } @@ -63,6 +66,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA for { m, err := stream.Recv() if err != nil { + glog.V(0).Infof("subscriber %d receive: %v", sessionId, err) return err } if m != nil {