
debug

pull/7329/head
chrislu · 7 days ago · parent commit 1f128d65c5
weed/mq/kafka/integration/broker_client_subscribe.go (63 lines changed)
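
The change is confined to log verbosity: glog.V(n).Infof only emits when the configured verbosity is at least n, so moving these call sites from V(1)/V(2)/V(4) down to V(0), and adding the new [SUBSCRIBE-INIT] lines at V(0), makes them print by default while debugging the fetch path. A minimal sketch of that gating, assuming the stock github.com/golang/glog API (SeaweedFS vendors its own variant under weed/glog):

package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	flag.Parse() // glog registers -v, -vmodule, -logtostderr, ...

	// Verbosity defaults to 0: V(0) messages always pass the gate, while
	// V(2) messages appear only when the process runs with -v=2 or higher.
	glog.V(0).Infof("printed by default")
	glog.V(2).Infof("printed only with -v=2 or higher")
	glog.Flush()
}

The alternative to a commit like this is raising verbosity at runtime (e.g. -v=4, with -logtostderr=true to see output on stderr), which enables the same lines without touching code.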

@@ -60,6 +60,9 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta
 		},
 	}
+	glog.V(0).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
+		topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID)
 	if err := stream.Send(initReq); err != nil {
 		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
 	}
@@ -119,7 +122,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		if startOffset >= currentOffset || canUseCache {
 			// Can read forward OR offset is in cache - reuse session
 			bc.subscribersLock.RUnlock()
-			glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)",
+			glog.V(0).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)",
 				key, currentOffset, startOffset)
 			return session, nil
 		}
@@ -127,7 +130,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		// Backward seek, not in cache
 		// Let ReadRecordsFromOffset handle the recreation decision based on the actual read context
 		bc.subscribersLock.RUnlock()
-		glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)",
+		glog.V(0).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)",
 			key, currentOffset, startOffset)
 		return session, nil
 	}
@@ -193,19 +196,22 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
 		startTimestamp = 0   // Not used with RESET_TO_LATEST
 		startOffsetValue = 0 // Not used with RESET_TO_LATEST
-		glog.V(1).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
+		glog.V(0).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
 	} else {
 		// CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset
 		// This allows the subscriber to read from both buffer and disk at the correct position
 		offsetType = schema_pb.OffsetType_EXACT_OFFSET
 		startTimestamp = 0             // Not used with EXACT_OFFSET
 		startOffsetValue = startOffset // Use the exact Kafka offset
-		glog.V(1).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
+		glog.V(0).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
 	}
-	glog.V(1).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)",
+	glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)",
 		topic, partition, startOffset, offsetType, startTimestamp)
+	glog.V(0).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
+		topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID)
 	// Send init message using the actual partition structure that the broker allocated
 	if err := stream.Send(&mq_pb.SubscribeMessageRequest{
 		Message: &mq_pb.SubscribeMessageRequest_Init{
@@ -242,7 +248,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
 	}
 	bc.subscribers[key] = session
-	glog.V(2).Infof("Created subscriber session for %s with context cancellation support", key)
+	glog.V(0).Infof("Created subscriber session for %s with context cancellation support", key)
 	return session, nil
 }
@@ -256,7 +262,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	session.mu.Lock()
-	glog.V(2).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d",
+	glog.V(0).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d",
 		session.Topic, session.Partition, requestedOffset, session.StartOffset, maxRecords)
 	// Check cache first
@@ -269,7 +275,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		startIdx := int(requestedOffset - cacheStartOffset)
 		// CRITICAL: Bounds check to prevent race condition where cache is modified between checks
 		if startIdx < 0 || startIdx >= len(session.consumedRecords) {
-			glog.V(2).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read",
+			glog.V(0).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read",
 				startIdx, len(session.consumedRecords))
 			// Cache was modified, fall through to normal read path
 		} else {
@@ -277,13 +283,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			if endIdx > len(session.consumedRecords) {
 				endIdx = len(session.consumedRecords)
 			}
-			glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
+			glog.V(0).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
 				endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 			session.mu.Unlock()
 			return session.consumedRecords[startIdx:endIdx], nil
 		}
 	} else {
-		glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
+		glog.V(0).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
 			session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 	}
 }
@@ -380,9 +386,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	startTimestamp := int64(0)
 	startOffsetValue := requestedOffset
-	glog.V(1).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
+	glog.V(0).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
 		topic, partition, requestedOffset)
+	glog.V(0).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
+		topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID)
 	// Send init message using the actual partition structure
 	if err := stream.Send(&mq_pb.SubscribeMessageRequest{
 		Message: &mq_pb.SubscribeMessageRequest_Init{
@@ -425,7 +434,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
 	// Read from fresh subscriber
-	glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
+	glog.V(0).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
 	return bc.ReadRecords(ctx, newSession, maxRecords)
 }
@@ -433,7 +442,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	// This handles:
 	// - Exact match (requestedOffset == session.StartOffset)
 	// - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache)
-	glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
+	glog.V(0).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
 		requestedOffset, currentStartOffset)
 	// Note: session.mu was already unlocked at line 294 after reading currentStartOffset
 	return bc.ReadRecords(ctx, session, maxRecords)
@@ -457,7 +466,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 	session.mu.Lock()
 	defer session.mu.Unlock()
-	glog.V(2).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
+	glog.V(0).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
 		session.Topic, session.Partition, session.StartOffset, maxRecords)
 	var records []*SeaweedRecord
@@ -476,7 +485,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 		if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset {
 			// Records are in cache
-			glog.V(2).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
+			glog.V(0).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
 				currentOffset, cacheStartOffset, cacheEndOffset)
 			// Find starting index in cache
@@ -492,7 +501,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 				endIdx = len(session.consumedRecords)
 			}
-			glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
+			glog.V(0).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
 			return session.consumedRecords[startIdx:endIdx], nil
 		}
 	}
@@ -545,7 +554,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 	select {
 	case result := <-recvChan:
 		if result.err != nil {
-			glog.V(2).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
+			glog.V(0).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
 			return records, nil // Return empty - no error for empty topic
 		}
@@ -558,13 +567,13 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			}
 			records = append(records, record)
 			currentOffset++
-			glog.V(4).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
+			glog.V(0).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
 				record.Offset, len(record.Key), len(record.Value))
 		}
 	case <-ctx.Done():
 		// Timeout on first record - topic is empty or no data available
-		glog.V(4).Infof("[FETCH] No data available (timeout on first record)")
+		glog.V(0).Infof("[FETCH] No data available (timeout on first record)")
 		return records, nil
 	}
@@ -616,7 +625,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			readDuration := time.Since(readStart)
 			if result.err != nil {
-				glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
+				glog.V(0).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
 				// Update session offset before returning
 				glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)",
 					session.Key(), session.StartOffset, currentOffset, len(records))
@@ -635,14 +644,14 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 				currentOffset++
 				consecutiveReads++ // Track number of successful reads for adaptive timeout
-				glog.V(4).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
+				glog.V(0).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
 					len(records), record.Offset, len(record.Key), len(record.Value), readDuration)
 			}
 		case <-ctx2.Done():
 			cancel2()
 			// Timeout - return what we have
-			glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
+			glog.V(0).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
 			// CRITICAL: Update session offset so next fetch knows where we left off
 			glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)",
 				session.Key(), session.StartOffset, currentOffset, len(records))
@@ -651,7 +660,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 		}
 	}
-	glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
+	glog.V(0).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
 	// Update session offset after successful read
 	glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)",
 		session.Key(), session.StartOffset, currentOffset, len(records))
@@ -664,7 +673,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 		// Keep only the most recent 1000 records
 		session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-1000:]
 	}
-	glog.V(2).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
+	glog.V(0).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
 	return records, nil
 }
@@ -690,7 +699,7 @@ func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerG
 			session.Cancel()
 		}
 		delete(bc.subscribers, key)
-		glog.V(1).Infof("[FETCH] Closed subscriber for %s", key)
+		glog.V(0).Infof("[FETCH] Closed subscriber for %s", key)
 	}
 }
@@ -737,7 +746,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
 	session.mu.Lock()
 	defer session.mu.Unlock()
-	glog.V(1).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
+	glog.V(0).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
 		session.Topic, session.Partition, session.StartOffset, newOffset)
 	// Close existing stream
@@ -803,7 +812,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
 	session.Ctx = subscriberCtx
 	session.StartOffset = newOffset
-	glog.V(1).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
+	glog.V(0).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
 		session.Topic, session.Partition, newOffset)
 	return nil
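
For reference, the cache-hit path that the [FETCH] lines above instrument reduces to the shape below. This is a self-contained sketch, not the real implementation: SeaweedRecord and consumedRecords mirror names from the diff, but the session struct is trimmed to the single field the lookup needs, and everything outside the diff is an assumption.

package main

import (
	"fmt"
	"sync"
)

type SeaweedRecord struct {
	Offset int64
	Key    []byte
	Value  []byte
}

type session struct {
	mu              sync.Mutex
	consumedRecords []*SeaweedRecord // contiguous, ascending offsets
}

// readCached returns up to maxRecords starting at requestedOffset, or nil on a
// cache miss (the caller then falls through to a normal broker read).
func (s *session) readCached(requestedOffset int64, maxRecords int) []*SeaweedRecord {
	s.mu.Lock()
	defer s.mu.Unlock()

	if len(s.consumedRecords) == 0 {
		return nil
	}
	cacheStart := s.consumedRecords[0].Offset
	cacheEnd := s.consumedRecords[len(s.consumedRecords)-1].Offset
	if requestedOffset < cacheStart || requestedOffset > cacheEnd {
		return nil // cache miss
	}

	// Contiguous offsets let the index be computed by subtraction. The
	// defensive bounds check mirrors the diff's race guard: in the real code
	// the cache can be trimmed between separately locked sections.
	startIdx := int(requestedOffset - cacheStart)
	if startIdx < 0 || startIdx >= len(s.consumedRecords) {
		return nil
	}
	endIdx := startIdx + maxRecords
	if endIdx > len(s.consumedRecords) {
		endIdx = len(s.consumedRecords)
	}
	return s.consumedRecords[startIdx:endIdx]
}

func main() {
	s := &session{}
	for off := int64(100); off < 110; off++ {
		s.consumedRecords = append(s.consumedRecords, &SeaweedRecord{Offset: off})
	}
	fmt.Println(len(s.readCached(105, 3))) // 3 records: offsets 105-107
	fmt.Println(s.readCached(99, 3))       // nil: miss, caller does a broker read
}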
