
fix issues

pull/7231/head
chrislu · 2 months ago
commit 6eb1da41d4
4 changed files:
1. weed/mq/broker/broker_grpc_pub.go (8 lines changed)
2. weed/mq/broker/broker_log_buffer_offset.go (15 lines changed)
3. weed/mq/topic/local_partition_offset.go (21 lines changed)
4. weed/util/log_buffer/log_buffer.go (54 lines changed)

weed/mq/broker/broker_grpc_pub.go (8 lines changed)

@@ -166,13 +166,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
 		}
 
 		// Use offset-aware publishing
-		if err = localTopicPartition.PublishWithOffset(dataMessage, assignOffsetFn); err != nil {
+		assignedOffset, err := localTopicPartition.PublishWithOffset(dataMessage, assignOffsetFn)
+		if err != nil {
 			return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err)
 		}
 
 		// Update published offset and last seen time for this publisher
-		// TODO: Update this to use the actual assigned offset instead of timestamp
-		publisher.UpdatePublishedOffset(dataMessage.TsNs)
+		// Use the actual assigned offset instead of timestamp
+		publisher.UpdatePublishedOffset(assignedOffset)
 	}
 
 	glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
@@ -194,4 +195,3 @@ func findClientAddress(ctx context.Context) string {
 	}
 	return pr.Addr.String()
 }
-
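For context, a minimal sketch of the new call shape on the broker side. The atomic-counter assignOffsetFn below is a hypothetical stand-in; the real broker sources offsets from its own offset assignment machinery, but the flow this hunk switches to is the same: publish, receive the assigned offset, record it on the publisher instead of the timestamp.

package example // illustrative sketch only, not part of the commit

import (
	"fmt"
	"sync/atomic"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// publishOne demonstrates PublishWithOffset's new (offset, error) return.
// nextOffset simulates per-partition offset assignment; in the broker this
// comes from its offset manager, not a local counter.
func publishOne(p *topic.LocalPartition, msg *mq_pb.DataMessage, nextOffset *int64) (int64, error) {
	assignOffsetFn := func() (int64, error) {
		return atomic.AddInt64(nextOffset, 1) - 1, nil // hypothetical dense offsets starting at 0
	}
	assignedOffset, err := p.PublishWithOffset(msg, assignOffsetFn)
	if err != nil {
		return 0, fmt.Errorf("publish: %w", err)
	}
	// The broker now records this offset (publisher.UpdatePublishedOffset(assignedOffset))
	// instead of the message timestamp.
	return assignedOffset, nil
}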

weed/mq/broker/broker_log_buffer_offset.go (15 lines changed)

@@ -1,13 +1,14 @@
 package broker
 
 import (
+	"time"
+
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/protobuf/proto"
-	"time"
 )
 
 // OffsetAssignmentFunc is a function type for assigning offsets to messages
@@ -70,15 +71,9 @@ func (b *MessageQueueBroker) addLogEntryToBuffer(
 	// ASSUMPTION: We're bypassing some of the LogBuffer's internal logic
 	// This should be properly integrated when LogBuffer is modified
-	// For now, we'll use the existing AddToBuffer method and ignore the offset
-	// The offset will be preserved in the parquet files through our parquet integration
-	message := &mq_pb.DataMessage{
-		Key:   logEntry.Key,
-		Value: logEntry.Data,
-		TsNs:  logEntry.TsNs,
-	}
-	logBuffer.AddToBuffer(message)
+	// Use the new AddLogEntryToBuffer method to preserve offset information
+	// This ensures the offset is maintained throughout the entire data flow
+	logBuffer.AddLogEntryToBuffer(logEntry)
 
 	return nil
 }
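The effect of this hunk is easiest to see in isolation: the old path rebuilt a DataMessage from the LogEntry, flattening it to key/value/timestamp and dropping any offset it carried, while the new path hands the LogEntry to the buffer intact. A hedged sketch, assuming the entry carries its assigned offset in a dedicated field (the field name below is hypothetical):

package example // illustrative sketch only, not part of the commit

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// bufferEntry keeps the LogEntry intact so whatever offset it carries survives
// into the buffer (and later into parquet), instead of being flattened to
// key/value/timestamp by the old AddToBuffer path.
func bufferEntry(lb *log_buffer.LogBuffer, key, value []byte, tsNs, assignedOffset int64) {
	entry := &filer_pb.LogEntry{
		TsNs: tsNs,
		Key:  key,
		Data: value,
		// Offset: assignedOffset, // hypothetical field; the point of preserving the LogEntry
	}
	lb.AddLogEntryToBuffer(entry)
}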

weed/mq/topic/local_partition_offset.go (21 lines changed)

@@ -14,17 +14,17 @@ type OffsetAssignmentFunc func() (int64, error)
 // PublishWithOffset publishes a message with offset assignment
 // TODO: This extends LocalPartition with offset support
 // ASSUMPTION: This will eventually be integrated into the main Publish method
-func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOffsetFn OffsetAssignmentFunc) error {
+func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOffsetFn OffsetAssignmentFunc) (int64, error) {
 	// Assign offset for this message
 	offset, err := assignOffsetFn()
 	if err != nil {
-		return fmt.Errorf("failed to assign offset: %w", err)
+		return 0, fmt.Errorf("failed to assign offset: %w", err)
 	}
 
 	// Add message to buffer with offset
 	err = p.addToBufferWithOffset(message, offset)
 	if err != nil {
-		return fmt.Errorf("failed to add message to buffer: %w", err)
+		return 0, fmt.Errorf("failed to add message to buffer: %w", err)
 	}
 
 	// Send to follower if needed (same logic as original Publish)
@@ -34,13 +34,13 @@ func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOff
 				Data: message,
 			},
 		}); followErr != nil {
-			return fmt.Errorf("send to follower %s: %v", p.Follower, followErr)
+			return 0, fmt.Errorf("send to follower %s: %v", p.Follower, followErr)
 		}
 	} else {
 		atomic.StoreInt64(&p.AckTsNs, message.TsNs)
 	}
 
-	return nil
+	return offset, nil
 }
 
 // addToBufferWithOffset adds a message to the log buffer with a pre-assigned offset
@@ -80,15 +80,15 @@ func (p *LocalPartition) GetOffsetInfo() map[string]interface{} {
 		"partition_range_start": p.RangeStart,
 		"partition_range_stop":  p.RangeStop,
 		"partition_unix_time":   p.UnixTimeNs,
-		"buffer_name":           p.LogBuffer.GetName(),
-		"buffer_batch_index":    p.LogBuffer.GetBatchIndex(),
+		"buffer_name":        p.LogBuffer.GetName(),
+		"buffer_batch_index": p.LogBuffer.GetBatchIndex(),
 	}
 }
 
 // OffsetAwarePublisher wraps a LocalPartition with offset assignment capability
 type OffsetAwarePublisher struct {
-	partition       *LocalPartition
-	assignOffsetFn  OffsetAssignmentFunc
+	partition      *LocalPartition
+	assignOffsetFn OffsetAssignmentFunc
 }
 
 // NewOffsetAwarePublisher creates a new offset-aware publisher
@@ -101,7 +101,8 @@ func NewOffsetAwarePublisher(partition *LocalPartition, assignOffsetFn OffsetAss
 
 // Publish publishes a message with automatic offset assignment
 func (oap *OffsetAwarePublisher) Publish(message *mq_pb.DataMessage) error {
-	return oap.partition.PublishWithOffset(message, oap.assignOffsetFn)
+	_, err := oap.partition.PublishWithOffset(message, oap.assignOffsetFn)
+	return err
 }
 
 // GetPartition returns the underlying partition
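Callers adapt to the new signature in one of two ways, sketched below: read the returned offset when placement matters, or discard it through the OffsetAwarePublisher wrapper, exactly as the Publish method above now does. This is a usage sketch only; how assignOffsets is sourced is up to the caller.

package example // illustrative sketch only, not part of the commit

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// demoPublish shows both calling styles against the new (int64, error) return.
func demoPublish(p *topic.LocalPartition, msg *mq_pb.DataMessage, assignOffsets topic.OffsetAssignmentFunc) error {
	// Style 1: keep the assigned offset (0 is returned on error and must be ignored).
	off, err := p.PublishWithOffset(msg, assignOffsets)
	if err != nil {
		return err
	}
	fmt.Printf("buffered at offset %d\n", off)

	// Style 2: fire-and-forget through the wrapper, which discards the offset.
	oap := topic.NewOffsetAwarePublisher(p, assignOffsets)
	return oap.Publish(msg)
}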

weed/util/log_buffer/log_buffer.go (54 lines changed)

@@ -75,6 +75,60 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
 	logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
 }
 
+// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
+func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
+	logEntryData, _ := proto.Marshal(logEntry)
+
+	var toFlush *dataToFlush
+	logBuffer.Lock()
+	defer func() {
+		logBuffer.Unlock()
+		if toFlush != nil {
+			logBuffer.flushChan <- toFlush
+		}
+		if logBuffer.notifyFn != nil {
+			logBuffer.notifyFn()
+		}
+	}()
+
+	processingTsNs := logEntry.TsNs
+	ts := time.Unix(0, processingTsNs)
+
+	// Handle timestamp collision inside lock (rare case)
+	if logBuffer.LastTsNs.Load() >= processingTsNs {
+		processingTsNs = logBuffer.LastTsNs.Add(1)
+		ts = time.Unix(0, processingTsNs)
+		// Re-marshal with corrected timestamp
+		logEntry.TsNs = processingTsNs
+		logEntryData, _ = proto.Marshal(logEntry)
+	} else {
+		logBuffer.LastTsNs.Store(processingTsNs)
+	}
+
+	size := len(logEntryData)
+
+	if logBuffer.pos == 0 {
+		logBuffer.startTime = ts
+	}
+
+	if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
+		toFlush = logBuffer.copyToFlush()
+		logBuffer.startTime = ts
+		if len(logBuffer.buf) < size+4 {
+			logBuffer.buf = make([]byte, 2*size+4)
+		}
+	}
+
+	logBuffer.stopTime = ts
+
+	logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
+	util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
+	copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
+	copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
+	logBuffer.pos += size + 4
+
+	logBuffer.batchIndex++
+}
+
 func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
 	// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
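One detail worth calling out in the new AddLogEntryToBuffer is the in-lock timestamp collision handling: if an entry's TsNs is not strictly greater than LastTsNs, it is bumped to LastTsNs+1 and the entry is re-marshaled, so entries in the buffer stay strictly ordered by timestamp. A standalone, simplified sketch of that rule (the real code also re-marshals the entry and runs under the buffer lock):

package main // illustrative sketch only, not part of the commit

import (
	"fmt"
	"sync/atomic"
	"time"
)

// nextMonotonicTsNs mirrors the collision rule applied under the buffer lock:
// never hand out a timestamp <= the last one recorded.
func nextMonotonicTsNs(lastTsNs *atomic.Int64, entryTsNs int64) int64 {
	if lastTsNs.Load() >= entryTsNs {
		return lastTsNs.Add(1) // collision or clock regression: bump past the last value
	}
	lastTsNs.Store(entryTsNs)
	return entryTsNs
}

func main() {
	var last atomic.Int64
	now := time.Now().UnixNano()
	a := nextMonotonicTsNs(&last, now)
	b := nextMonotonicTsNs(&last, now) // same wall-clock nanosecond: bumped to now+1
	fmt.Println(a < b)                 // true
}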
