You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							387 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							387 lines
						
					
					
						
							12 KiB
						
					
					
				
								package log_buffer
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"bytes"
							 | 
						|
									"sync"
							 | 
						|
									"sync/atomic"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"google.golang.org/protobuf/proto"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// Sizing constants for the in-memory log buffer.
								const (
									// BufferSize is the size in bytes (8 MiB) of the active buffer that
									// NewLogBuffer allocates.
									BufferSize = 8 << 20
									// PreviousBufferCount is the count handed to newSealedBuffers when a
									// LogBuffer is created.
									PreviousBufferCount = 32
								)
							 | 
						|
								
							 | 
						|
								type dataToFlush struct {
							 | 
						|
									startTime time.Time
							 | 
						|
									stopTime  time.Time
							 | 
						|
									data      *bytes.Buffer
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
							 | 
						|
								type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
							 | 
						|
								type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
							 | 
						|
								type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
							 | 
						|
								
							 | 
						|
								type LogBuffer struct {
							 | 
						|
									LastFlushTsNs     int64
							 | 
						|
									name              string
							 | 
						|
									prevBuffers       *SealedBuffers
							 | 
						|
									buf               []byte
							 | 
						|
									batchIndex        int64
							 | 
						|
									idx               []int
							 | 
						|
									pos               int
							 | 
						|
									startTime         time.Time
							 | 
						|
									stopTime          time.Time
							 | 
						|
									lastFlushDataTime time.Time
							 | 
						|
									sizeBuf           []byte
							 | 
						|
									flushInterval     time.Duration
							 | 
						|
									flushFn           LogFlushFuncType
							 | 
						|
									ReadFromDiskFn    LogReadFromDiskFuncType
							 | 
						|
									notifyFn          func()
							 | 
						|
									isStopping        *atomic.Bool
							 | 
						|
									isAllFlushed      bool
							 | 
						|
									flushChan         chan *dataToFlush
							 | 
						|
									LastTsNs          atomic.Int64
							 | 
						|
									sync.RWMutex
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
							 | 
						|
									readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
							 | 
						|
									lb := &LogBuffer{
							 | 
						|
										name:           name,
							 | 
						|
										prevBuffers:    newSealedBuffers(PreviousBufferCount),
							 | 
						|
										buf:            make([]byte, BufferSize),
							 | 
						|
										sizeBuf:        make([]byte, 4),
							 | 
						|
										flushInterval:  flushInterval,
							 | 
						|
										flushFn:        flushFn,
							 | 
						|
										ReadFromDiskFn: readFromDiskFn,
							 | 
						|
										notifyFn:       notifyFn,
							 | 
						|
										flushChan:      make(chan *dataToFlush, 256),
							 | 
						|
										isStopping:     new(atomic.Bool),
							 | 
						|
										batchIndex:     time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts
							 | 
						|
									}
							 | 
						|
									go lb.loopFlush()
							 | 
						|
									go lb.loopInterval()
							 | 
						|
									return lb
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
							 | 
						|
									logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
							 | 
						|
								
							 | 
						|
									// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
							 | 
						|
									var ts time.Time
							 | 
						|
									if processingTsNs == 0 {
							 | 
						|
										ts = time.Now()
							 | 
						|
										processingTsNs = ts.UnixNano()
							 | 
						|
									} else {
							 | 
						|
										ts = time.Unix(0, processingTsNs)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									logEntry := &filer_pb.LogEntry{
							 | 
						|
										TsNs:             processingTsNs, // Will be updated if needed
							 | 
						|
										PartitionKeyHash: util.HashToInt32(partitionKey),
							 | 
						|
										Data:             data,
							 | 
						|
										Key:              partitionKey,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									logEntryData, _ := proto.Marshal(logEntry)
							 | 
						|
								
							 | 
						|
									var toFlush *dataToFlush
							 | 
						|
									logBuffer.Lock()
							 | 
						|
									defer func() {
							 | 
						|
										logBuffer.Unlock()
							 | 
						|
										if toFlush != nil {
							 | 
						|
											logBuffer.flushChan <- toFlush
							 | 
						|
										}
							 | 
						|
										if logBuffer.notifyFn != nil {
							 | 
						|
											logBuffer.notifyFn()
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Handle timestamp collision inside lock (rare case)
							 | 
						|
									if logBuffer.LastTsNs.Load() >= processingTsNs {
							 | 
						|
										processingTsNs = logBuffer.LastTsNs.Add(1)
							 | 
						|
										ts = time.Unix(0, processingTsNs)
							 | 
						|
										// Re-marshal with corrected timestamp
							 | 
						|
										logEntry.TsNs = processingTsNs
							 | 
						|
										logEntryData, _ = proto.Marshal(logEntry)
							 | 
						|
									} else {
							 | 
						|
										logBuffer.LastTsNs.Store(processingTsNs)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									size := len(logEntryData)
							 | 
						|
								
							 | 
						|
									if logBuffer.pos == 0 {
							 | 
						|
										logBuffer.startTime = ts
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
							 | 
						|
										// glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
							 | 
						|
										toFlush = logBuffer.copyToFlush()
							 | 
						|
										logBuffer.startTime = ts
							 | 
						|
										if len(logBuffer.buf) < size+4 {
							 | 
						|
											logBuffer.buf = make([]byte, 2*size+4)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									logBuffer.stopTime = ts
							 | 
						|
								
							 | 
						|
									logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
							 | 
						|
									util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
							 | 
						|
									copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
							 | 
						|
									copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
							 | 
						|
									logBuffer.pos += size + 4
							 | 
						|
								
							 | 
						|
									// fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx))
							 | 
						|
								
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) IsStopping() bool {
							 | 
						|
									return logBuffer.isStopping.Load()
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ShutdownLogBuffer flushes the buffer and stops the log buffer
							 | 
						|
								func (logBuffer *LogBuffer) ShutdownLogBuffer() {
							 | 
						|
									isAlreadyStopped := logBuffer.isStopping.Swap(true)
							 | 
						|
									if isAlreadyStopped {
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
									toFlush := logBuffer.copyToFlush()
							 | 
						|
									logBuffer.flushChan <- toFlush
							 | 
						|
									close(logBuffer.flushChan)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
							 | 
						|
								func (logBuffer *LogBuffer) IsAllFlushed() bool {
							 | 
						|
									return logBuffer.isAllFlushed
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) loopFlush() {
							 | 
						|
									for d := range logBuffer.flushChan {
							 | 
						|
										if d != nil {
							 | 
						|
											// glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
							 | 
						|
											logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes())
							 | 
						|
											d.releaseMemory()
							 | 
						|
											// local logbuffer is different from aggregate logbuffer here
							 | 
						|
											logBuffer.lastFlushDataTime = d.stopTime
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									logBuffer.isAllFlushed = true
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) loopInterval() {
							 | 
						|
									for !logBuffer.IsStopping() {
							 | 
						|
										time.Sleep(logBuffer.flushInterval)
							 | 
						|
										if logBuffer.IsStopping() {
							 | 
						|
											return
							 | 
						|
										}
							 | 
						|
										logBuffer.Lock()
							 | 
						|
										toFlush := logBuffer.copyToFlush()
							 | 
						|
										logBuffer.Unlock()
							 | 
						|
										if toFlush != nil {
							 | 
						|
											glog.V(4).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes()))
							 | 
						|
											logBuffer.flushChan <- toFlush
							 | 
						|
										} else {
							 | 
						|
											// glog.V(0).Infof("%s no flush", m.name)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
							 | 
						|
								
							 | 
						|
									if logBuffer.pos > 0 {
							 | 
						|
										// fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
							 | 
						|
										var d *dataToFlush
							 | 
						|
										if logBuffer.flushFn != nil {
							 | 
						|
											d = &dataToFlush{
							 | 
						|
												startTime: logBuffer.startTime,
							 | 
						|
												stopTime:  logBuffer.stopTime,
							 | 
						|
												data:      copiedBytes(logBuffer.buf[:logBuffer.pos]),
							 | 
						|
											}
							 | 
						|
											// glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
							 | 
						|
										} else {
							 | 
						|
											// glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
							 | 
						|
											logBuffer.lastFlushDataTime = logBuffer.stopTime
							 | 
						|
										}
							 | 
						|
										logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex)
							 | 
						|
										logBuffer.startTime = time.Unix(0, 0)
							 | 
						|
										logBuffer.stopTime = time.Unix(0, 0)
							 | 
						|
										logBuffer.pos = 0
							 | 
						|
										logBuffer.idx = logBuffer.idx[:0]
							 | 
						|
										logBuffer.batchIndex++
							 | 
						|
										return d
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
							 | 
						|
									return logBuffer.startTime
							 | 
						|
								}
							 | 
						|
								func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
							 | 
						|
									return MessagePosition{
							 | 
						|
										Time:       logBuffer.startTime,
							 | 
						|
										BatchIndex: logBuffer.batchIndex,
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (d *dataToFlush) releaseMemory() {
							 | 
						|
									d.data.Reset()
							 | 
						|
									bufferPool.Put(d.data)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bufferCopy *bytes.Buffer, batchIndex int64, err error) {
							 | 
						|
									logBuffer.RLock()
							 | 
						|
									defer logBuffer.RUnlock()
							 | 
						|
								
							 | 
						|
									// Read from disk and memory
							 | 
						|
									//	1. read from disk, last time is = td
							 | 
						|
									//	2. in memory, the earliest time = tm
							 | 
						|
									//	if tm <= td, case 2.1
							 | 
						|
									//		read from memory
							 | 
						|
									//	if tm is empty, case 2.2
							 | 
						|
									//		read from memory
							 | 
						|
									//	if td < tm, case 2.3
							 | 
						|
									//		read from disk again
							 | 
						|
									var tsMemory time.Time
							 | 
						|
									var tsBatchIndex int64
							 | 
						|
									if !logBuffer.startTime.IsZero() {
							 | 
						|
										tsMemory = logBuffer.startTime
							 | 
						|
										tsBatchIndex = logBuffer.batchIndex
							 | 
						|
									}
							 | 
						|
									for _, prevBuf := range logBuffer.prevBuffers.buffers {
							 | 
						|
										if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
							 | 
						|
											tsMemory = prevBuf.startTime
							 | 
						|
											tsBatchIndex = prevBuf.batchIndex
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									if tsMemory.IsZero() { // case 2.2
							 | 
						|
										// println("2.2 no data")
							 | 
						|
										return nil, -2, nil
							 | 
						|
									} else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3
							 | 
						|
										if !logBuffer.lastFlushDataTime.IsZero() {
							 | 
						|
											glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushDataTime)
							 | 
						|
											return nil, -2, ResumeFromDiskError
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// the following is case 2.1
							 | 
						|
								
							 | 
						|
									if lastReadPosition.Equal(logBuffer.stopTime) {
							 | 
						|
										return nil, logBuffer.batchIndex, nil
							 | 
						|
									}
							 | 
						|
									if lastReadPosition.After(logBuffer.stopTime) {
							 | 
						|
										// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime)
							 | 
						|
										return nil, logBuffer.batchIndex, nil
							 | 
						|
									}
							 | 
						|
									if lastReadPosition.Before(logBuffer.startTime) {
							 | 
						|
										// println("checking ", lastReadPosition.UnixNano())
							 | 
						|
										for _, buf := range logBuffer.prevBuffers.buffers {
							 | 
						|
											if buf.startTime.After(lastReadPosition.Time) {
							 | 
						|
												// glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
							 | 
						|
												// println("return the", i, "th in memory", buf.startTime.UnixNano())
							 | 
						|
												return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil
							 | 
						|
											}
							 | 
						|
											if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
							 | 
						|
												pos := buf.locateByTs(lastReadPosition.Time)
							 | 
						|
												// fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
							 | 
						|
												return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
										// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
							 | 
						|
										return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									lastTs := lastReadPosition.UnixNano()
							 | 
						|
									l, h := 0, len(logBuffer.idx)-1
							 | 
						|
								
							 | 
						|
									/*
							 | 
						|
										for i, pos := range m.idx {
							 | 
						|
											logEntry, ts := readTs(m.buf, pos)
							 | 
						|
											event := &filer_pb.SubscribeMetadataResponse{}
							 | 
						|
											proto.Unmarshal(logEntry.Data, event)
							 | 
						|
											entry := event.EventNotification.OldEntry
							 | 
						|
											if entry == nil {
							 | 
						|
												entry = event.EventNotification.NewEntry
							 | 
						|
											}
							 | 
						|
											fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
							 | 
						|
										}
							 | 
						|
										fmt.Printf("l=%d, h=%d\n", l, h)
							 | 
						|
									*/
							 | 
						|
								
							 | 
						|
									for l <= h {
							 | 
						|
										mid := (l + h) / 2
							 | 
						|
										pos := logBuffer.idx[mid]
							 | 
						|
										_, t := readTs(logBuffer.buf, pos)
							 | 
						|
										if t <= lastTs {
							 | 
						|
											l = mid + 1
							 | 
						|
										} else if lastTs < t {
							 | 
						|
											var prevT int64
							 | 
						|
											if mid > 0 {
							 | 
						|
												_, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1])
							 | 
						|
											}
							 | 
						|
											if prevT <= lastTs {
							 | 
						|
												// fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
							 | 
						|
												return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil
							 | 
						|
											}
							 | 
						|
											h = mid
							 | 
						|
										}
							 | 
						|
										// fmt.Printf("l=%d, h=%d\n", l, h)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// FIXME: this could be that the buffer has been flushed already
							 | 
						|
									println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex)
							 | 
						|
									return nil, -2, nil
							 | 
						|
								
							 | 
						|
								}
							 | 
						|
								func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
							 | 
						|
									bufferPool.Put(b)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetName returns the log buffer name for metadata tracking
							 | 
						|
								func (logBuffer *LogBuffer) GetName() string {
							 | 
						|
									logBuffer.RLock()
							 | 
						|
									defer logBuffer.RUnlock()
							 | 
						|
									return logBuffer.name
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetBatchIndex returns the current batch index for metadata tracking
							 | 
						|
								func (logBuffer *LogBuffer) GetBatchIndex() int64 {
							 | 
						|
									logBuffer.RLock()
							 | 
						|
									defer logBuffer.RUnlock()
							 | 
						|
									return logBuffer.batchIndex
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								var bufferPool = sync.Pool{
							 | 
						|
									New: func() interface{} {
							 | 
						|
										return new(bytes.Buffer)
							 | 
						|
									},
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func copiedBytes(buf []byte) (copied *bytes.Buffer) {
							 | 
						|
									copied = bufferPool.Get().(*bytes.Buffer)
							 | 
						|
									copied.Reset()
							 | 
						|
									copied.Write(buf)
							 | 
						|
									return
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func readTs(buf []byte, pos int) (size int, ts int64) {
							 | 
						|
								
							 | 
						|
									size = int(util.BytesToUint32(buf[pos : pos+4]))
							 | 
						|
									entryData := buf[pos+4 : pos+4+size]
							 | 
						|
									logEntry := &filer_pb.LogEntry{}
							 | 
						|
								
							 | 
						|
									err := proto.Unmarshal(entryData, logEntry)
							 | 
						|
									if err != nil {
							 | 
						|
										glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
							 | 
						|
									}
							 | 
						|
									return size, logEntry.TsNs
							 | 
						|
								
							 | 
						|
								}
							 |