@@ -58,7 +58,7 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta
 	// Send init message to start subscription with Kafka client's consumer group and ID
 	initReq := createSubscribeInitMessage(topic, actualPartition, startOffset, offsetType, consumerGroup, consumerID)
 
-	glog.V(0).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
+	glog.V(2).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
 		topic, partition, startOffset, offsetType, consumerGroup, consumerID)
 
 	if err := stream.Send(initReq); err != nil {
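Aside: every change in this patch hinges on how `glog` gates output. `glog.V(n).Infof(...)` only prints when the process runs with `-v=n` or higher, so moving these hot-path fetch logs from `V(0)` (always on) to `V(2)` silences them by default while keeping them one flag away for debugging. A minimal sketch of that gating, using a simplified stand-in rather than the real glog library:

```go
package main

import (
	"flag"
	"fmt"
)

// verbosity plays the role of glog's -v flag.
var verbosity = flag.Int("v", 0, "log messages at or below this verbosity")

type verbose bool

// V mimics glog.V: the returned guard is "on" only when level <= -v.
func V(level int) verbose { return verbose(level <= *verbosity) }

// Infof prints only when the guard is on; real glog also adds
// timestamps, severity, and caller information.
func (v verbose) Infof(format string, args ...interface{}) {
	if v {
		fmt.Printf(format+"\n", args...)
	}
}

func main() {
	flag.Parse()
	V(0).Infof("always visible")               // the old behavior of these logs
	V(2).Infof("hidden unless run with -v=2+") // the behavior after this patch
}
```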
@@ -120,7 +120,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		if startOffset >= currentOffset || canUseCache {
 			// Can read forward OR offset is in cache - reuse session
 			bc.subscribersLock.RUnlock()
-			glog.V(0).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)",
+			glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)",
 				key, currentOffset, startOffset)
 			return session, nil
 		}
@@ -128,7 +128,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		// Backward seek, not in cache
 		// Let ReadRecordsFromOffset handle the recreation decision based on the actual read context
 		bc.subscribersLock.RUnlock()
-		glog.V(0).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)",
+		glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)",
 			key, currentOffset, startOffset)
 		return session, nil
 	}
@@ -156,7 +156,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		}
 
 		// Session is at wrong offset - must recreate
-		glog.V(0).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset)
+		glog.V(2).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset)
 		if session.Stream != nil {
 			_ = session.Stream.CloseSend()
 		}
@@ -192,19 +192,19 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		// Kafka offset -1 typically means "latest"
 		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
 		offsetValue = 0 // Not used with RESET_TO_LATEST
-		glog.V(0).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
+		glog.V(2).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
 	} else {
 		// CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset
 		// This allows the subscriber to read from both buffer and disk at the correct position
 		offsetType = schema_pb.OffsetType_EXACT_OFFSET
 		offsetValue = startOffset // Use the exact Kafka offset
-		glog.V(0).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
+		glog.V(2).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
 	}
 
-	glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s",
+	glog.V(2).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s",
 		topic, partition, startOffset, offsetType)
 
-	glog.V(0).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
+	glog.V(2).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
 		topic, partition, offsetValue, offsetType, consumerGroup, consumerID)
 
 	// Send init message using the actual partition structure that the broker allocated
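The branch above boils down to a two-case mapping from a Kafka start offset to a SeaweedMQ offset type. A standalone sketch of that mapping, with a local `OffsetType` standing in for the generated `schema_pb` enum (`resolveOffset` is an illustrative name, not a function in this patch):

```go
package main

import "fmt"

// OffsetType stands in for schema_pb.OffsetType in this sketch.
type OffsetType int

const (
	ResetToLatest OffsetType = iota // schema_pb.OffsetType_RESET_TO_LATEST
	ExactOffset                     // schema_pb.OffsetType_EXACT_OFFSET
)

// resolveOffset mirrors the branch above: Kafka's sentinel -1 means
// "read from the latest record"; any other value is an exact position.
func resolveOffset(startOffset int64) (OffsetType, int64) {
	if startOffset == -1 {
		return ResetToLatest, 0 // value is ignored for latest
	}
	return ExactOffset, startOffset
}

func main() {
	t, v := resolveOffset(-1)
	fmt.Println(t, v) // ResetToLatest path
	t, v = resolveOffset(42)
	fmt.Println(t, v) // ExactOffset path
}
```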
@@ -225,7 +225,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 	}
 
 	bc.subscribers[key] = session
-	glog.V(0).Infof("Created subscriber session for %s with context cancellation support", key)
+	glog.V(2).Infof("Created subscriber session for %s with context cancellation support", key)
 	return session, nil
 }
@@ -239,7 +239,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	session.mu.Lock()
 
-	glog.V(0).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d",
+	glog.V(2).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d",
 		session.Topic, session.Partition, requestedOffset, session.StartOffset, maxRecords)
 
 	// Check cache first
@@ -252,7 +252,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			startIdx := int(requestedOffset - cacheStartOffset)
 			// CRITICAL: Bounds check to prevent race condition where cache is modified between checks
 			if startIdx < 0 || startIdx >= len(session.consumedRecords) {
-				glog.V(0).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read",
+				glog.V(2).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read",
 					startIdx, len(session.consumedRecords))
 				// Cache was modified, fall through to normal read path
 			} else {
@@ -260,13 +260,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 				if endIdx > len(session.consumedRecords) {
 					endIdx = len(session.consumedRecords)
 				}
-				glog.V(0).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
+				glog.V(2).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
 					endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 				session.mu.Unlock()
 				return session.consumedRecords[startIdx:endIdx], nil
 			}
 		} else {
-			glog.V(0).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
+			glog.V(2).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
 				session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 		}
 	}
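The two cached-read paths above share the same index arithmetic: translate the requested offset into a slice index, re-check bounds in case the cache changed after the read lock was dropped, then clamp the end of the window. A self-contained sketch of that windowing (`sliceWindow` and the `int64` cache are illustrative):

```go
package main

import "fmt"

// sliceWindow returns records[startIdx:endIdx] for a request at
// requestedOffset against a cache whose first record sits at
// cacheStartOffset, mirroring the bounds check above: if the cache
// shifted between the offset check and the index computation, the
// caller must fall back to a normal read.
func sliceWindow(records []int64, cacheStartOffset, requestedOffset int64, maxRecords int) ([]int64, bool) {
	startIdx := int(requestedOffset - cacheStartOffset)
	if startIdx < 0 || startIdx >= len(records) {
		return nil, false // race: cache changed underneath us
	}
	endIdx := startIdx + maxRecords
	if endIdx > len(records) {
		endIdx = len(records)
	}
	return records[startIdx:endIdx], true
}

func main() {
	cache := []int64{100, 101, 102, 103} // offsets of cached records
	if window, ok := sliceWindow(cache, 100, 102, 10); ok {
		fmt.Println(window) // [102 103]
	}
}
```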
@@ -291,7 +291,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		// Need to seek backward - close old session and create a fresh subscriber
 		// Restarting an existing stream doesn't work reliably because the broker may still
 		// have old data buffered in the stream pipeline
-		glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber",
+		glog.V(2).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber",
 			requestedOffset, currentStartOffset)
 
 		// Extract session details (note: session.mu was already unlocked at line 294)
@@ -303,9 +303,9 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 
 		// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
 		// This prevents multiple threads from all deciding to recreate based on stale data
-		glog.V(0).Infof("[FETCH] 🔒 Thread acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
+		glog.V(2).Infof("[FETCH] 🔒 Thread acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
 		bc.subscribersLock.Lock()
-		glog.V(0).Infof("[FETCH] 🔓 Thread acquired global lock for session %s: requested=%d", key, requestedOffset)
+		glog.V(2).Infof("[FETCH] 🔓 Thread acquired global lock for session %s: requested=%d", key, requestedOffset)
 
 		// Double-check if another thread already recreated the session at the desired offset
 		// This prevents multiple concurrent threads from all trying to recreate the same session
@@ -317,12 +317,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			// Check if the session was already recreated at (or before) the requested offset
 			if existingOffset <= requestedOffset {
 				bc.subscribersLock.Unlock()
-				glog.V(0).Infof("[FETCH] ✓ Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
+				glog.V(2).Infof("[FETCH] ✓ Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
 				// Re-acquire the existing session and continue
 				return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
 			}
 
-			glog.V(0).Infof("[FETCH] ⚠️ Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
+			glog.V(2).Infof("[FETCH] ⚠️ Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
 
 			// Session still needs recreation - close it
 			if existingSession.Stream != nil {
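This hunk is the classic double-checked locking shape: the decision to recreate was made from state seen under a read lock, so it must be re-validated after taking the write lock, or several fetch goroutines would each tear down and rebuild the same session. A reduced sketch of the pattern under that assumption (`session`, `subscribers`, and `ensureSessionAt` are illustrative stand-ins, not the real types):

```go
package main

import (
	"fmt"
	"sync"
)

type session struct{ startOffset int64 }

var (
	subscribersLock sync.RWMutex
	subscribers     = map[string]*session{}
)

// ensureSessionAt mirrors the shape above: the caller decided to
// recreate based on state seen under a read lock, so we re-check
// under the write lock - another goroutine may have recreated the
// session already, and only one recreation should win.
func ensureSessionAt(key string, requested int64) *session {
	subscribersLock.Lock()
	defer subscribersLock.Unlock()
	if s, ok := subscribers[key]; ok && s.startOffset <= requested {
		return s // already usable or recreated by another goroutine - reuse
	}
	s := &session{startOffset: requested} // stand-in for close + redial
	subscribers[key] = s
	return s
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); ensureSessionAt("topic-0", 100) }()
	}
	wg.Wait()
	fmt.Println(subscribers["topic-0"].startOffset) // 100, recreated only once
}
```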
@@ -332,7 +332,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 				existingSession.Cancel()
 			}
 			delete(bc.subscribers, key)
-			glog.V(0).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset)
+			glog.V(2).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset)
 		}
 		// CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session
 		// to prevent other threads from interfering. We'll create the session inline.
@@ -361,10 +361,10 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		// Use EXACT_OFFSET to position subscriber at the exact Kafka offset
 		offsetType := schema_pb.OffsetType_EXACT_OFFSET
 
-		glog.V(0).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
+		glog.V(2).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
 			topic, partition, requestedOffset)
 
-		glog.V(0).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
+		glog.V(2).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
 			topic, partition, requestedOffset, offsetType, consumerGroup, consumerID)
 
 		// Send init message using the actual partition structure
@@ -388,10 +388,10 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 
 		bc.subscribers[key] = newSession
 		bc.subscribersLock.Unlock()
-		glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
+		glog.V(2).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
 
 		// Read from fresh subscriber
-		glog.V(0).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
+		glog.V(2).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
 		return bc.ReadRecords(ctx, newSession, maxRecords)
 	}
@@ -399,7 +399,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	// This handles:
 	// - Exact match (requestedOffset == session.StartOffset)
 	// - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache)
-	glog.V(0).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
+	glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
 		requestedOffset, currentStartOffset)
 	// Note: session.mu was already unlocked at line 294 after reading currentStartOffset
 	return bc.ReadRecords(ctx, session, maxRecords)
@@ -423,7 +423,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 	session.mu.Lock()
 	defer session.mu.Unlock()
 
-	glog.V(0).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
+	glog.V(2).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
 		session.Topic, session.Partition, session.StartOffset, maxRecords)
 
 	var records []*SeaweedRecord
@@ -442,7 +442,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 
 		if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset {
 			// Records are in cache
-			glog.V(0).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
+			glog.V(2).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
 				currentOffset, cacheStartOffset, cacheEndOffset)
 
 			// Find starting index in cache
@@ -458,7 +458,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 				endIdx = len(session.consumedRecords)
 			}
 
-			glog.V(0).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
+			glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
 			return session.consumedRecords[startIdx:endIdx], nil
 		}
 	}
@@ -511,7 +511,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 	select {
 	case result := <-recvChan:
 		if result.err != nil {
-			glog.V(0).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
+			glog.V(2).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
 			return records, nil // Return empty - no error for empty topic
 		}
 
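The select above races the first `Recv` against the fetch deadline so that an empty partition returns an empty batch rather than an error. A reduced sketch of that shape, assuming a helper goroutine that forwards stream results onto a channel (all names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type recvResult struct {
	value int
	err   error
}

// readFirst races a blocking receive against the caller's deadline,
// mirroring the select above: a timeout yields an empty batch rather
// than an error, since an empty topic is not a failure.
func readFirst(ctx context.Context, recvChan <-chan recvResult) ([]int, error) {
	var records []int
	select {
	case result := <-recvChan:
		if result.err != nil {
			return records, nil // treat stream errors on the first read as "no data"
		}
		records = append(records, result.value)
	case <-ctx.Done():
		// no data before the deadline; fall through with the empty batch
	}
	return records, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	recs, _ := readFirst(ctx, make(chan recvResult)) // nothing ever arrives
	fmt.Println(len(recs))                           // 0: timeout path
}
```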
@@ -524,13 +524,13 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			}
 			records = append(records, record)
 			currentOffset++
-			glog.V(0).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
+			glog.V(2).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
 				record.Offset, len(record.Key), len(record.Value))
 		}
 
 	case <-ctx.Done():
 		// Timeout on first record - topic is empty or no data available
-		glog.V(0).Infof("[FETCH] No data available (timeout on first record)")
+		glog.V(2).Infof("[FETCH] No data available (timeout on first record)")
 		return records, nil
 	}
 
@@ -584,9 +584,9 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			readDuration := time.Since(readStart)
 
 			if result.err != nil {
-				glog.V(0).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
+				glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
 				// Update session offset before returning
-				glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)",
+				glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)",
 					session.Key(), session.StartOffset, currentOffset, len(records))
 				session.StartOffset = currentOffset
 				return records, nil
@@ -603,25 +603,25 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 				currentOffset++
 				consecutiveReads++ // Track number of successful reads for adaptive timeout
 
-				glog.V(0).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
+				glog.V(2).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
 					len(records), record.Offset, len(record.Key), len(record.Value), readDuration)
 			}
 
 		case <-ctx2.Done():
 			cancel2()
 			// Timeout - return what we have
-			glog.V(0).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
+			glog.V(2).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
 			// CRITICAL: Update session offset so next fetch knows where we left off
-			glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)",
+			glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)",
 				session.Key(), session.StartOffset, currentOffset, len(records))
 			session.StartOffset = currentOffset
 			return records, nil
 		}
 	}
 
-	glog.V(0).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
+	glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
 	// Update session offset after successful read
-	glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)",
+	glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)",
 		session.Key(), session.StartOffset, currentOffset, len(records))
 	session.StartOffset = currentOffset
 
@@ -632,7 +632,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 		// Keep only the most recent 1000 records
 		session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-1000:]
 	}
-	glog.V(0).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
+	glog.V(2).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
 
 	return records, nil
 }
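The cache stays bounded because each successful read appends the new batch and then drops everything but the newest 1000 records. A tiny sketch of the trim (the 1000 limit comes from the hunk above; `trimCache` is an illustrative name):

```go
package main

import "fmt"

const maxCached = 1000 // matches the limit in the hunk above

// trimCache keeps only the most recent maxCached records, as the
// code above does after appending a batch to consumedRecords.
func trimCache(records []int64) []int64 {
	if len(records) > maxCached {
		records = records[len(records)-maxCached:]
	}
	return records
}

func main() {
	cache := make([]int64, 1500)
	fmt.Println(len(trimCache(cache))) // 1000
}
```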
@@ -658,7 +658,7 @@ func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerG
 			session.Cancel()
 		}
 		delete(bc.subscribers, key)
-		glog.V(0).Infof("[FETCH] Closed subscriber for %s", key)
+		glog.V(2).Infof("[FETCH] Closed subscriber for %s", key)
 	}
 }
 
@@ -705,7 +705,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
 	session.mu.Lock()
 	defer session.mu.Unlock()
 
-	glog.V(0).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
+	glog.V(2).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
 		session.Topic, session.Partition, session.StartOffset, newOffset)
 
 	// Close existing stream
@@ -752,7 +752,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
 	session.Ctx = subscriberCtx
 	session.StartOffset = newOffset
 
-	glog.V(0).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
+	glog.V(2).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
 		session.Topic, session.Partition, newOffset)
 
 	return nil