|
|
|
@ -117,14 +117,12 @@ func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{ |
|
|
|
|
|
|
|
// Check if already registered
|
|
|
|
if existingChan, exists := logBuffer.subscribers[subscriberID]; exists { |
|
|
|
glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name) |
|
|
|
return existingChan |
|
|
|
} |
|
|
|
|
|
|
|
// Create buffered channel (size 1) so notifications never block
|
|
|
|
notifyChan := make(chan struct{}, 1) |
|
|
|
logBuffer.subscribers[subscriberID] = notifyChan |
|
|
|
glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) |
|
|
|
return notifyChan |
|
|
|
} |
|
|
|
|
|
|
|
@ -136,7 +134,6 @@ func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) { |
|
|
|
if ch, exists := logBuffer.subscribers[subscriberID]; exists { |
|
|
|
close(ch) |
|
|
|
delete(logBuffer.subscribers, subscriberID) |
|
|
|
glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -158,7 +155,6 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { |
|
|
|
// it MUST be in memory (not written to disk yet)
|
|
|
|
lastFlushed := logBuffer.lastFlushedOffset.Load() |
|
|
|
if lastFlushed >= 0 && offset > lastFlushed { |
|
|
|
glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed) |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
@ -168,11 +164,9 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { |
|
|
|
// CRITICAL: Check if buffer actually has data (pos > 0)
|
|
|
|
// After flush, pos=0 but range is still valid - data is on disk, not in memory
|
|
|
|
if logBuffer.pos > 0 { |
|
|
|
glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset) |
|
|
|
return true |
|
|
|
} |
|
|
|
// Buffer is empty (just flushed) - data is on disk
|
|
|
|
glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset) |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
@ -181,17 +175,14 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { |
|
|
|
if offset >= buf.startOffset && offset <= buf.offset { |
|
|
|
// Check if prevBuffer actually has data
|
|
|
|
if buf.size > 0 { |
|
|
|
glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset) |
|
|
|
return true |
|
|
|
} |
|
|
|
// Buffer is empty (flushed) - data is on disk
|
|
|
|
glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset) |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Offset is older than memory buffers - only available on disk
|
|
|
|
glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed) |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
@ -209,12 +200,11 @@ func (logBuffer *LogBuffer) notifySubscribers() { |
|
|
|
select { |
|
|
|
case notifyChan <- struct{}{}: |
|
|
|
// Notification sent successfully
|
|
|
|
glog.V(3).Infof("Notified subscriber %s for %s", subscriberID, logBuffer.name) |
|
|
|
default: |
|
|
|
// Channel full - subscriber hasn't consumed previous notification yet
|
|
|
|
// This is OK because one notification is sufficient to wake the subscriber
|
|
|
|
glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID) |
|
|
|
} |
|
|
|
_ = subscriberID |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|