|
|
|
@ -8,7 +8,6 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" |
|
|
|
"google.golang.org/protobuf/proto" |
|
|
|
) |
|
|
|
|
|
|
|
// OffsetAssignmentFunc is a function type for assigning offsets to messages
|
|
|
|
@ -30,13 +29,9 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( |
|
|
|
} |
|
|
|
|
|
|
|
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
|
|
|
|
var ts time.Time |
|
|
|
processingTsNs := message.TsNs |
|
|
|
if processingTsNs == 0 { |
|
|
|
ts = time.Now() |
|
|
|
processingTsNs = ts.UnixNano() |
|
|
|
} else { |
|
|
|
ts = time.Unix(0, processingTsNs) |
|
|
|
processingTsNs = time.Now().UnixNano() |
|
|
|
} |
|
|
|
|
|
|
|
// Create LogEntry with assigned offset
|
|
|
|
@ -48,35 +43,21 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( |
|
|
|
Offset: offset, // Add the assigned offset
|
|
|
|
} |
|
|
|
|
|
|
|
logEntryData, err := proto.Marshal(logEntry) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// Use the existing LogBuffer infrastructure for the rest
|
|
|
|
// TODO: This is a workaround - ideally LogBuffer should handle offset assignment
|
|
|
|
// For now, we'll add the message with the pre-assigned offset
|
|
|
|
return b.addLogEntryToBuffer(logBuffer, logEntry, logEntryData, ts) |
|
|
|
return b.addLogEntryToBuffer(logBuffer, logEntry) |
|
|
|
} |
|
|
|
|
|
|
|
// addLogEntryToBuffer adds a pre-constructed LogEntry to the buffer
|
|
|
|
// This is a helper function that mimics LogBuffer.AddDataToBuffer but with a pre-built LogEntry
|
|
|
|
// This is a helper function that directly uses LogBuffer.AddLogEntryToBuffer
|
|
|
|
func (b *MessageQueueBroker) addLogEntryToBuffer( |
|
|
|
logBuffer *log_buffer.LogBuffer, |
|
|
|
logEntry *filer_pb.LogEntry, |
|
|
|
logEntryData []byte, |
|
|
|
ts time.Time, |
|
|
|
) error { |
|
|
|
// TODO: This is a simplified version of LogBuffer.AddDataToBuffer
|
|
|
|
// ASSUMPTION: We're bypassing some of the LogBuffer's internal logic
|
|
|
|
// This should be properly integrated when LogBuffer is modified
|
|
|
|
|
|
|
|
// Use the new AddLogEntryToBuffer method to preserve offset information
|
|
|
|
// Use the AddLogEntryToBuffer method to preserve offset information
|
|
|
|
// This ensures the offset is maintained throughout the entire data flow
|
|
|
|
if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
return logBuffer.AddLogEntryToBuffer(logEntry) |
|
|
|
} |
|
|
|
|
|
|
|
// GetPartitionOffsetInfoInternal returns offset information for a partition (internal method)
|
|
|
|
|