diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 828225e74..e1509642a 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -60,6 +60,9 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta }, } + glog.V(0).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", + topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID) + if err := stream.Send(initReq); err != nil { return nil, fmt.Errorf("failed to send subscribe init: %v", err) } @@ -119,7 +122,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta if startOffset >= currentOffset || canUseCache { // Can read forward OR offset is in cache - reuse session bc.subscribersLock.RUnlock() - glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)", + glog.V(0).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)", key, currentOffset, startOffset) return session, nil } @@ -127,7 +130,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta // Backward seek, not in cache // Let ReadRecordsFromOffset handle the recreation decision based on the actual read context bc.subscribersLock.RUnlock() - glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)", + glog.V(0).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)", key, currentOffset, startOffset) return session, nil } @@ -193,19 +196,22 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta offsetType = schema_pb.OffsetType_RESET_TO_LATEST startTimestamp = 0 // Not used with RESET_TO_LATEST startOffsetValue = 0 // Not used with RESET_TO_LATEST - glog.V(1).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)") + glog.V(0).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)") } else { // CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset // This allows the subscriber to read from both buffer and disk at the correct position offsetType = schema_pb.OffsetType_EXACT_OFFSET startTimestamp = 0 // Not used with EXACT_OFFSET startOffsetValue = startOffset // Use the exact Kafka offset - glog.V(1).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset) + glog.V(0).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset) } - glog.V(1).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)", + glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)", topic, partition, startOffset, offsetType, startTimestamp) + glog.V(0).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", + topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID) + // Send init message using the actual partition structure that the broker allocated if err := stream.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ @@ -242,7 +248,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta } bc.subscribers[key] = session - glog.V(2).Infof("Created subscriber session for %s with context cancellation support", key) + glog.V(0).Infof("Created subscriber session for %s with context cancellation support", key) return session, nil } @@ -256,7 +262,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok session.mu.Lock() - glog.V(2).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d", + glog.V(0).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d", session.Topic, session.Partition, requestedOffset, session.StartOffset, maxRecords) // Check cache first @@ -269,7 +275,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok startIdx := int(requestedOffset - cacheStartOffset) // CRITICAL: Bounds check to prevent race condition where cache is modified between checks if startIdx < 0 || startIdx >= len(session.consumedRecords) { - glog.V(2).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read", + glog.V(0).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read", startIdx, len(session.consumedRecords)) // Cache was modified, fall through to normal read path } else { @@ -277,13 +283,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok if endIdx > len(session.consumedRecords) { endIdx = len(session.consumedRecords) } - glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)", + glog.V(0).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)", endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset) session.mu.Unlock() return session.consumedRecords[startIdx:endIdx], nil } } else { - glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]", + glog.V(0).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]", session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset) } } @@ -380,9 +386,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok startTimestamp := int64(0) startOffsetValue := requestedOffset - glog.V(1).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d", + glog.V(0).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d", topic, partition, requestedOffset) + glog.V(0).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", + topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID) + // Send init message using the actual partition structure if err := stream.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ @@ -425,7 +434,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset) // Read from fresh subscriber - glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords) + glog.V(0).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords) return bc.ReadRecords(ctx, newSession, maxRecords) } @@ -433,7 +442,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // This handles: // - Exact match (requestedOffset == session.StartOffset) // - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache) - glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)", + glog.V(0).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)", requestedOffset, currentStartOffset) // Note: session.mu was already unlocked at line 294 after reading currentStartOffset return bc.ReadRecords(ctx, session, maxRecords) @@ -457,7 +466,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib session.mu.Lock() defer session.mu.Unlock() - glog.V(2).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d", + glog.V(0).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d", session.Topic, session.Partition, session.StartOffset, maxRecords) var records []*SeaweedRecord @@ -476,7 +485,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset { // Records are in cache - glog.V(2).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]", + glog.V(0).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]", currentOffset, cacheStartOffset, cacheEndOffset) // Find starting index in cache @@ -492,7 +501,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib endIdx = len(session.consumedRecords) } - glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1) + glog.V(0).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1) return session.consumedRecords[startIdx:endIdx], nil } } @@ -545,7 +554,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib select { case result := <-recvChan: if result.err != nil { - glog.V(2).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err) + glog.V(0).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err) return records, nil // Return empty - no error for empty topic } @@ -558,13 +567,13 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib } records = append(records, record) currentOffset++ - glog.V(4).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d", + glog.V(0).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d", record.Offset, len(record.Key), len(record.Value)) } case <-ctx.Done(): // Timeout on first record - topic is empty or no data available - glog.V(4).Infof("[FETCH] No data available (timeout on first record)") + glog.V(0).Infof("[FETCH] No data available (timeout on first record)") return records, nil } @@ -616,7 +625,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib readDuration := time.Since(readStart) if result.err != nil { - glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) + glog.V(0).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) // Update session offset before returning glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)", session.Key(), session.StartOffset, currentOffset, len(records)) @@ -635,14 +644,14 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib currentOffset++ consecutiveReads++ // Track number of successful reads for adaptive timeout - glog.V(4).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v", + glog.V(0).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v", len(records), record.Offset, len(record.Key), len(record.Value), readDuration) } case <-ctx2.Done(): cancel2() // Timeout - return what we have - glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart)) + glog.V(0).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart)) // CRITICAL: Update session offset so next fetch knows where we left off glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)", session.Key(), session.StartOffset, currentOffset, len(records)) @@ -651,7 +660,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib } } - glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records)) + glog.V(0).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records)) // Update session offset after successful read glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)", session.Key(), session.StartOffset, currentOffset, len(records)) @@ -664,7 +673,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib // Keep only the most recent 1000 records session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-1000:] } - glog.V(2).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords)) + glog.V(0).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords)) return records, nil } @@ -690,7 +699,7 @@ func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerG session.Cancel() } delete(bc.subscribers, key) - glog.V(1).Infof("[FETCH] Closed subscriber for %s", key) + glog.V(0).Infof("[FETCH] Closed subscriber for %s", key) } } @@ -737,7 +746,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO session.mu.Lock() defer session.mu.Unlock() - glog.V(1).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d", + glog.V(0).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d", session.Topic, session.Partition, session.StartOffset, newOffset) // Close existing stream @@ -803,7 +812,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO session.Ctx = subscriberCtx session.StartOffset = newOffset - glog.V(1).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d", + glog.V(0).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d", session.Topic, session.Partition, newOffset) return nil