From 0e9f433ec4c91cad61e01383033b2e3c4d90174f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 4 Jan 2026 11:40:42 -0800 Subject: [PATCH] refactoring --- weed/mq/broker/broker_grpc_query.go | 2 +- .../broker_topic_partition_read_write.go | 7 ++- weed/util/log_buffer/log_buffer.go | 53 +++++++++---------- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go index 228152bdf..c7d180bd9 100644 --- a/weed/mq/broker/broker_grpc_query.go +++ b/weed/mq/broker/broker_grpc_query.go @@ -89,7 +89,7 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage } // Use buffer_start offset for precise deduplication - lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs + lastFlushTsNs := localPartition.LogBuffer.GetLastFlushTsNs() startBufferOffset := req.StartBufferOffset startTimeNs := lastFlushTsNs // Still respect last flush time for safety diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index 18f9c98b0..7b02c7010 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -2,7 +2,6 @@ package broker import ( "fmt" - "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -41,14 +40,14 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l } } - atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano()) + logBuffer.SetLastFlushTsNs(stopTime.UnixNano()) b.accessLock.Lock() defer b.accessLock.Unlock() if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil { - localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) + localPartition.NotifyLogFlushed(logBuffer.GetLastFlushTsNs()) } - glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (offset %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferOffset) + glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (offset %d)", logBuffer.GetLastFlushTsNs(), targetFile, len(buf), logBuffer.GetName(), bufferOffset) } } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 1c65fc916..60c0f35d6 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -55,35 +55,36 @@ type CachedDiskChunk struct { } type LogBuffer struct { - LastFlushTsNs int64 - name string - prevBuffers *SealedBuffers - buf []byte - offset int64 // Last offset in current buffer (endOffset) - bufferStartOffset int64 // First offset in current buffer - idx []int - pos int + // 8-byte aligned fields + LastTsNs atomic.Int64 + lastFlushTsNs atomic.Int64 + lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet) + offset int64 + bufferStartOffset int64 + minOffset int64 + maxOffset int64 + flushInterval time.Duration startTime time.Time stopTime time.Time - lastFlushDataTime time.Time - sizeBuf []byte - flushInterval time.Duration - flushFn LogFlushFuncType - ReadFromDiskFn LogReadFromDiskFuncType - notifyFn func() + + // Other fields + name string + prevBuffers *SealedBuffers + buf []byte + idx []int + pos int + sizeBuf []byte + flushFn LogFlushFuncType + ReadFromDiskFn LogReadFromDiskFuncType + notifyFn func() // Per-subscriber notification channels for instant wake-up subscribersMu sync.RWMutex subscribers map[string]chan struct{} // subscriberID -> notification channel isStopping *atomic.Bool isAllFlushed bool flushChan chan *dataToFlush - LastTsNs atomic.Int64 // Offset range tracking for Kafka integration - minOffset int64 - maxOffset int64 - hasOffsets bool - lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet) - lastFlushTsNs atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet) + hasOffsets bool // Disk chunk cache for historical data reads diskChunkCache *DiskChunkCache sync.RWMutex @@ -235,7 +236,7 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn logBuffer.bufferStartOffset = nextOffset // CRITICAL: Track that data [0...highestOffset] is on disk logBuffer.lastFlushedOffset.Store(highestOffset) - // Set lastFlushedTime to current time (we know data up to highestOffset is on disk) + // Set lastFlushTsNs to current time (we know data up to highestOffset is on disk) logBuffer.lastFlushTsNs.Store(time.Now().UnixNano()) } else { logBuffer.bufferStartOffset = 0 // Start from offset 0 @@ -507,10 +508,6 @@ func (logBuffer *LogBuffer) loopFlush() { logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here - logBuffer.lastFlushDataTime = d.stopTime - - // CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads - // Use >= 0 to include offset 0 (first message in a topic) if d.maxOffset >= 0 { logBuffer.lastFlushedOffset.Store(d.maxOffset) } @@ -567,8 +564,6 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush if withCallback { d.done = make(chan struct{}) } - } else { - logBuffer.lastFlushDataTime = logBuffer.stopTime } // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1 lastOffsetInBuffer := logBuffer.offset - 1 @@ -624,6 +619,10 @@ func (logBuffer *LogBuffer) GetLastFlushTsNs() int64 { return logBuffer.lastFlushTsNs.Load() } +func (logBuffer *LogBuffer) SetLastFlushTsNs(ts int64) { + logBuffer.lastFlushTsNs.Store(ts) +} + func (d *dataToFlush) releaseMemory() { d.data.Reset() bufferPool.Put(d.data)