diff --git a/weed/mq/broker/broker_grpc_fetch.go b/weed/mq/broker/broker_grpc_fetch.go index d06cc160f..f6f39a473 100644 --- a/weed/mq/broker/broker_grpc_fetch.go +++ b/weed/mq/broker/broker_grpc_fetch.go @@ -122,7 +122,7 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM glog.Errorf("[FetchMessage] Read error: %v", err) } else { // Offset out of range - this is expected when consumer requests old data - glog.V(1).Infof("[FetchMessage] Offset out of range: %v", err) + glog.V(3).Infof("[FetchMessage] Offset out of range: %v", err) } // Return empty response with metadata - let client adjust offset @@ -147,7 +147,7 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM }) } - glog.V(2).Infof("[FetchMessage] Returning %d messages, nextOffset=%d, highWaterMark=%d, endOfPartition=%v", + glog.V(4).Infof("[FetchMessage] Returning %d messages, nextOffset=%d, highWaterMark=%d, endOfPartition=%v", len(messages), nextOffset, highWaterMark, endOfPartition) return &mq_pb.FetchMessageResponse{ diff --git a/weed/mq/broker/broker_grpc_sub_offset.go b/weed/mq/broker/broker_grpc_sub_offset.go index 6cb661464..b79d961d3 100644 --- a/weed/mq/broker/broker_grpc_sub_offset.go +++ b/weed/mq/broker/broker_grpc_sub_offset.go @@ -117,7 +117,7 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription( } if atEnd { - glog.V(2).Infof("[%s] At end of subscription, stopping", clientName) + glog.V(4).Infof("[%s] At end of subscription, stopping", clientName) return false } diff --git a/weed/mq/kafka/integration/broker_client_fetch.go b/weed/mq/kafka/integration/broker_client_fetch.go index 933db1492..cac89b066 100644 --- a/weed/mq/kafka/integration/broker_client_fetch.go +++ b/weed/mq/kafka/integration/broker_client_fetch.go @@ -21,7 +21,7 @@ import ( // // This is how Kafka works - completely stateless per-fetch func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string, partition int32, startOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) { - glog.V(2).Infof("[FETCH-STATELESS] Fetching from %s-%d at offset %d, maxRecords=%d", + glog.V(4).Infof("[FETCH-STATELESS] Fetching from %s-%d at offset %d, maxRecords=%d", topic, partition, startOffset, maxRecords) // Get actual partition assignment from broker @@ -61,7 +61,7 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string // Check if this is an "offset out of range" error if resp.ErrorCode == 2 && resp.LogStartOffset > 0 && startOffset < resp.LogStartOffset { // Offset too old - broker suggests starting from LogStartOffset - glog.V(1).Infof("[FETCH-STATELESS-CLIENT] Requested offset %d too old, adjusting to log start %d", + glog.V(3).Infof("[FETCH-STATELESS-CLIENT] Requested offset %d too old, adjusting to log start %d", startOffset, resp.LogStartOffset) // Retry with adjusted offset @@ -102,7 +102,7 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string glog.Infof("[FETCH-STATELESS-CLIENT] Converted to 0 SeaweedRecords") } - glog.V(2).Infof("[FETCH-STATELESS] Fetched %d records, nextOffset=%d, highWaterMark=%d, endOfPartition=%v", + glog.V(4).Infof("[FETCH-STATELESS] Fetched %d records, nextOffset=%d, highWaterMark=%d, endOfPartition=%v", len(records), resp.NextOffset, resp.HighWaterMark, resp.EndOfPartition) return records, nil diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index d9eaec1f0..d9e447157 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -59,7 +59,7 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta // Send init message to start subscription with Kafka client's consumer group and ID initReq := createSubscribeInitMessage(topic, actualPartition, startOffset, offsetType, consumerGroup, consumerID) - glog.V(2).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", + glog.V(4).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", topic, partition, startOffset, offsetType, consumerGroup, consumerID) if err := stream.Send(initReq); err != nil { @@ -125,13 +125,13 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta bc.subscribersLock.RUnlock() if canUseCache { - glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (cached)", + glog.V(4).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (cached)", key, currentOffset, startOffset) } else if startOffset >= currentOffset { - glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (forward read)", + glog.V(4).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (forward read)", key, currentOffset, startOffset) } else { - glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will seek backward)", + glog.V(4).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will seek backward)", key, currentOffset, startOffset) } return session, nil @@ -153,7 +153,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta existingOffset := session.StartOffset session.mu.Unlock() - glog.V(1).Infof("[FETCH] Session created concurrently at offset %d (requested %d), reusing", existingOffset, startOffset) + glog.V(3).Infof("[FETCH] Session created concurrently at offset %d (requested %d), reusing", existingOffset, startOffset) return session, nil } @@ -192,7 +192,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta glog.V(2).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s", topic, partition, startOffset, offsetType) - glog.V(2).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", + glog.V(4).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", topic, partition, offsetValue, offsetType, consumerGroup, consumerID) // Send init message using the actual partition structure that the broker allocated @@ -221,7 +221,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta // This is used by the stateless fetch approach to eliminate concurrent access issues // The subscriber is NOT stored in bc.subscribers and must be cleaned up by the caller func (bc *BrokerClient) createTemporarySubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) { - glog.V(2).Infof("[STATELESS] Creating temporary subscriber for %s-%d at offset %d", topic, partition, startOffset) + glog.V(4).Infof("[STATELESS] Creating temporary subscriber for %s-%d at offset %d", topic, partition, startOffset) // Create context for this temporary subscriber ctx, cancel := context.WithCancel(bc.ctx) @@ -247,11 +247,11 @@ func (bc *BrokerClient) createTemporarySubscriber(topic string, partition int32, if startOffset == -1 { offsetType = schema_pb.OffsetType_RESET_TO_LATEST offsetValue = 0 - glog.V(2).Infof("[STATELESS] Using RESET_TO_LATEST for Kafka offset -1") + glog.V(4).Infof("[STATELESS] Using RESET_TO_LATEST for Kafka offset -1") } else { offsetType = schema_pb.OffsetType_EXACT_OFFSET offsetValue = startOffset - glog.V(2).Infof("[STATELESS] Using EXACT_OFFSET for Kafka offset %d", startOffset) + glog.V(4).Infof("[STATELESS] Using EXACT_OFFSET for Kafka offset %d", startOffset) } // Send init message @@ -273,14 +273,14 @@ func (bc *BrokerClient) createTemporarySubscriber(topic string, partition int32, Cancel: cancel, } - glog.V(2).Infof("[STATELESS] Created temporary subscriber for %s-%d starting at offset %d", topic, partition, startOffset) + glog.V(4).Infof("[STATELESS] Created temporary subscriber for %s-%d starting at offset %d", topic, partition, startOffset) return session, nil } // createSubscriberSession creates a new subscriber session with proper initialization // This is used by the hybrid approach for initial connections and backward seeks func (bc *BrokerClient) createSubscriberSession(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) { - glog.V(2).Infof("[HYBRID-SESSION] Creating subscriber session for %s-%d at offset %d", topic, partition, startOffset) + glog.V(4).Infof("[HYBRID-SESSION] Creating subscriber session for %s-%d at offset %d", topic, partition, startOffset) // Create context for this subscriber ctx, cancel := context.WithCancel(bc.ctx) @@ -306,11 +306,11 @@ func (bc *BrokerClient) createSubscriberSession(topic string, partition int32, s if startOffset == -1 { offsetType = schema_pb.OffsetType_RESET_TO_LATEST offsetValue = 0 - glog.V(2).Infof("[HYBRID-SESSION] Using RESET_TO_LATEST for Kafka offset -1") + glog.V(4).Infof("[HYBRID-SESSION] Using RESET_TO_LATEST for Kafka offset -1") } else { offsetType = schema_pb.OffsetType_EXACT_OFFSET offsetValue = startOffset - glog.V(2).Infof("[HYBRID-SESSION] Using EXACT_OFFSET for Kafka offset %d", startOffset) + glog.V(4).Infof("[HYBRID-SESSION] Using EXACT_OFFSET for Kafka offset %d", startOffset) } // Send init message @@ -336,7 +336,7 @@ func (bc *BrokerClient) createSubscriberSession(topic string, partition int32, s initialized: false, } - glog.V(2).Infof("[HYBRID-SESSION] Created subscriber session for %s-%d starting at offset %d", topic, partition, startOffset) + glog.V(4).Infof("[HYBRID-SESSION] Created subscriber session for %s-%d starting at offset %d", topic, partition, startOffset) return session, nil } @@ -364,14 +364,14 @@ func (bc *BrokerClient) serveFromCache(session *BrokerSubscriberSession, request // Return slice from cache result := session.consumedRecords[startIdx:endIdx] - glog.V(2).Infof("[HYBRID-CACHE] Served %d records from cache (requested %d, offset %d)", + glog.V(4).Infof("[HYBRID-CACHE] Served %d records from cache (requested %d, offset %d)", len(result), maxRecords, requestedOffset) return result } // readRecordsFromSession reads records from the session's stream func (bc *BrokerClient) readRecordsFromSession(ctx context.Context, session *BrokerSubscriberSession, startOffset int64, maxRecords int) ([]*SeaweedRecord, error) { - glog.V(2).Infof("[HYBRID-READ] Reading from stream: offset=%d maxRecords=%d", startOffset, maxRecords) + glog.V(4).Infof("[HYBRID-READ] Reading from stream: offset=%d maxRecords=%d", startOffset, maxRecords) records := make([]*SeaweedRecord, 0, maxRecords) currentOffset := startOffset @@ -382,7 +382,7 @@ func (bc *BrokerClient) readRecordsFromSession(ctx context.Context, session *Bro select { case <-ctx.Done(): // Timeout or cancellation - return what we have - glog.V(2).Infof("[HYBRID-READ] Context done, returning %d records", len(records)) + glog.V(4).Infof("[HYBRID-READ] Context done, returning %d records", len(records)) return records, nil default: } @@ -391,7 +391,7 @@ func (bc *BrokerClient) readRecordsFromSession(ctx context.Context, session *Bro resp, err := session.Stream.Recv() if err != nil { if err == io.EOF { - glog.V(2).Infof("[HYBRID-READ] Stream closed (EOF), returning %d records", len(records)) + glog.V(4).Infof("[HYBRID-READ] Stream closed (EOF), returning %d records", len(records)) return records, nil } return nil, fmt.Errorf("failed to receive from stream: %v", err) @@ -431,20 +431,20 @@ func (bc *BrokerClient) readRecordsFromSession(ctx context.Context, session *Bro return nil, fmt.Errorf("broker error: %s", ctrlMsg.Error) } if ctrlMsg.IsEndOfStream { - glog.V(2).Infof("[HYBRID-READ] End of stream, returning %d records", len(records)) + glog.V(4).Infof("[HYBRID-READ] End of stream, returning %d records", len(records)) return records, nil } if ctrlMsg.IsEndOfTopic { - glog.V(2).Infof("[HYBRID-READ] End of topic, returning %d records", len(records)) + glog.V(4).Infof("[HYBRID-READ] End of topic, returning %d records", len(records)) return records, nil } // Empty control message (e.g., seek ack) - continue reading - glog.V(2).Infof("[HYBRID-READ] Received control message (seek ack?), continuing") + glog.V(4).Infof("[HYBRID-READ] Received control message (seek ack?), continuing") continue } } - glog.V(2).Infof("[HYBRID-READ] Read %d records successfully", len(records)) + glog.V(4).Infof("[HYBRID-READ] Read %d records successfully", len(records)) // Update cache session.consumedRecords = append(session.consumedRecords, records...) @@ -463,7 +463,7 @@ func (bc *BrokerClient) readRecordsFromSession(ctx context.Context, session *Bro // - Slow path (5%): Create new subscriber for backward seeks // This combines performance (connection reuse) with correctness (proper tracking) func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, partition int32, requestedOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) { - glog.V(2).Infof("[FETCH-HYBRID] topic=%s partition=%d requestedOffset=%d maxRecords=%d", + glog.V(4).Infof("[FETCH-HYBRID] topic=%s partition=%d requestedOffset=%d maxRecords=%d", topic, partition, requestedOffset, maxRecords) // Get or create session for this (topic, partition, consumerGroup, consumerID) @@ -473,7 +473,7 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa session, exists := bc.subscribers[key] if !exists { // No session - create one (this is initial fetch) - glog.V(2).Infof("[FETCH-HYBRID] Creating initial session for %s at offset %d", key, requestedOffset) + glog.V(4).Infof("[FETCH-HYBRID] Creating initial session for %s at offset %d", key, requestedOffset) newSession, err := bc.createSubscriberSession(topic, partition, requestedOffset, consumerGroup, consumerID) if err != nil { bc.subscribersLock.Unlock() @@ -496,7 +496,7 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa if requestedOffset >= cacheStart && requestedOffset <= cacheEnd { // Serve from cache - glog.V(2).Infof("[FETCH-HYBRID] FAST: Serving from cache for %s offset %d (cache: %d-%d)", + glog.V(4).Infof("[FETCH-HYBRID] FAST: Serving from cache for %s offset %d (cache: %d-%d)", key, requestedOffset, cacheStart, cacheEnd) return bc.serveFromCache(session, requestedOffset, maxRecords), nil } @@ -509,13 +509,13 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa streamPosition = session.StartOffset } - glog.V(2).Infof("[FETCH-HYBRID] requestedOffset=%d streamPosition=%d lastReadOffset=%d", + glog.V(4).Infof("[FETCH-HYBRID] requestedOffset=%d streamPosition=%d lastReadOffset=%d", requestedOffset, streamPosition, session.lastReadOffset) // Decision: Fast path or slow path? if requestedOffset < streamPosition { // SLOW PATH: Backward seek - need new subscriber - glog.V(2).Infof("[FETCH-HYBRID] SLOW: Backward seek from %d to %d, creating new subscriber", + glog.V(4).Infof("[FETCH-HYBRID] SLOW: Backward seek from %d to %d, creating new subscriber", streamPosition, requestedOffset) // Close old session @@ -550,7 +550,7 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa } else if requestedOffset > streamPosition { // FAST PATH: Forward seek - use server-side seek seekOffset := requestedOffset - glog.V(2).Infof("[FETCH-HYBRID] FAST: Forward seek from %d to %d using server-side seek", + glog.V(4).Infof("[FETCH-HYBRID] FAST: Forward seek from %d to %d using server-side seek", streamPosition, seekOffset) // Send seek message to broker @@ -565,13 +565,13 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa if err := session.Stream.Send(seekReq); err != nil { if err == io.EOF { - glog.V(2).Infof("[FETCH-HYBRID] Stream closed during seek, ignoring") + glog.V(4).Infof("[FETCH-HYBRID] Stream closed during seek, ignoring") return nil, nil } return nil, fmt.Errorf("failed to send seek request: %v", err) } - glog.V(2).Infof("[FETCH-HYBRID] Seek request sent, broker will reposition stream to offset %d", seekOffset) + glog.V(4).Infof("[FETCH-HYBRID] Seek request sent, broker will reposition stream to offset %d", seekOffset) // NOTE: Don't wait for ack - the broker will restart Subscribe loop and send data // The ack will be handled inline with data messages in readRecordsFromSession @@ -580,7 +580,7 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa streamPosition = seekOffset } else { // FAST PATH: Sequential read - continue from current position - glog.V(2).Infof("[FETCH-HYBRID] FAST: Sequential read at offset %d", requestedOffset) + glog.V(4).Infof("[FETCH-HYBRID] FAST: Sequential read at offset %d", requestedOffset) } // Read records from stream @@ -593,7 +593,7 @@ func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, pa if len(records) > 0 { session.lastReadOffset = records[len(records)-1].Offset session.initialized = true - glog.V(2).Infof("[FETCH-HYBRID] Read %d records, lastReadOffset now %d", + glog.V(4).Infof("[FETCH-HYBRID] Read %d records, lastReadOffset now %d", len(records), session.lastReadOffset) } @@ -607,7 +607,7 @@ func (bc *BrokerClient) FetchRecordsWithDedup(ctx context.Context, topic string, // Create key for this fetch request key := fmt.Sprintf("%s-%d-%d", topic, partition, startOffset) - glog.V(2).Infof("[FETCH-DEDUP] topic=%s partition=%d offset=%d maxRecords=%d key=%s", + glog.V(4).Infof("[FETCH-DEDUP] topic=%s partition=%d offset=%d maxRecords=%d key=%s", topic, partition, startOffset, maxRecords, key) // Check if there's already a fetch in progress for this exact request @@ -622,12 +622,12 @@ func (bc *BrokerClient) FetchRecordsWithDedup(ctx context.Context, topic string, existing.mu.Unlock() bc.fetchRequestsLock.Unlock() - glog.V(2).Infof("[FETCH-DEDUP] Waiting for in-progress fetch: %s", key) + glog.V(4).Infof("[FETCH-DEDUP] Waiting for in-progress fetch: %s", key) // Wait for the result from the in-progress fetch select { case result := <-waiter: - glog.V(2).Infof("[FETCH-DEDUP] Received result from in-progress fetch: %s (records=%d, err=%v)", + glog.V(4).Infof("[FETCH-DEDUP] Received result from in-progress fetch: %s (records=%d, err=%v)", key, len(result.records), result.err) return result.records, result.err case <-ctx.Done(): @@ -647,7 +647,7 @@ func (bc *BrokerClient) FetchRecordsWithDedup(ctx context.Context, topic string, bc.fetchRequests[key] = fetchReq bc.fetchRequestsLock.Unlock() - glog.V(2).Infof("[FETCH-DEDUP] Starting new fetch: %s", key) + glog.V(4).Infof("[FETCH-DEDUP] Starting new fetch: %s", key) // Perform the actual fetch records, err := bc.fetchRecordsStatelessInternal(ctx, topic, partition, startOffset, maxRecords, consumerGroup, consumerID) @@ -667,7 +667,7 @@ func (bc *BrokerClient) FetchRecordsWithDedup(ctx context.Context, topic string, bc.fetchRequestsLock.Unlock() // Send result to all waiters - glog.V(2).Infof("[FETCH-DEDUP] Broadcasting result to %d waiters: %s (records=%d, err=%v)", + glog.V(4).Infof("[FETCH-DEDUP] Broadcasting result to %d waiters: %s (records=%d, err=%v)", len(waiters), key, len(records), err) for _, waiter := range waiters { waiter <- result @@ -680,7 +680,7 @@ func (bc *BrokerClient) FetchRecordsWithDedup(ctx context.Context, topic string, // fetchRecordsStatelessInternal is the internal implementation of stateless fetch // This is called by FetchRecordsWithDedup and should not be called directly func (bc *BrokerClient) fetchRecordsStatelessInternal(ctx context.Context, topic string, partition int32, startOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) { - glog.V(2).Infof("[FETCH-STATELESS] topic=%s partition=%d offset=%d maxRecords=%d", + glog.V(4).Infof("[FETCH-STATELESS] topic=%s partition=%d offset=%d maxRecords=%d", topic, partition, startOffset, maxRecords) // STATELESS APPROACH: Create a temporary subscriber just for this fetch @@ -737,7 +737,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs return nil, fmt.Errorf("subscriber session stream cannot be nil") } - glog.V(2).Infof("[FETCH] readRecordsFrom: topic=%s partition=%d startOffset=%d maxRecords=%d", + glog.V(4).Infof("[FETCH] readRecordsFrom: topic=%s partition=%d startOffset=%d maxRecords=%d", session.Topic, session.Partition, startOffset, maxRecords) var records []*SeaweedRecord @@ -775,7 +775,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs // If we access session.Stream in the goroutine, it could become nil between check and use stream := session.Stream if stream == nil { - glog.V(2).Infof("[FETCH] Stream is nil, cannot read") + glog.V(4).Infof("[FETCH] Stream is nil, cannot read") return records, nil } @@ -807,7 +807,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs select { case result := <-recvChan: if result.err != nil { - glog.V(2).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err) + glog.V(4).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err) return records, nil // Return empty - no error for empty topic } @@ -820,7 +820,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs } records = append(records, record) currentOffset++ - glog.V(2).Infof("[FETCH] Received first record: offset=%d, keyLen=%d, valueLen=%d", + glog.V(4).Infof("[FETCH] Received first record: offset=%d, keyLen=%d, valueLen=%d", record.Offset, len(record.Key), len(record.Value)) // CRITICAL: Auto-acknowledge first message immediately for Kafka gateway @@ -835,14 +835,14 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs }, } if err := stream.Send(ackMsg); err != nil { - glog.V(2).Infof("[FETCH] Failed to send ack for first record offset %d: %v (continuing)", record.Offset, err) + glog.V(4).Infof("[FETCH] Failed to send ack for first record offset %d: %v (continuing)", record.Offset, err) // Don't fail the fetch if ack fails - continue reading } } case <-ctx.Done(): // Timeout on first record - topic is empty or no data available - glog.V(2).Infof("[FETCH] No data available (timeout on first record)") + glog.V(4).Infof("[FETCH] No data available (timeout on first record)") return records, nil } @@ -897,7 +897,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs readDuration := time.Since(readStart) if result.err != nil { - glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) + glog.V(4).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) // Return what we have - cache will be updated at the end break } @@ -913,7 +913,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs currentOffset++ consecutiveReads++ // Track number of successful reads for adaptive timeout - glog.V(2).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v", + glog.V(4).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v", len(records), record.Offset, len(record.Key), len(record.Value), readDuration) // CRITICAL: Auto-acknowledge message immediately for Kafka gateway @@ -928,7 +928,7 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs }, } if err := stream.Send(ackMsg); err != nil { - glog.V(2).Infof("[FETCH] Failed to send ack for offset %d: %v (continuing)", record.Offset, err) + glog.V(4).Infof("[FETCH] Failed to send ack for offset %d: %v (continuing)", record.Offset, err) // Don't fail the fetch if ack fails - continue reading } } @@ -936,12 +936,12 @@ func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubs case <-ctx2.Done(): cancel2() // Timeout - return what we have - glog.V(2).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart)) + glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart)) return records, nil } } - glog.V(2).Infof("[FETCH] Returning %d records (maxRecords reached)", len(records)) + glog.V(4).Infof("[FETCH] Returning %d records (maxRecords reached)", len(records)) return records, nil } @@ -985,7 +985,7 @@ func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerG } session.mu.Unlock() delete(bc.subscribers, key) - glog.V(2).Infof("[FETCH] Closed subscriber for %s", key) + glog.V(4).Infof("[FETCH] Closed subscriber for %s", key) } } @@ -1032,7 +1032,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO session.mu.Lock() defer session.mu.Unlock() - glog.V(2).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d", + glog.V(4).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d", session.Topic, session.Partition, session.StartOffset, newOffset) // Close existing stream @@ -1079,7 +1079,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO session.Ctx = subscriberCtx session.StartOffset = newOffset - glog.V(2).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d", + glog.V(4).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d", session.Topic, session.Partition, newOffset) return nil @@ -1095,7 +1095,7 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error { session.mu.Unlock() if currentOffset == offset { - glog.V(2).Infof("[SEEK] Already at offset %d for %s[%d], skipping seek", offset, session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Already at offset %d for %s[%d], skipping seek", offset, session.Topic, session.Partition) return nil } @@ -1111,7 +1111,7 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error { if err := session.Stream.Send(seekMsg); err != nil { // Handle graceful shutdown if err == io.EOF { - glog.V(2).Infof("[SEEK] Stream closing during seek to offset %d for %s[%d]", offset, session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Stream closing during seek to offset %d for %s[%d]", offset, session.Topic, session.Partition) return nil // Not an error during shutdown } return fmt.Errorf("seek to offset %d failed: %v", offset, err) @@ -1132,7 +1132,7 @@ func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error { } session.mu.Unlock() - glog.V(2).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition) return nil } @@ -1153,7 +1153,7 @@ func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error if err := session.Stream.Send(seekMsg); err != nil { // Handle graceful shutdown if err == io.EOF { - glog.V(2).Infof("[SEEK] Stream closing during seek to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Stream closing during seek to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition) return nil // Not an error during shutdown } return fmt.Errorf("seek to timestamp %d failed: %v", timestampNs, err) @@ -1165,7 +1165,7 @@ func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error session.consumedRecords = nil session.mu.Unlock() - glog.V(2).Infof("[SEEK] Seeked to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Seeked to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition) return nil } @@ -1185,7 +1185,7 @@ func (session *BrokerSubscriberSession) SeekToEarliest() error { if err := session.Stream.Send(seekMsg); err != nil { // Handle graceful shutdown if err == io.EOF { - glog.V(2).Infof("[SEEK] Stream closing during seek to earliest for %s[%d]", session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Stream closing during seek to earliest for %s[%d]", session.Topic, session.Partition) return nil // Not an error during shutdown } return fmt.Errorf("seek to earliest failed: %v", err) @@ -1196,7 +1196,7 @@ func (session *BrokerSubscriberSession) SeekToEarliest() error { session.consumedRecords = nil session.mu.Unlock() - glog.V(2).Infof("[SEEK] Seeked to earliest for %s[%d]", session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Seeked to earliest for %s[%d]", session.Topic, session.Partition) return nil } @@ -1216,7 +1216,7 @@ func (session *BrokerSubscriberSession) SeekToLatest() error { if err := session.Stream.Send(seekMsg); err != nil { // Handle graceful shutdown if err == io.EOF { - glog.V(2).Infof("[SEEK] Stream closing during seek to latest for %s[%d]", session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Stream closing during seek to latest for %s[%d]", session.Topic, session.Partition) return nil // Not an error during shutdown } return fmt.Errorf("seek to latest failed: %v", err) @@ -1227,6 +1227,6 @@ func (session *BrokerSubscriberSession) SeekToLatest() error { session.consumedRecords = nil session.mu.Unlock() - glog.V(2).Infof("[SEEK] Seeked to latest for %s[%d]", session.Topic, session.Partition) + glog.V(4).Infof("[SEEK] Seeked to latest for %s[%d]", session.Topic, session.Partition) return nil } diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index 734916bb6..c712eadb5 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -13,7 +13,7 @@ import ( // GetStoredRecords retrieves records from SeaweedMQ using the proper subscriber API // ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime) func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]SMQRecord, error) { - glog.V(2).Infof("[FETCH] GetStoredRecords: topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords) + glog.V(4).Infof("[FETCH] GetStoredRecords: topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords) // Verify topic exists if !h.TopicExists(topic) { @@ -36,24 +36,24 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p if connCtx.BrokerClient != nil { if bc, ok := connCtx.BrokerClient.(*BrokerClient); ok { brokerClient = bc - glog.V(2).Infof("[FETCH] Using per-connection BrokerClient for topic=%s partition=%d", topic, partition) + glog.V(4).Infof("[FETCH] Using per-connection BrokerClient for topic=%s partition=%d", topic, partition) } } // Extract consumer group and client ID if connCtx.ConsumerGroup != "" { consumerGroup = connCtx.ConsumerGroup - glog.V(2).Infof("[FETCH] Using actual consumer group from context: %s", consumerGroup) + glog.V(4).Infof("[FETCH] Using actual consumer group from context: %s", consumerGroup) } if connCtx.MemberID != "" { // Use member ID as base, but still include topic-partition for uniqueness consumerID = fmt.Sprintf("%s-%s-%d", connCtx.MemberID, topic, partition) - glog.V(2).Infof("[FETCH] Using actual member ID from context: %s", consumerID) + glog.V(4).Infof("[FETCH] Using actual member ID from context: %s", consumerID) } else if connCtx.ClientID != "" { // Fallback to client ID if member ID not set (for clients not using consumer groups) // Include topic-partition to ensure each partition consumer is unique consumerID = fmt.Sprintf("%s-%s-%d", connCtx.ClientID, topic, partition) - glog.V(2).Infof("[FETCH] Using client ID from context: %s", consumerID) + glog.V(4).Infof("[FETCH] Using client ID from context: %s", consumerID) } } } @@ -82,7 +82,7 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p // - Each fetch is independent // - Broker reads from LogBuffer without maintaining state // - Natural support for concurrent requests - glog.V(2).Infof("[FETCH-STATELESS] Fetching records for topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords) + glog.V(4).Infof("[FETCH-STATELESS] Fetching records for topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords) // Use the new FetchMessage RPC (Kafka-style stateless) seaweedRecords, err := brokerClient.FetchMessagesStateless(ctx, topic, partition, fromOffset, maxRecords, consumerGroup, consumerID) @@ -91,7 +91,7 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p return nil, fmt.Errorf("failed to fetch records: %v", err) } - glog.V(2).Infof("[FETCH-STATELESS] Fetched %d records", len(seaweedRecords)) + glog.V(4).Infof("[FETCH-STATELESS] Fetched %d records", len(seaweedRecords)) // // STATELESS FETCH BENEFITS: // - No broker-side session state = no state synchronization bugs @@ -116,7 +116,7 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p // CRITICAL: Skip records before the requested offset // This can happen when the subscriber cache returns old data if kafkaOffset < fromOffset { - glog.V(2).Infof("[FETCH] Skipping record %d with offset %d (requested fromOffset=%d)", i, kafkaOffset, fromOffset) + glog.V(4).Infof("[FETCH] Skipping record %d with offset %d (requested fromOffset=%d)", i, kafkaOffset, fromOffset) continue } @@ -131,7 +131,7 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p glog.V(4).Infof("[FETCH] Record %d: offset=%d, keyLen=%d, valueLen=%d", i, kafkaOffset, len(seaweedRecord.Key), len(seaweedRecord.Value)) } - glog.V(2).Infof("[FETCH] Successfully read %d records from SMQ", len(smqRecords)) + glog.V(4).Infof("[FETCH] Successfully read %d records from SMQ", len(smqRecords)) return smqRecords, nil } diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index a8ea803f1..a934b924e 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -155,7 +155,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers return nil, fmt.Errorf("connection context not available") } - glog.V(2).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions", + glog.V(4).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions", connContext.ConnectionID, correlationID, len(fetchRequest.Topics), func() int { count := 0 @@ -266,7 +266,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers for j := i; j < len(pending); j++ { results[j] = &partitionFetchResult{} } - glog.V(1).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions", + glog.V(3).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions", connContext.ConnectionID, len(pending)-i) goto done case <-ctx.Done(): diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index 520b524cb..edc94de1b 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -63,7 +63,7 @@ func newPartitionReader(ctx context.Context, handler *Handler, connCtx *Connecti // Start the request handler goroutine go pr.handleRequests(ctx) - glog.V(2).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)", + glog.V(4).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)", connCtx.ConnectionID, topicName, partitionID, startOffset) return pr @@ -75,7 +75,7 @@ func newPartitionReader(ctx context.Context, handler *Handler, connCtx *Connecti // on-demand in serveFetchRequest instead. func (pr *partitionReader) preFetchLoop(ctx context.Context) { defer func() { - glog.V(2).Infof("[%s] Pre-fetch loop exiting for %s[%d]", + glog.V(4).Infof("[%s] Pre-fetch loop exiting for %s[%d]", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID) close(pr.recordBuffer) }() @@ -96,7 +96,7 @@ func (pr *partitionReader) preFetchLoop(ctx context.Context) { // 3. This overwhelms the broker and causes partition shutdowns func (pr *partitionReader) handleRequests(ctx context.Context) { defer func() { - glog.V(2).Infof("[%s] Request handler exiting for %s[%d]", + glog.V(4).Infof("[%s] Request handler exiting for %s[%d]", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID) }() @@ -150,7 +150,7 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition // Update tracking offset to match requested offset pr.bufferMu.Lock() if req.requestedOffset != pr.currentOffset { - glog.V(2).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d", + glog.V(4).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, pr.currentOffset) pr.currentOffset = req.requestedOffset } @@ -164,7 +164,7 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition pr.bufferMu.Lock() pr.currentOffset = newOffset pr.bufferMu.Unlock() - glog.V(2).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes", + glog.V(4).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, len(recordBatch)) } else { @@ -195,7 +195,7 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma ) if err == nil && fetchResult.TotalSize > 0 { - glog.V(2).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d", + glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset) return fetchResult.RecordBatches, fetchResult.NextOffset @@ -206,7 +206,7 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma if err == nil && len(smqRecords) > 0 { recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords) nextOffset := fromOffset + int64(len(smqRecords)) - glog.V(2).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d", + glog.V(4).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, len(smqRecords), len(recordBatch), fromOffset, nextOffset) return recordBatch, nextOffset diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index a2066a467..0af158dd8 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -549,8 +549,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { wg.Add(1) go func() { defer wg.Done() - glog.V(2).Infof("[%s] Response writer started", connectionID) - defer glog.V(2).Infof("[%s] Response writer exiting", connectionID) + glog.V(4).Infof("[%s] Response writer started", connectionID) + defer glog.V(4).Infof("[%s] Response writer exiting", connectionID) pendingResponses := make(map[uint32]*kafkaResponse) nextToSend := 0 // Index in correlationQueue @@ -561,7 +561,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // responseChan closed, exit return } - glog.V(2).Infof("[%s] Response writer received correlation=%d from responseChan", connectionID, resp.correlationID) + glog.V(4).Infof("[%s] Response writer received correlation=%d from responseChan", connectionID, resp.correlationID) correlationQueueMu.Lock() pendingResponses[resp.correlationID] = resp @@ -579,14 +579,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { if readyResp.err != nil { glog.Errorf("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err) } else { - glog.V(2).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response)) + glog.V(4).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response)) if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil { glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr) glog.Errorf("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr) correlationQueueMu.Unlock() return } - glog.V(2).Infof("[%s] Response writer: successfully wrote correlation=%d", connectionID, readyResp.correlationID) + glog.V(4).Infof("[%s] Response writer: successfully wrote correlation=%d", connectionID, readyResp.correlationID) } // Remove from pending and advance @@ -596,7 +596,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { correlationQueueMu.Unlock() case <-ctx.Done(): // Context cancelled, exit immediately to prevent deadlock - glog.V(2).Infof("[%s] Response writer: context cancelled, exiting", connectionID) + glog.V(4).Infof("[%s] Response writer: context cancelled, exiting", connectionID) return } } @@ -613,7 +613,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Channel closed, exit return } - glog.V(2).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) + glog.V(4).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer @@ -629,7 +629,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response, err = h.processRequestSync(req) }() - glog.V(2).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) + glog.V(4).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) select { case responseChan <- &kafkaResponse{ correlationID: req.correlationID, @@ -638,7 +638,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response: response, err: err, }: - glog.V(2).Infof("[%s] Control plane sent correlation=%d to responseChan", connectionID, req.correlationID) + glog.V(4).Infof("[%s] Control plane sent correlation=%d to responseChan", connectionID, req.correlationID) case <-ctx.Done(): // Connection closed, stop processing return @@ -647,7 +647,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } case <-ctx.Done(): // Context cancelled, drain remaining requests before exiting - glog.V(2).Infof("[%s] Control plane: context cancelled, draining remaining requests", connectionID) + glog.V(4).Infof("[%s] Control plane: context cancelled, draining remaining requests", connectionID) for { select { case req, ok := <-controlChan: @@ -672,7 +672,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } default: // Channel empty, safe to exit - glog.V(2).Infof("[%s] Control plane: drain complete, exiting", connectionID) + glog.V(4).Infof("[%s] Control plane: drain complete, exiting", connectionID) return } } @@ -691,7 +691,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Channel closed, exit return } - glog.V(2).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) + glog.V(4).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer @@ -707,7 +707,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response, err = h.processRequestSync(req) }() - glog.V(2).Infof("[%s] Data plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) + glog.V(4).Infof("[%s] Data plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) // Use select with context to avoid sending on closed channel select { case responseChan <- &kafkaResponse{ @@ -717,7 +717,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response: response, err: err, }: - glog.V(2).Infof("[%s] Data plane sent correlation=%d to responseChan", connectionID, req.correlationID) + glog.V(4).Infof("[%s] Data plane sent correlation=%d to responseChan", connectionID, req.correlationID) case <-ctx.Done(): // Connection closed, stop processing return @@ -726,7 +726,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } case <-ctx.Done(): // Context cancelled, drain remaining requests before exiting - glog.V(2).Infof("[%s] Data plane: context cancelled, draining remaining requests", connectionID) + glog.V(4).Infof("[%s] Data plane: context cancelled, draining remaining requests", connectionID) for { select { case req, ok := <-dataChan: @@ -751,7 +751,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } default: // Channel empty, safe to exit - glog.V(2).Infof("[%s] Data plane: drain complete, exiting", connectionID) + glog.V(4).Infof("[%s] Data plane: drain complete, exiting", connectionID) return } } @@ -1037,7 +1037,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { apiName := getAPIName(APIKey(req.apiKey)) // Debug: Log API calls at verbose level 2 (disabled by default) - glog.V(2).Infof("[API] %s (key=%d, ver=%d, corr=%d)", + glog.V(4).Infof("[API] %s (key=%d, ver=%d, corr=%d)", apiName, req.apiKey, req.apiVersion, req.correlationID) var response []byte @@ -1238,7 +1238,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(1).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1313,7 +1313,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(1).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1426,7 +1426,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(1).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1534,7 +1534,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(1).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1661,7 +1661,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(1).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics) + glog.V(3).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1678,24 +1678,24 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, for _, topic := range requestedTopics { if isSystemTopic(topic) { // Always try to auto-create system topics during metadata requests - glog.V(1).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic) if !h.seaweedMQHandler.TopicExists(topic) { - glog.V(1).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic) if err := h.createTopicWithSchemaSupport(topic, 1); err != nil { glog.V(0).Infof("[METADATA v%d] Failed to auto-create system topic %s: %v", apiVersion, topic, err) // Continue without adding to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION } else { - glog.V(1).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic) } } else { - glog.V(1).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic) } topicsToReturn = append(topicsToReturn, topic) } else if h.seaweedMQHandler.TopicExists(topic) { topicsToReturn = append(topicsToReturn, topic) } } - glog.V(1).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics) + glog.V(3).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics) } var buf bytes.Buffer @@ -4147,5 +4147,5 @@ func cleanupPartitionReaders(connCtx *ConnectionContext) { return true // Continue iteration }) - glog.V(2).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID) + glog.V(4).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID) } diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index cc8a8de46..58ba7a476 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -132,7 +132,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re groupIsEmpty := len(group.Members) == 0 generationMatches := groupIsEmpty || (req.GenerationID == group.Generation) - glog.V(1).Infof("[OFFSET_COMMIT] Group check: id=%s reqGen=%d groupGen=%d members=%d empty=%v matches=%v", + glog.V(3).Infof("[OFFSET_COMMIT] Group check: id=%s reqGen=%d groupGen=%d members=%d empty=%v matches=%v", req.GroupID, req.GenerationID, group.Generation, len(group.Members), groupIsEmpty, generationMatches) // Process offset commits @@ -168,14 +168,14 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re // Also store in SMQ persistent storage if available if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { // SMQ storage may not be available (e.g., in mock mode) - that's okay - glog.V(2).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err) + glog.V(4).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err) } if groupIsEmpty { - glog.V(1).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d", + glog.V(3).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d", req.GroupID, t.Name, p.Index, p.Offset) } else { - glog.V(1).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", + glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", req.GroupID, t.Name, p.Index, p.Offset, group.Generation) } } else { @@ -218,7 +218,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req group.Mu.RLock() defer group.Mu.RUnlock() - glog.V(2).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics)) + glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics)) // Build response response := OffsetFetchResponse{ @@ -254,7 +254,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 { fetchedOffset = off metadata = meta - glog.V(2).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d", + glog.V(4).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d", request.GroupID, topic.Name, partition, off) } else { // Fallback: try fetching from SMQ persistent storage @@ -268,10 +268,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { fetchedOffset = off metadata = meta - glog.V(2).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d", + glog.V(4).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d", request.GroupID, topic.Name, partition, off) } else { - glog.V(2).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)", + glog.V(4).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)", request.GroupID, topic.Name, partition) } // No offset found in either location (-1 indicates no committed offset)