From ccdd9cd8decf66089ac201b7c2ca1f5889582b93 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Apr 2024 16:25:43 -0700 Subject: [PATCH] refactor --- weed/filer/filer_notify.go | 2 +- weed/filer/meta_aggregator.go | 2 +- weed/mq/broker/broker_grpc_pub_follow.go | 2 +- weed/mq/topic/local_partition.go | 2 +- weed/util/log_buffer/log_buffer.go | 7 ++++++- weed/util/log_buffer/log_buffer_test.go | 7 ++++++- 6 files changed, 16 insertions(+), 6 deletions(-) diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index db78b3d3d..db953d398 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -83,7 +83,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica return } - f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) + f.LocalMetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs) } diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 663fdfe9f..976822ad1 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -168,7 +168,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) + ma.MetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs) if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 57cbbd2d2..d8100f021 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -52,7 +52,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi // TODO: change this to DataMessage // log the message - logBuffer.AddToBuffer(dataMessage.Key, dataMessage.Value, dataMessage.TsNs) + logBuffer.AddToBuffer(dataMessage) // send back the ack if err := stream.Send(&mq_pb.PublishFollowMeResponse{ diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 157fa2792..54c122a0f 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -52,7 +52,7 @@ func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncTy } func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { - p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) + p.LogBuffer.AddToBuffer(message) // maybe send to the follower if p.followerStream != nil { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 65d20a757..efe42176e 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,6 +2,7 @@ package log_buffer import ( "bytes" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "sync" "sync/atomic" "time" @@ -68,7 +69,11 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc return lb } -func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { +func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { + logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) +} + +func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { var toFlush *dataToFlush logBuffer.Lock() diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 84279f625..067a02ef4 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -3,6 +3,7 @@ package log_buffer import ( "crypto/rand" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" "sync" "testing" @@ -50,7 +51,11 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf, 0) + lb.AddToBuffer(&mq_pb.DataMessage{ + Key: nil, + Value: buf, + TsNs: 0, + }) } wg.Wait()