From 6eb1da41d495db29c5c45a40dc352fe6f3fa2d99 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 18:44:29 -0700 Subject: [PATCH] fix issues --- weed/mq/broker/broker_grpc_pub.go | 12 ++--- weed/mq/broker/broker_log_buffer_offset.go | 27 +++++------ weed/mq/topic/local_partition_offset.go | 31 +++++++------ weed/util/log_buffer/log_buffer.go | 54 ++++++++++++++++++++++ 4 files changed, 87 insertions(+), 37 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index c5860c0ce..b426ccd5e 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -159,20 +159,21 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // ASSUMPTION: For now using existing Publish method, offset assignment will be added in Phase 4 completion // send to the local partition with offset assignment t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - + // Create offset assignment function for this partition assignOffsetFn := func() (int64, error) { return b.offsetManager.AssignOffset(t, p) } - + // Use offset-aware publishing - if err = localTopicPartition.PublishWithOffset(dataMessage, assignOffsetFn); err != nil { + assignedOffset, err := localTopicPartition.PublishWithOffset(dataMessage, assignOffsetFn) + if err != nil { return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err) } // Update published offset and last seen time for this publisher - // TODO: Update this to use the actual assigned offset instead of timestamp - publisher.UpdatePublishedOffset(dataMessage.TsNs) + // Use the actual assigned offset instead of timestamp + publisher.UpdatePublishedOffset(assignedOffset) } glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) @@ -194,4 +195,3 @@ func findClientAddress(ctx context.Context) string { } return pr.Addr.String() } - diff --git a/weed/mq/broker/broker_log_buffer_offset.go b/weed/mq/broker/broker_log_buffer_offset.go index 956754e70..c30b70d65 100644 --- a/weed/mq/broker/broker_log_buffer_offset.go +++ b/weed/mq/broker/broker_log_buffer_offset.go @@ -1,13 +1,14 @@ package broker import ( + "time" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/protobuf/proto" - "time" ) // OffsetAssignmentFunc is a function type for assigning offsets to messages @@ -27,7 +28,7 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( if err != nil { return err } - + // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock var ts time.Time processingTsNs := message.TsNs @@ -69,16 +70,10 @@ func (b *MessageQueueBroker) addLogEntryToBuffer( // TODO: This is a simplified version of LogBuffer.AddDataToBuffer // ASSUMPTION: We're bypassing some of the LogBuffer's internal logic // This should be properly integrated when LogBuffer is modified - - // For now, we'll use the existing AddToBuffer method and ignore the offset - // The offset will be preserved in the parquet files through our parquet integration - message := &mq_pb.DataMessage{ - Key: logEntry.Key, - Value: logEntry.Data, - TsNs: logEntry.TsNs, - } - - logBuffer.AddToBuffer(message) + + // Use the new AddLogEntryToBuffer method to preserve offset information + // This ensures the offset is maintained throughout the entire data flow + logBuffer.AddLogEntryToBuffer(logEntry) return nil } @@ -88,7 +83,7 @@ func (b *MessageQueueBroker) GetPartitionOffsetInfo(t topic.Topic, p topic.Parti if err != nil { return nil, err } - + // Convert to broker-specific format if needed return &PartitionOffsetInfo{ Topic: t, @@ -123,7 +118,7 @@ func (b *MessageQueueBroker) CreateOffsetSubscription( // TODO: Convert string offsetType to schema_pb.OffsetType // ASSUMPTION: For now using RESET_TO_EARLIEST as default // This should be properly mapped based on the offsetType parameter - + _, err := b.offsetManager.CreateSubscription( subscriptionID, t, @@ -131,14 +126,14 @@ func (b *MessageQueueBroker) CreateOffsetSubscription( 0, // schema_pb.OffsetType_RESET_TO_EARLIEST startOffset, ) - + return err } // GetOffsetMetrics returns offset metrics for monitoring func (b *MessageQueueBroker) GetOffsetMetrics() map[string]interface{} { metrics := b.offsetManager.GetOffsetMetrics() - + return map[string]interface{}{ "partition_count": metrics.PartitionCount, "total_offsets": metrics.TotalOffsets, diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go index 989bd68b7..cb8069932 100644 --- a/weed/mq/topic/local_partition_offset.go +++ b/weed/mq/topic/local_partition_offset.go @@ -14,17 +14,17 @@ type OffsetAssignmentFunc func() (int64, error) // PublishWithOffset publishes a message with offset assignment // TODO: This extends LocalPartition with offset support // ASSUMPTION: This will eventually be integrated into the main Publish method -func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOffsetFn OffsetAssignmentFunc) error { +func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOffsetFn OffsetAssignmentFunc) (int64, error) { // Assign offset for this message offset, err := assignOffsetFn() if err != nil { - return fmt.Errorf("failed to assign offset: %w", err) + return 0, fmt.Errorf("failed to assign offset: %w", err) } - + // Add message to buffer with offset err = p.addToBufferWithOffset(message, offset) if err != nil { - return fmt.Errorf("failed to add message to buffer: %w", err) + return 0, fmt.Errorf("failed to add message to buffer: %w", err) } // Send to follower if needed (same logic as original Publish) @@ -34,20 +34,20 @@ func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOff Data: message, }, }); followErr != nil { - return fmt.Errorf("send to follower %s: %v", p.Follower, followErr) + return 0, fmt.Errorf("send to follower %s: %v", p.Follower, followErr) } } else { atomic.StoreInt64(&p.AckTsNs, message.TsNs) } - return nil + return offset, nil } // addToBufferWithOffset adds a message to the log buffer with a pre-assigned offset func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offset int64) error { // TODO: This is a workaround until LogBuffer can be modified to handle offsets natively // ASSUMPTION: We create the LogEntry here and then add it to the buffer - + // Prepare timestamp processingTsNs := message.TsNs if processingTsNs == 0 { @@ -57,7 +57,7 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse // TODO: Create LogEntry with assigned offset - for now just using existing buffer // ASSUMPTION: The offset will be preserved through parquet storage integration // Future: LogEntry should be created here with the assigned offset - + // For now, we still use the existing LogBuffer.AddToBuffer // The offset information will be preserved in parquet files // TODO: Modify LogBuffer to accept and preserve offset information @@ -66,9 +66,9 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse Value: message.Value, TsNs: processingTsNs, } - + p.LogBuffer.AddToBuffer(messageWithTimestamp) - + return nil } @@ -80,15 +80,15 @@ func (p *LocalPartition) GetOffsetInfo() map[string]interface{} { "partition_range_start": p.RangeStart, "partition_range_stop": p.RangeStop, "partition_unix_time": p.UnixTimeNs, - "buffer_name": p.LogBuffer.GetName(), - "buffer_batch_index": p.LogBuffer.GetBatchIndex(), + "buffer_name": p.LogBuffer.GetName(), + "buffer_batch_index": p.LogBuffer.GetBatchIndex(), } } // OffsetAwarePublisher wraps a LocalPartition with offset assignment capability type OffsetAwarePublisher struct { - partition *LocalPartition - assignOffsetFn OffsetAssignmentFunc + partition *LocalPartition + assignOffsetFn OffsetAssignmentFunc } // NewOffsetAwarePublisher creates a new offset-aware publisher @@ -101,7 +101,8 @@ func NewOffsetAwarePublisher(partition *LocalPartition, assignOffsetFn OffsetAss // Publish publishes a message with automatic offset assignment func (oap *OffsetAwarePublisher) Publish(message *mq_pb.DataMessage) error { - return oap.partition.PublishWithOffset(message, oap.assignOffsetFn) + _, err := oap.partition.PublishWithOffset(message, oap.assignOffsetFn) + return err } // GetPartition returns the underlying partition diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 15ea062c6..f5e490a79 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -75,6 +75,60 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) } +// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information +func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { + logEntryData, _ := proto.Marshal(logEntry) + + var toFlush *dataToFlush + logBuffer.Lock() + defer func() { + logBuffer.Unlock() + if toFlush != nil { + logBuffer.flushChan <- toFlush + } + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + }() + + processingTsNs := logEntry.TsNs + ts := time.Unix(0, processingTsNs) + + // Handle timestamp collision inside lock (rare case) + if logBuffer.LastTsNs.Load() >= processingTsNs { + processingTsNs = logBuffer.LastTsNs.Add(1) + ts = time.Unix(0, processingTsNs) + // Re-marshal with corrected timestamp + logEntry.TsNs = processingTsNs + logEntryData, _ = proto.Marshal(logEntry) + } else { + logBuffer.LastTsNs.Store(processingTsNs) + } + + size := len(logEntryData) + + if logBuffer.pos == 0 { + logBuffer.startTime = ts + } + + if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { + toFlush = logBuffer.copyToFlush() + logBuffer.startTime = ts + if len(logBuffer.buf) < size+4 { + logBuffer.buf = make([]byte, 2*size+4) + } + } + logBuffer.stopTime = ts + + logBuffer.idx = append(logBuffer.idx, logBuffer.pos) + util.Uint32toBytes(logBuffer.sizeBuf, uint32(size)) + copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf) + copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) + logBuffer.pos += size + 4 + + logBuffer.batchIndex++ +} + func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock