Browse Source

refactor

mq-subscribe
chrislu 9 months ago
parent
commit
ccdd9cd8de
  1. 2
      weed/filer/filer_notify.go
  2. 2
      weed/filer/meta_aggregator.go
  3. 2
      weed/mq/broker/broker_grpc_pub_follow.go
  4. 2
      weed/mq/topic/local_partition.go
  5. 7
      weed/util/log_buffer/log_buffer.go
  6. 7
      weed/util/log_buffer/log_buffer_test.go

2
weed/filer/filer_notify.go

@ -83,7 +83,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
return
}
f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
f.LocalMetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs)
}

2
weed/filer/meta_aggregator.go

@ -168,7 +168,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
}
dir := event.Directory
// println("received meta change", dir, "size", len(data))
ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
ma.MetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs)
if maybeReplicateMetadataChange != nil {
maybeReplicateMetadataChange(event)
}

2
weed/mq/broker/broker_grpc_pub_follow.go

@ -52,7 +52,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
// TODO: change this to DataMessage
// log the message
logBuffer.AddToBuffer(dataMessage.Key, dataMessage.Value, dataMessage.TsNs)
logBuffer.AddToBuffer(dataMessage)
// send back the ack
if err := stream.Send(&mq_pb.PublishFollowMeResponse{

2
weed/mq/topic/local_partition.go

@ -52,7 +52,7 @@ func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncTy
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
p.LogBuffer.AddToBuffer(message)
// maybe send to the follower
if p.followerStream != nil {

7
weed/util/log_buffer/log_buffer.go

@ -2,6 +2,7 @@ package log_buffer
import (
"bytes"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
"sync/atomic"
"time"
@ -68,7 +69,11 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
return lb
}
func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
}
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
var toFlush *dataToFlush
logBuffer.Lock()

7
weed/util/log_buffer/log_buffer_test.go

@ -3,6 +3,7 @@ package log_buffer
import (
"crypto/rand"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"sync"
"testing"
@ -50,7 +51,11 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
lb.AddToBuffer(nil, buf, 0)
lb.AddToBuffer(&mq_pb.DataMessage{
Key: nil,
Value: buf,
TsNs: 0,
})
}
wg.Wait()

Loading…
Cancel
Save