From b8eb6f79b13b3aeb0dc4a7eef883193c69752684 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 26 Feb 2025 00:30:32 -0800 Subject: [PATCH] do not reuse session, since always session id is new after restart remove last active ts from SessionEntry --- weed/mq/agent/agent_grpc_pub_session.go | 11 ------- weed/mq/agent/agent_grpc_publish.go | 11 ++++--- weed/mq/agent/agent_grpc_sub_session.go | 11 ------- weed/mq/agent/agent_grpc_subscribe.go | 44 ++++++++++++++----------- weed/mq/agent/agent_server.go | 3 +- 5 files changed, 31 insertions(+), 49 deletions(-) diff --git a/weed/mq/agent/agent_grpc_pub_session.go b/weed/mq/agent/agent_grpc_pub_session.go index 9c7497609..7cf857da1 100644 --- a/weed/mq/agent/agent_grpc_pub_session.go +++ b/weed/mq/agent/agent_grpc_pub_session.go @@ -7,7 +7,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "log/slog" "math/rand/v2" - "time" ) func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) { @@ -26,16 +25,6 @@ func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_age } a.publishersLock.Lock() - // remove inactive publishers to avoid memory leak - for k, entry := range a.publishers { - if entry.lastActiveTsNs == 0 { - // this is an active session - continue - } - if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) { - delete(a.publishers, k) - } - } a.publishers[SessionId(sessionId)] = &SessionEntry[*pub_client.TopicPublisher]{ entry: topicPublisher, } diff --git a/weed/mq/agent/agent_grpc_publish.go b/weed/mq/agent/agent_grpc_publish.go index dc3b5ee35..0b666ff6d 100644 --- a/weed/mq/agent/agent_grpc_publish.go +++ b/weed/mq/agent/agent_grpc_publish.go @@ -3,7 +3,6 @@ package agent import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" - "time" ) func (a *MessageQueueAgent) PublishRecord(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error { @@ -11,16 +10,18 @@ func (a *MessageQueueAgent) PublishRecord(stream mq_agent_pb.SeaweedMessagingAge if err != nil { return err } + sessionId := SessionId(m.SessionId) a.publishersLock.RLock() - publisherEntry, found := a.publishers[SessionId(m.SessionId)] + publisherEntry, found := a.publishers[sessionId] a.publishersLock.RUnlock() if !found { - return fmt.Errorf("publish session id %d not found", m.SessionId) + return fmt.Errorf("publish session id %d not found", sessionId) } defer func() { - publisherEntry.lastActiveTsNs = time.Now().UnixNano() + a.publishersLock.Lock() + delete(a.publishers, sessionId) + a.publishersLock.Unlock() }() - publisherEntry.lastActiveTsNs = 0 if m.Value != nil { if err := publisherEntry.entry.PublishRecord(m.Key, m.Value); err != nil { diff --git a/weed/mq/agent/agent_grpc_sub_session.go b/weed/mq/agent/agent_grpc_sub_session.go index 17def9ed5..b01fbcc65 100644 --- a/weed/mq/agent/agent_grpc_sub_session.go +++ b/weed/mq/agent/agent_grpc_sub_session.go @@ -8,7 +8,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "math/rand/v2" - "time" ) func (a *MessageQueueAgent) StartSubscribeSession(ctx context.Context, req *mq_agent_pb.StartSubscribeSessionRequest) (*mq_agent_pb.StartSubscribeSessionResponse, error) { @@ -36,16 +35,6 @@ func (a *MessageQueueAgent) StartSubscribeSession(ctx context.Context, req *mq_a ) a.subscribersLock.Lock() - // remove inactive publishers to avoid memory leak - for k, entry := range a.subscribers { - if entry.lastActiveTsNs == 0 { - // this is an active session - continue - } - if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) { - delete(a.subscribers, k) - } - } a.subscribers[SessionId(sessionId)] = &SessionEntry[*sub_client.TopicSubscriber]{ entry: topicSubscriber, } diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go index 0e9adc720..df438c05e 100644 --- a/weed/mq/agent/agent_grpc_subscribe.go +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -7,8 +7,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" - "time" ) func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error { @@ -26,31 +26,35 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA return fmt.Errorf("subscribe session id %d not found", sessionId) } defer func() { - subscriberEntry.lastActiveTsNs = time.Now().UnixNano() + a.subscribersLock.Lock() + delete(a.subscribers, sessionId) + a.subscribersLock.Unlock() }() - subscriberEntry.lastActiveTsNs = 0 var lastErr error + executors := util.NewLimitedConcurrentExecutor(int(subscriberEntry.entry.SubscriberConfig.SlidingWindowSize)) subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { - record := &schema_pb.RecordValue{} - err := proto.Unmarshal(m.Data.Value, record) - if err != nil { - glog.V(0).Infof("unmarshal record value: %v", err) - if lastErr == nil { - lastErr = err + executors.Execute(func() { + record := &schema_pb.RecordValue{} + err := proto.Unmarshal(m.Data.Value, record) + if err != nil { + glog.V(0).Infof("unmarshal record value: %v", err) + if lastErr == nil { + lastErr = err + } + return } - return - } - if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{ - Key: m.Data.Key, - Value: record, - TsNs: m.Data.TsNs, - }); sendErr != nil { - glog.V(0).Infof("send record: %v", sendErr) - if lastErr == nil { - lastErr = sendErr + if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{ + Key: m.Data.Key, + Value: record, + TsNs: m.Data.TsNs, + }); sendErr != nil { + glog.V(0).Infof("send record: %v", sendErr) + if lastErr == nil { + lastErr = sendErr + } } - } + }) }) go func() { diff --git a/weed/mq/agent/agent_server.go b/weed/mq/agent/agent_server.go index f98c62d97..f1d6ec679 100644 --- a/weed/mq/agent/agent_server.go +++ b/weed/mq/agent/agent_server.go @@ -11,8 +11,7 @@ import ( type SessionId int64 type SessionEntry[T any] struct { - entry T - lastActiveTsNs int64 + entry T } type MessageQueueAgentOptions struct {