@ -12,9 +12,9 @@ import (
// BrokerOffsetManager manages offset assignment for all partitions in a broker
type BrokerOffsetManager struct {
mu sync . RWMutex
offsetIntegration * offset . SMQOffsetIntegration
partitionManagers map [ string ] * offset . PartitionOffsetManager
mu sync . RWMutex
offsetIntegration * offset . SMQOffsetIntegration
partitionManagers map [ string ] * offset . PartitionOffsetManager
storage offset . OffsetStorage
}
@ -30,11 +30,11 @@ func NewBrokerOffsetManagerWithStorage(storage offset.OffsetStorage) *BrokerOffs
if storage == nil {
storage = offset . NewInMemoryOffsetStorage ( )
}
return & BrokerOffsetManager {
offsetIntegration : offset . NewSMQOffsetIntegration ( storage ) ,
partitionManagers : make ( map [ string ] * offset . PartitionOffsetManager ) ,
storage : storage ,
storage : storage ,
}
}
@ -45,29 +45,29 @@ func NewBrokerOffsetManagerWithSQL(dbPath string) (*BrokerOffsetManager, error)
if err != nil {
return nil , fmt . Errorf ( "failed to create database: %w" , err )
}
// Create SQL storage
sqlStorage , err := offset . NewSQLOffsetStorage ( db )
if err != nil {
db . Close ( )
return nil , fmt . Errorf ( "failed to create SQL storage: %w" , err )
}
return & BrokerOffsetManager {
offsetIntegration : offset . NewSMQOffsetIntegration ( sqlStorage ) ,
partitionManagers : make ( map [ string ] * offset . PartitionOffsetManager ) ,
storage : sqlStorage ,
storage : sqlStorage ,
} , nil
}
// AssignOffset assigns the next offset for a partition
func ( bom * BrokerOffsetManager ) AssignOffset ( t topic . Topic , p topic . Partition ) ( int64 , error ) {
partition := topicPartitionToSchemaPartition ( t , p )
bom . mu . RLock ( )
manager , exists := bom . partitionManagers [ partitionKey ( partition ) ]
bom . mu . RUnlock ( )
if ! exists {
bom . mu . Lock ( )
// Double-check after acquiring write lock
@ -82,18 +82,18 @@ func (bom *BrokerOffsetManager) AssignOffset(t topic.Topic, p topic.Partition) (
}
bom . mu . Unlock ( )
}
return manager . AssignOffset ( ) , nil
}
// AssignBatchOffsets assigns a batch of offsets for a partition
func ( bom * BrokerOffsetManager ) AssignBatchOffsets ( t topic . Topic , p topic . Partition , count int64 ) ( baseOffset , lastOffset int64 , err error ) {
partition := topicPartitionToSchemaPartition ( t , p )
bom . mu . RLock ( )
manager , exists := bom . partitionManagers [ partitionKey ( partition ) ]
bom . mu . RUnlock ( )
if ! exists {
bom . mu . Lock ( )
// Double-check after acquiring write lock
@ -107,7 +107,7 @@ func (bom *BrokerOffsetManager) AssignBatchOffsets(t topic.Topic, p topic.Partit
}
bom . mu . Unlock ( )
}
baseOffset , lastOffset = manager . AssignOffsets ( count )
return baseOffset , lastOffset , nil
}
@ -115,7 +115,18 @@ func (bom *BrokerOffsetManager) AssignBatchOffsets(t topic.Topic, p topic.Partit
// GetHighWaterMark returns the high water mark for a partition
func ( bom * BrokerOffsetManager ) GetHighWaterMark ( t topic . Topic , p topic . Partition ) ( int64 , error ) {
partition := topicPartitionToSchemaPartition ( t , p )
return bom . offsetIntegration . GetHighWaterMark ( partition )
// Use the same partition manager that AssignBatchOffsets updates
bom . mu . RLock ( )
manager , exists := bom . partitionManagers [ partitionKey ( partition ) ]
bom . mu . RUnlock ( )
if ! exists {
// If no manager exists, return 0 (no offsets assigned yet)
return 0 , nil
}
return manager . GetHighWaterMark ( ) , nil
}
// CreateSubscription creates an offset-based subscription
@ -145,7 +156,39 @@ func (bom *BrokerOffsetManager) CloseSubscription(subscriptionID string) error {
// GetPartitionOffsetInfo returns comprehensive offset information for a partition
func ( bom * BrokerOffsetManager ) GetPartitionOffsetInfo ( t topic . Topic , p topic . Partition ) ( * offset . PartitionOffsetInfo , error ) {
partition := topicPartitionToSchemaPartition ( t , p )
return bom . offsetIntegration . GetPartitionOffsetInfo ( partition )
// Use the same partition manager that AssignBatchOffsets updates
bom . mu . RLock ( )
manager , exists := bom . partitionManagers [ partitionKey ( partition ) ]
bom . mu . RUnlock ( )
if ! exists {
// If no manager exists, return info for empty partition
return & offset . PartitionOffsetInfo {
Partition : partition ,
EarliestOffset : 0 ,
LatestOffset : - 1 , // -1 indicates no records yet
HighWaterMark : 0 ,
RecordCount : 0 ,
ActiveSubscriptions : 0 ,
} , nil
}
// Get info from the manager
highWaterMark := manager . GetHighWaterMark ( )
var latestOffset int64 = - 1
if highWaterMark > 0 {
latestOffset = highWaterMark - 1 // Latest assigned offset
}
return & offset . PartitionOffsetInfo {
Partition : partition ,
EarliestOffset : 0 , // For simplicity, assume earliest is always 0
LatestOffset : latestOffset ,
HighWaterMark : highWaterMark ,
RecordCount : highWaterMark ,
ActiveSubscriptions : 0 , // TODO: Track subscription count if needed
} , nil
}
// topicPartitionToSchemaPartition converts topic.Topic and topic.Partition to schema_pb.Partition
@ -160,57 +203,73 @@ func topicPartitionToSchemaPartition(t topic.Topic, p topic.Partition) *schema_p
// partitionKey generates a unique key for a partition (same as offset package)
func partitionKey ( partition * schema_pb . Partition ) string {
return fmt . Sprintf ( "ring:%d:range:%d-%d:time:%d" ,
return fmt . Sprintf ( "ring:%d:range:%d-%d:time:%d" ,
partition . RingSize , partition . RangeStart , partition . RangeStop , partition . UnixTimeNs )
}
// OffsetAssignmentResult contains the result of offset assignment for logging/metrics
type OffsetAssignmentResult struct {
Topic topic . Topic
Partition topic . Partition
BaseOffset int64
LastOffset int64
Count int64
Timestamp int64
Error error
Topic topic . Topic
Partition topic . Partition
BaseOffset int64
LastOffset int64
Count int64
Timestamp int64
Error error
}
// AssignOffsetsWithResult assigns offsets and returns detailed result for logging/metrics
func ( bom * BrokerOffsetManager ) AssignOffsetsWithResult ( t topic . Topic , p topic . Partition , count int64 ) * OffsetAssignmentResult {
baseOffset , lastOffset , err := bom . AssignBatchOffsets ( t , p , count )
result := & OffsetAssignmentResult {
Topic : t ,
Partition : p ,
Count : count ,
Error : err ,
}
if err == nil {
result . BaseOffset = baseOffset
result . LastOffset = lastOffset
result . Timestamp = time . Now ( ) . UnixNano ( )
}
return result
}
// GetOffsetMetrics returns metrics about offset usage across all partitions
func ( bom * BrokerOffsetManager ) GetOffsetMetrics ( ) * offset . OffsetMetrics {
return bom . offsetIntegration . GetOffsetMetrics ( )
bom . mu . RLock ( )
defer bom . mu . RUnlock ( )
// Count active partitions and calculate total offsets
partitionCount := int64 ( len ( bom . partitionManagers ) )
var totalOffsets int64 = 0
for _ , manager := range bom . partitionManagers {
totalOffsets += manager . GetHighWaterMark ( )
}
return & offset . OffsetMetrics {
PartitionCount : partitionCount ,
TotalOffsets : totalOffsets ,
ActiveSubscriptions : 0 , // TODO: Track subscription count if needed
AverageLatency : 0.0 ,
}
}
// Shutdown gracefully shuts down the offset manager
func ( bom * BrokerOffsetManager ) Shutdown ( ) {
bom . mu . Lock ( )
defer bom . mu . Unlock ( )
// Close all partition managers
for key := range bom . partitionManagers {
// Partition managers don't have explicit shutdown, but we clear the map
delete ( bom . partitionManagers , key )
}
bom . partitionManagers = make ( map [ string ] * offset . PartitionOffsetManager )
// TODO: Close storage connections when SQL storage is implemented
}