|  |  | @ -6,13 +6,10 @@ import ( | 
			
		
	
		
			
				
					|  |  |  | 	"fmt" | 
			
		
	
		
			
				
					|  |  |  | 	"strings" | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	"google.golang.org/protobuf/proto" | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/glog" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/mq/topic" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/util" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  | @ -65,55 +62,23 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage | 
			
		
	
		
			
				
					|  |  |  | 	messageCount := 0 | 
			
		
	
		
			
				
					|  |  |  | 	startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// Create a custom LoopProcessLogData function that captures the batch index
 | 
			
		
	
		
			
				
					|  |  |  | 	// Since we can't modify the existing EachLogEntryFuncType signature,
 | 
			
		
	
		
			
				
					|  |  |  | 	// we'll implement our own iteration logic based on LoopProcessLogData
 | 
			
		
	
		
			
				
					|  |  |  | 	var lastReadPosition = startPosition | 
			
		
	
		
			
				
					|  |  |  | 	var isDone bool | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	for !isDone { | 
			
		
	
		
			
				
					|  |  |  | 		// Use ReadFromBuffer to get the next batch with its correct batch index
 | 
			
		
	
		
			
				
					|  |  |  | 		bytesBuf, batchIndex, err := localPartition.LogBuffer.ReadFromBuffer(lastReadPosition) | 
			
		
	
		
			
				
					|  |  |  | 		if err == log_buffer.ResumeFromDiskError { | 
			
		
	
		
			
				
					|  |  |  | 			break | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 		if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 			return err | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 		// If no more data in memory, we're done
 | 
			
		
	
		
			
				
					|  |  |  | 		if bytesBuf == nil { | 
			
		
	
		
			
				
					|  |  |  | 			break | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 		// Process all messages in this batch
 | 
			
		
	
		
			
				
					|  |  |  | 		buf := bytesBuf.Bytes() | 
			
		
	
		
			
				
					|  |  |  | 		for pos := 0; pos+4 < len(buf); { | 
			
		
	
		
			
				
					|  |  |  | 			size := util.BytesToUint32(buf[pos : pos+4]) | 
			
		
	
		
			
				
					|  |  |  | 			if pos+4+int(size) > len(buf) { | 
			
		
	
		
			
				
					|  |  |  | 				break | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 			entryData := buf[pos+4 : pos+4+int(size)] | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 			logEntry := &filer_pb.LogEntry{} | 
			
		
	
		
			
				
					|  |  |  | 			if err = proto.Unmarshal(entryData, logEntry); err != nil { | 
			
		
	
		
			
				
					|  |  |  | 				pos += 4 + int(size) | 
			
		
	
		
			
				
					|  |  |  | 				continue | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 			// Now we have the correct batchIndex for this message
 | 
			
		
	
		
			
				
					|  |  |  | 	// Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication
 | 
			
		
	
		
			
				
					|  |  |  | 	_, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex( | 
			
		
	
		
			
				
					|  |  |  | 		"GetUnflushedMessages", | 
			
		
	
		
			
				
					|  |  |  | 		startPosition, | 
			
		
	
		
			
				
					|  |  |  | 		0,                            // stopTsNs = 0 means process all available data
 | 
			
		
	
		
			
				
					|  |  |  | 		func() bool { return false }, // waitForDataFn = false means don't wait for new data
 | 
			
		
	
		
			
				
					|  |  |  | 		func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) { | 
			
		
	
		
			
				
					|  |  |  | 			// Apply buffer index filtering if specified
 | 
			
		
	
		
			
				
					|  |  |  | 			if startBufferIndex > 0 && batchIndex < startBufferIndex { | 
			
		
	
		
			
				
					|  |  |  | 				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex) | 
			
		
	
		
			
				
					|  |  |  | 				pos += 4 + int(size) | 
			
		
	
		
			
				
					|  |  |  | 				continue | 
			
		
	
		
			
				
					|  |  |  | 				return false, nil | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 			// Check if this message is from a buffer range that's already been flushed
 | 
			
		
	
		
			
				
					|  |  |  | 			if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) { | 
			
		
	
		
			
				
					|  |  |  | 				glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex) | 
			
		
	
		
			
				
					|  |  |  | 				pos += 4 + int(size) | 
			
		
	
		
			
				
					|  |  |  | 				continue | 
			
		
	
		
			
				
					|  |  |  | 				return false, nil | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 			// Stream this message
 | 
			
		
	
	
		
			
				
					|  |  | @ -129,18 +94,13 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 			if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 				glog.Errorf("Failed to stream message: %v", err) | 
			
		
	
		
			
				
					|  |  |  | 				isDone = true | 
			
		
	
		
			
				
					|  |  |  | 				break | 
			
		
	
		
			
				
					|  |  |  | 				return true, err // isDone = true to stop processing
 | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 			messageCount++ | 
			
		
	
		
			
				
					|  |  |  | 			lastReadPosition = log_buffer.NewMessagePosition(logEntry.TsNs, batchIndex) | 
			
		
	
		
			
				
					|  |  |  | 			pos += 4 + int(size) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 		// Release the buffer back to the pool
 | 
			
		
	
		
			
				
					|  |  |  | 		localPartition.LogBuffer.ReleaseMemory(bytesBuf) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 			return false, nil // Continue processing
 | 
			
		
	
		
			
				
					|  |  |  | 		}, | 
			
		
	
		
			
				
					|  |  |  | 	) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// Handle collection errors
 | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil && err != log_buffer.ResumeFromDiskError { | 
			
		
	
	
		
			
				
					|  |  | 
 |