diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index c712eadb5..0ef659050 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -172,6 +172,7 @@ func (h *SeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64 if time.Now().Before(entry.expiresAt) { // Cache hit - return cached value h.hwmCacheMu.RUnlock() + glog.V(2).Infof("[HWM] Cache HIT for %s: hwm=%d", cacheKey, entry.value) return entry.value, nil } } @@ -179,11 +180,15 @@ func (h *SeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64 // Cache miss or expired - query SMQ broker if h.brokerClient != nil { + glog.V(2).Infof("[HWM] Cache MISS for %s, querying broker...", cacheKey) latestOffset, err := h.brokerClient.GetHighWaterMark(topic, partition) if err != nil { + glog.V(1).Infof("[HWM] ERROR querying broker for %s: %v", cacheKey, err) return 0, err } + glog.V(2).Infof("[HWM] Broker returned hwm=%d for %s", latestOffset, cacheKey) + // Update cache h.hwmCacheMu.Lock() h.hwmCache[cacheKey] = &hwmCacheEntry{ diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index fce7c7efa..b0ee5d95f 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -138,11 +138,14 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition } result.highWaterMark = hwm + glog.V(2).Infof("[%s] FETCH %s[%d]: requestedOffset=%d hwm=%d", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) + // If requested offset >= HWM, return immediately with empty result // This prevents overwhelming the broker with futile read attempts when no data is available if req.requestedOffset >= hwm { result.recordBatch = []byte{} - glog.V(3).Infof("[%s] No data available for %s[%d]: offset=%d >= hwm=%d", + glog.V(2).Infof("[%s] FETCH %s[%d]: EMPTY (offset %d >= hwm %d)", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) return }