You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

107 lines
3.4 KiB

package topic
import (
"fmt"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// OffsetAssignmentFunc is a function type for assigning offsets to messages.
// Each call returns the offset to use for one message, or an error if no
// offset could be assigned. PublishWithOffset calls it once per message and
// aborts the publish when it returns an error.
type OffsetAssignmentFunc func() (int64, error)
// PublishWithOffset publishes a message with offset assignment.
//
// It obtains an offset from assignOffsetFn, appends the message to the
// partition's log buffer under that offset, and then either forwards the
// message on the follower stream or, when no follower stream is set, records
// message.TsNs as the acknowledged timestamp (AckTsNs).
//
// It returns the assigned offset on success. On failure it returns 0 and an
// error that wraps the underlying cause, so callers can inspect it with
// errors.Is / errors.As. A nil message or nil assignOffsetFn is rejected
// before any offset is consumed. Note that a follower send failure is
// reported after the offset has been assigned and the message has been
// buffered locally.
//
// TODO: This extends LocalPartition with offset support
// ASSUMPTION: This will eventually be integrated into the main Publish method
func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOffsetFn OffsetAssignmentFunc) (int64, error) {
	// Validate inputs first so a bad call does not burn an offset or panic.
	if message == nil {
		return 0, fmt.Errorf("publish with offset: nil message")
	}
	if assignOffsetFn == nil {
		return 0, fmt.Errorf("publish with offset: nil offset assignment function")
	}

	// Assign offset for this message
	offset, err := assignOffsetFn()
	if err != nil {
		return 0, fmt.Errorf("failed to assign offset: %w", err)
	}

	// Add message to buffer with offset
	if err = p.addToBufferWithOffset(message, offset); err != nil {
		return 0, fmt.Errorf("failed to add message to buffer: %w", err)
	}

	// Without a follower stream the message is acknowledged immediately.
	if p.publishFolloweMeStream == nil {
		atomic.StoreInt64(&p.AckTsNs, message.TsNs)
		return offset, nil
	}

	// Send to follower if needed (same logic as original Publish)
	followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
		Message: &mq_pb.PublishFollowMeRequest_Data{
			Data: message,
		},
	})
	if followErr != nil {
		// %w (not %v) keeps the stream error in the chain, matching the
		// other error paths of this function.
		return 0, fmt.Errorf("send to follower %s: %w", p.Follower, followErr)
	}
	return offset, nil
}
// addToBufferWithOffset wraps message in a filer_pb.LogEntry that carries the
// pre-assigned offset and hands that entry to the partition's LogBuffer.
//
// The entry's timestamp is message.TsNs, or the current time in nanoseconds
// when the message carries none (TsNs == 0); the message itself is not
// modified. The returned error is always nil in the current implementation.
func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offset int64) error {
	// The entry keeps the assigned offset alongside the message payload.
	entry := &filer_pb.LogEntry{
		TsNs:             message.TsNs,
		PartitionKeyHash: util.HashToInt32(message.Key),
		Data:             message.Value,
		Key:              message.Key,
		Offset:           offset,
	}

	// Fall back to "now" only when the message was not stamped.
	if entry.TsNs == 0 {
		entry.TsNs = time.Now().UnixNano()
	}

	p.LogBuffer.AddLogEntryToBuffer(entry)
	return nil
}
// GetOffsetInfo reports a snapshot of this partition's identifying fields
// (ring size, range start/stop, unix time) together with the name and batch
// index of its log buffer, keyed by descriptive strings.
// TODO: This should integrate with the broker's offset manager
func (p *LocalPartition) GetOffsetInfo() map[string]interface{} {
	info := make(map[string]interface{}, 6)

	// Partition identity.
	info["partition_ring_size"] = p.RingSize
	info["partition_range_start"] = p.RangeStart
	info["partition_range_stop"] = p.RangeStop
	info["partition_unix_time"] = p.UnixTimeNs

	// Log buffer state.
	info["buffer_name"] = p.LogBuffer.GetName()
	info["buffer_batch_index"] = p.LogBuffer.GetBatchIndex()

	return info
}
// OffsetAwarePublisher wraps a LocalPartition with offset assignment capability.
// It pairs a partition with the function used to obtain an offset for each
// message published through it.
type OffsetAwarePublisher struct {
// partition is the LocalPartition that messages are published to.
partition *LocalPartition
// assignOffsetFn is passed to the partition's PublishWithOffset on every Publish call.
assignOffsetFn OffsetAssignmentFunc
}
// NewOffsetAwarePublisher builds an OffsetAwarePublisher that publishes to
// partition and obtains offsets from assignOffsetFn. Neither argument is
// validated here; both are stored as given.
func NewOffsetAwarePublisher(partition *LocalPartition, assignOffsetFn OffsetAssignmentFunc) *OffsetAwarePublisher {
	publisher := new(OffsetAwarePublisher)
	publisher.partition = partition
	publisher.assignOffsetFn = assignOffsetFn
	return publisher
}
// Publish sends message through the wrapped partition's PublishWithOffset,
// using the publisher's offset assignment function. The assigned offset is
// discarded; only the error, if any, is returned.
func (oap *OffsetAwarePublisher) Publish(message *mq_pb.DataMessage) error {
	if _, err := oap.partition.PublishWithOffset(message, oap.assignOffsetFn); err != nil {
		return err
	}
	return nil
}
// GetPartition exposes the LocalPartition this publisher was created with.
// The pointer is returned as stored, not copied.
func (oap *OffsetAwarePublisher) GetPartition() *LocalPartition {
	wrapped := oap.partition
	return wrapped
}