From 2b8261c65b0a3228023d513b13e5410b10a4e245 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 16:28:35 -0700 Subject: [PATCH] fix tests --- weed/mq/kafka/protocol/fetch.go | 28 +++++++++++++- .../kafka/protocol/metadata_blocking_test.go | 12 ++++++ .../protocol/offset_fetch_pattern_test.go | 38 +++++++------------ 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 5daf8eeeb..e8998fc04 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -1150,7 +1150,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB // Validate that the unmarshaled RecordValue is actually a valid RecordValue // Protobuf unmarshal is lenient and can succeed with garbage data for random bytes // We need to check if this looks like a real RecordValue or just random bytes - if !h.isValidRecordValue(recordValue) { + if !h.isValidRecordValue(recordValue, recordValueBytes) { // Not a valid RecordValue - return raw bytes as-is return recordValueBytes } @@ -1168,7 +1168,8 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB } // isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes -func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue) bool { +// This performs a roundtrip test: marshal the RecordValue and check if it produces similar output +func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue, originalBytes []byte) bool { // Empty or nil Fields means not a valid RecordValue if recordValue == nil || recordValue.Fields == nil || len(recordValue.Fields) == 0 { return false @@ -1194,6 +1195,29 @@ func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue) bool { } } + // Roundtrip check: If this is a real RecordValue, marshaling it back should produce + // similar-sized output. Random bytes that accidentally parse as protobuf will typically + // produce very different output when marshaled back. + remarshaled, err := proto.Marshal(recordValue) + if err != nil { + return false + } + + // Check if the sizes are reasonably similar (within 50% tolerance) + // Real RecordValue will have similar size, random bytes will be very different + originalSize := len(originalBytes) + remarshaledSize := len(remarshaled) + if originalSize == 0 { + return false + } + + // Calculate size ratio - should be close to 1.0 for real RecordValue + ratio := float64(remarshaledSize) / float64(originalSize) + if ratio < 0.5 || ratio > 2.0 { + // Size differs too much - this is likely random bytes parsed as protobuf + return false + } + return true } diff --git a/weed/mq/kafka/protocol/metadata_blocking_test.go b/weed/mq/kafka/protocol/metadata_blocking_test.go index d6e5ee893..e5dfd1f95 100644 --- a/weed/mq/kafka/protocol/metadata_blocking_test.go +++ b/weed/mq/kafka/protocol/metadata_blocking_test.go @@ -199,6 +199,10 @@ func (h *FastMockHandler) SetProtocolHandler(handler integration.ProtocolHandler // No-op } +func (h *FastMockHandler) InvalidateTopicExistsCache(topic string) { + // No-op for mock +} + func (h *FastMockHandler) Close() error { return nil } @@ -270,6 +274,10 @@ func (h *BlockingMockHandler) SetProtocolHandler(handler integration.ProtocolHan // No-op } +func (h *BlockingMockHandler) InvalidateTopicExistsCache(topic string) { + // No-op for mock +} + func (h *BlockingMockHandler) Close() error { return nil } @@ -356,6 +364,10 @@ func (h *TimeoutAwareMockHandler) SetProtocolHandler(handler integration.Protoco // No-op } +func (h *TimeoutAwareMockHandler) InvalidateTopicExistsCache(topic string) { + // No-op for mock +} + func (h *TimeoutAwareMockHandler) Close() error { return nil } diff --git a/weed/mq/kafka/protocol/offset_fetch_pattern_test.go b/weed/mq/kafka/protocol/offset_fetch_pattern_test.go index 6c2beb5da..e23c1391e 100644 --- a/weed/mq/kafka/protocol/offset_fetch_pattern_test.go +++ b/weed/mq/kafka/protocol/offset_fetch_pattern_test.go @@ -1,12 +1,9 @@ package protocol import ( - "context" "fmt" "testing" "time" - - "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" ) // TestOffsetCommitFetchPattern verifies the critical pattern: @@ -22,11 +19,11 @@ func TestOffsetCommitFetchPattern(t *testing.T) { // Setup const ( - topic = "test-topic" - partition = int32(0) - messageCount = 1000 - batchSize = 50 - groupID = "test-group" + topic = "test-topic" + partition = int32(0) + messageCount = 1000 + batchSize = 50 + groupID = "test-group" ) // Mock store for offsets @@ -107,14 +104,10 @@ func TestOffsetFetchAfterCommit(t *testing.T) { } type FetchResponse struct { - records []byte + records []byte nextOffset int64 } - // Track all fetch requests - fetchRequests := []FetchRequest{} - fetchResponses := []FetchResponse{} - // Simulate: Commit offset 163, then fetch offset 164 committedOffset := int64(163) nextFetchOffset := committedOffset + 1 @@ -123,7 +116,7 @@ func TestOffsetFetchAfterCommit(t *testing.T) { // This is where consumers are getting stuck! // They commit offset 163, then fetch 164+, but get empty response - + // Expected: Fetch(164) returns records starting from offset 164 // Actual Bug: Fetch(164) returns empty, consumer stops fetching @@ -144,9 +137,9 @@ func TestOffsetPersistencePattern(t *testing.T) { t.Run("OffsetRecovery", func(t *testing.T) { const ( - groupID = "test-group" - topic = "test-topic" - partition = int32(0) + groupID = "test-group" + topic = "test-topic" + partition = int32(0) ) offsetStore := make(map[string]int64) @@ -202,7 +195,7 @@ func TestOffsetCommitConsistency(t *testing.T) { for _, commit := range commits { key := fmt.Sprintf("%s/%s/%d", commit.Group, commit.Topic, commit.Partition) t.Logf("Committing %s at offset %d", key, commit.Offset) - + // Verify offset is correctly persisted // (In real test, would read from SMQ storage) } @@ -218,9 +211,9 @@ func TestFetchEmptyPartitionHandling(t *testing.T) { t.Run("EmptyPartitionBehavior", func(t *testing.T) { const ( - topic = "test-topic" - partition = int32(0) - lastOffset = int64(999) // Messages 0-999 exist + topic = "test-topic" + partition = int32(0) + lastOffset = int64(999) // Messages 0-999 exist ) // Test 1: Fetch at HWM should return empty @@ -248,13 +241,10 @@ func TestLongPollWithOffsetCommit(t *testing.T) { // This was bug 8969b4509 const maxWaitTime = 5 * time.Second - start := time.Now() // Simulate long-poll wait (no data available) time.Sleep(100 * time.Millisecond) // Broker waits up to maxWaitTime - elapsed := time.Since(start) - // throttleTimeMs should be 0 (NOT elapsed duration!) throttleTimeMs := int32(0) // CORRECT // throttleTimeMs := int32(elapsed / time.Millisecond) // WRONG (previous bug)