From 95e9f0d83899dc045df7c7a58e77e05ebd63de5e Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 2 Mar 2025 22:22:59 -0800 Subject: [PATCH] refactor --- weed/mq/client/agent_client/agent_publish.go | 13 ------------ .../mq/client/agent_client/agent_subscribe.go | 21 ------------------- .../mq/client/agent_client/publish_session.go | 7 +++++++ .../client/agent_client/subscribe_session.go | 16 ++++++++++++++ 4 files changed, 23 insertions(+), 34 deletions(-) delete mode 100644 weed/mq/client/agent_client/agent_publish.go delete mode 100644 weed/mq/client/agent_client/agent_subscribe.go diff --git a/weed/mq/client/agent_client/agent_publish.go b/weed/mq/client/agent_client/agent_publish.go deleted file mode 100644 index 862615e91..000000000 --- a/weed/mq/client/agent_client/agent_publish.go +++ /dev/null @@ -1,13 +0,0 @@ -package agent_client - -import ( - "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" -) - -func (a *PublishSession) PublishMessageRecord(key []byte, record *schema_pb.RecordValue) error { - return a.stream.Send(&mq_agent_pb.PublishRecordRequest{ - Key: key, - Value: record, - }) -} diff --git a/weed/mq/client/agent_client/agent_subscribe.go b/weed/mq/client/agent_client/agent_subscribe.go deleted file mode 100644 index 61024365a..000000000 --- a/weed/mq/client/agent_client/agent_subscribe.go +++ /dev/null @@ -1,21 +0,0 @@ -package agent_client - -import ( - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" -) - -func (a *SubscribeSession) SubscribeMessageRecord( - onEachMessageFn func(key []byte, record *schema_pb.RecordValue), - onCompletionFn func()) error { - for { - resp, err := a.stream.Recv() - if err != nil { - return err - } - onEachMessageFn(resp.Key, resp.Value) - } - if onCompletionFn != nil { - onCompletionFn() - } - return nil -} diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go index 9ebe2749d..c12d345a1 100644 --- a/weed/mq/client/agent_client/publish_session.go +++ b/weed/mq/client/agent_client/publish_session.go @@ -72,3 +72,10 @@ func (a *PublishSession) CloseSession() error { a.schema = nil return err } + +func (a *PublishSession) PublishMessageRecord(key []byte, record *schema_pb.RecordValue) error { + return a.stream.Send(&mq_agent_pb.PublishRecordRequest{ + Key: key, + Value: record, + }) +} diff --git a/weed/mq/client/agent_client/subscribe_session.go b/weed/mq/client/agent_client/subscribe_session.go index 3e9afaca5..397450a29 100644 --- a/weed/mq/client/agent_client/subscribe_session.go +++ b/weed/mq/client/agent_client/subscribe_session.go @@ -69,3 +69,19 @@ func (s *SubscribeSession) CloseSession() error { err := s.stream.CloseSend() return err } + +func (a *SubscribeSession) SubscribeMessageRecord( + onEachMessageFn func(key []byte, record *schema_pb.RecordValue), + onCompletionFn func()) error { + for { + resp, err := a.stream.Recv() + if err != nil { + return err + } + onEachMessageFn(resp.Key, resp.Value) + } + if onCompletionFn != nil { + onCompletionFn() + } + return nil +}