From dbac0c45d8aceec20371394c183af16782de4dad Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 22:04:05 -0700 Subject: [PATCH] Update broker_offset_manager.go Fixed GetHighWaterMark() to use correct partition managers Fixed GetPartitionOffsetInfo() with proper struct fields Fixed GetOffsetMetrics() with correct types and system --- weed/mq/broker/broker_offset_manager.go | 119 ++++++++++++++++++------ 1 file changed, 89 insertions(+), 30 deletions(-) diff --git a/weed/mq/broker/broker_offset_manager.go b/weed/mq/broker/broker_offset_manager.go index 50c337cff..4d0d49bbd 100644 --- a/weed/mq/broker/broker_offset_manager.go +++ b/weed/mq/broker/broker_offset_manager.go @@ -12,9 +12,9 @@ import ( // BrokerOffsetManager manages offset assignment for all partitions in a broker type BrokerOffsetManager struct { - mu sync.RWMutex - offsetIntegration *offset.SMQOffsetIntegration - partitionManagers map[string]*offset.PartitionOffsetManager + mu sync.RWMutex + offsetIntegration *offset.SMQOffsetIntegration + partitionManagers map[string]*offset.PartitionOffsetManager storage offset.OffsetStorage } @@ -30,11 +30,11 @@ func NewBrokerOffsetManagerWithStorage(storage offset.OffsetStorage) *BrokerOffs if storage == nil { storage = offset.NewInMemoryOffsetStorage() } - + return &BrokerOffsetManager{ offsetIntegration: offset.NewSMQOffsetIntegration(storage), partitionManagers: make(map[string]*offset.PartitionOffsetManager), - storage: storage, + storage: storage, } } @@ -45,29 +45,29 @@ func NewBrokerOffsetManagerWithSQL(dbPath string) (*BrokerOffsetManager, error) if err != nil { return nil, fmt.Errorf("failed to create database: %w", err) } - + // Create SQL storage sqlStorage, err := offset.NewSQLOffsetStorage(db) if err != nil { db.Close() return nil, fmt.Errorf("failed to create SQL storage: %w", err) } - + return &BrokerOffsetManager{ offsetIntegration: offset.NewSMQOffsetIntegration(sqlStorage), partitionManagers: make(map[string]*offset.PartitionOffsetManager), - storage: sqlStorage, + storage: sqlStorage, }, nil } // AssignOffset assigns the next offset for a partition func (bom *BrokerOffsetManager) AssignOffset(t topic.Topic, p topic.Partition) (int64, error) { partition := topicPartitionToSchemaPartition(t, p) - + bom.mu.RLock() manager, exists := bom.partitionManagers[partitionKey(partition)] bom.mu.RUnlock() - + if !exists { bom.mu.Lock() // Double-check after acquiring write lock @@ -82,18 +82,18 @@ func (bom *BrokerOffsetManager) AssignOffset(t topic.Topic, p topic.Partition) ( } bom.mu.Unlock() } - + return manager.AssignOffset(), nil } // AssignBatchOffsets assigns a batch of offsets for a partition func (bom *BrokerOffsetManager) AssignBatchOffsets(t topic.Topic, p topic.Partition, count int64) (baseOffset, lastOffset int64, err error) { partition := topicPartitionToSchemaPartition(t, p) - + bom.mu.RLock() manager, exists := bom.partitionManagers[partitionKey(partition)] bom.mu.RUnlock() - + if !exists { bom.mu.Lock() // Double-check after acquiring write lock @@ -107,7 +107,7 @@ func (bom *BrokerOffsetManager) AssignBatchOffsets(t topic.Topic, p topic.Partit } bom.mu.Unlock() } - + baseOffset, lastOffset = manager.AssignOffsets(count) return baseOffset, lastOffset, nil } @@ -115,7 +115,18 @@ func (bom *BrokerOffsetManager) AssignBatchOffsets(t topic.Topic, p topic.Partit // GetHighWaterMark returns the high water mark for a partition func (bom *BrokerOffsetManager) GetHighWaterMark(t topic.Topic, p topic.Partition) (int64, error) { partition := topicPartitionToSchemaPartition(t, p) - return bom.offsetIntegration.GetHighWaterMark(partition) + + // Use the same partition manager that AssignBatchOffsets updates + bom.mu.RLock() + manager, exists := bom.partitionManagers[partitionKey(partition)] + bom.mu.RUnlock() + + if !exists { + // If no manager exists, return 0 (no offsets assigned yet) + return 0, nil + } + + return manager.GetHighWaterMark(), nil } // CreateSubscription creates an offset-based subscription @@ -145,7 +156,39 @@ func (bom *BrokerOffsetManager) CloseSubscription(subscriptionID string) error { // GetPartitionOffsetInfo returns comprehensive offset information for a partition func (bom *BrokerOffsetManager) GetPartitionOffsetInfo(t topic.Topic, p topic.Partition) (*offset.PartitionOffsetInfo, error) { partition := topicPartitionToSchemaPartition(t, p) - return bom.offsetIntegration.GetPartitionOffsetInfo(partition) + + // Use the same partition manager that AssignBatchOffsets updates + bom.mu.RLock() + manager, exists := bom.partitionManagers[partitionKey(partition)] + bom.mu.RUnlock() + + if !exists { + // If no manager exists, return info for empty partition + return &offset.PartitionOffsetInfo{ + Partition: partition, + EarliestOffset: 0, + LatestOffset: -1, // -1 indicates no records yet + HighWaterMark: 0, + RecordCount: 0, + ActiveSubscriptions: 0, + }, nil + } + + // Get info from the manager + highWaterMark := manager.GetHighWaterMark() + var latestOffset int64 = -1 + if highWaterMark > 0 { + latestOffset = highWaterMark - 1 // Latest assigned offset + } + + return &offset.PartitionOffsetInfo{ + Partition: partition, + EarliestOffset: 0, // For simplicity, assume earliest is always 0 + LatestOffset: latestOffset, + HighWaterMark: highWaterMark, + RecordCount: highWaterMark, + ActiveSubscriptions: 0, // TODO: Track subscription count if needed + }, nil } // topicPartitionToSchemaPartition converts topic.Topic and topic.Partition to schema_pb.Partition @@ -160,57 +203,73 @@ func topicPartitionToSchemaPartition(t topic.Topic, p topic.Partition) *schema_p // partitionKey generates a unique key for a partition (same as offset package) func partitionKey(partition *schema_pb.Partition) string { - return fmt.Sprintf("ring:%d:range:%d-%d:time:%d", + return fmt.Sprintf("ring:%d:range:%d-%d:time:%d", partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs) } // OffsetAssignmentResult contains the result of offset assignment for logging/metrics type OffsetAssignmentResult struct { - Topic topic.Topic - Partition topic.Partition - BaseOffset int64 - LastOffset int64 - Count int64 - Timestamp int64 - Error error + Topic topic.Topic + Partition topic.Partition + BaseOffset int64 + LastOffset int64 + Count int64 + Timestamp int64 + Error error } // AssignOffsetsWithResult assigns offsets and returns detailed result for logging/metrics func (bom *BrokerOffsetManager) AssignOffsetsWithResult(t topic.Topic, p topic.Partition, count int64) *OffsetAssignmentResult { baseOffset, lastOffset, err := bom.AssignBatchOffsets(t, p, count) - + result := &OffsetAssignmentResult{ Topic: t, Partition: p, Count: count, Error: err, } - + if err == nil { result.BaseOffset = baseOffset result.LastOffset = lastOffset result.Timestamp = time.Now().UnixNano() } - + return result } // GetOffsetMetrics returns metrics about offset usage across all partitions func (bom *BrokerOffsetManager) GetOffsetMetrics() *offset.OffsetMetrics { - return bom.offsetIntegration.GetOffsetMetrics() + bom.mu.RLock() + defer bom.mu.RUnlock() + + // Count active partitions and calculate total offsets + partitionCount := int64(len(bom.partitionManagers)) + var totalOffsets int64 = 0 + + for _, manager := range bom.partitionManagers { + totalOffsets += manager.GetHighWaterMark() + } + + return &offset.OffsetMetrics{ + PartitionCount: partitionCount, + TotalOffsets: totalOffsets, + ActiveSubscriptions: 0, // TODO: Track subscription count if needed + AverageLatency: 0.0, + } } // Shutdown gracefully shuts down the offset manager func (bom *BrokerOffsetManager) Shutdown() { bom.mu.Lock() defer bom.mu.Unlock() - + // Close all partition managers for key := range bom.partitionManagers { // Partition managers don't have explicit shutdown, but we clear the map delete(bom.partitionManagers, key) } bom.partitionManagers = make(map[string]*offset.PartitionOffsetManager) - + // TODO: Close storage connections when SQL storage is implemented }