diff --git a/weed/mq/broker/broker_log_buffer_offset.go b/weed/mq/broker/broker_log_buffer_offset.go index 565cb0aa9..104722af1 100644 --- a/weed/mq/broker/broker_log_buffer_offset.go +++ b/weed/mq/broker/broker_log_buffer_offset.go @@ -8,7 +8,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" - "google.golang.org/protobuf/proto" ) // OffsetAssignmentFunc is a function type for assigning offsets to messages @@ -30,13 +29,9 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( } // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock - var ts time.Time processingTsNs := message.TsNs if processingTsNs == 0 { - ts = time.Now() - processingTsNs = ts.UnixNano() - } else { - ts = time.Unix(0, processingTsNs) + processingTsNs = time.Now().UnixNano() } // Create LogEntry with assigned offset @@ -48,35 +43,21 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( Offset: offset, // Add the assigned offset } - logEntryData, err := proto.Marshal(logEntry) - if err != nil { - return err - } - // Use the existing LogBuffer infrastructure for the rest // TODO: This is a workaround - ideally LogBuffer should handle offset assignment // For now, we'll add the message with the pre-assigned offset - return b.addLogEntryToBuffer(logBuffer, logEntry, logEntryData, ts) + return b.addLogEntryToBuffer(logBuffer, logEntry) } // addLogEntryToBuffer adds a pre-constructed LogEntry to the buffer -// This is a helper function that mimics LogBuffer.AddDataToBuffer but with a pre-built LogEntry +// This is a helper function that directly uses LogBuffer.AddLogEntryToBuffer func (b *MessageQueueBroker) addLogEntryToBuffer( logBuffer *log_buffer.LogBuffer, logEntry *filer_pb.LogEntry, - logEntryData []byte, - ts time.Time, ) error { - // TODO: This is a simplified version of LogBuffer.AddDataToBuffer - // ASSUMPTION: We're bypassing some of the LogBuffer's internal logic - // This should be properly integrated when LogBuffer is modified - - // Use the new AddLogEntryToBuffer method to preserve offset information + // Use the AddLogEntryToBuffer method to preserve offset information // This ensures the offset is maintained throughout the entire data flow - if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { - return err - } - return nil + return logBuffer.AddLogEntryToBuffer(logEntry) } // GetPartitionOffsetInfoInternal returns offset information for a partition (internal method)