
fix tests

pull/7329/head · chrislu · 4 days ago · commit 2b8261c65b
  1. weed/mq/kafka/protocol/fetch.go (28 lines changed)
  2. weed/mq/kafka/protocol/metadata_blocking_test.go (12 lines changed)
  3. weed/mq/kafka/protocol/offset_fetch_pattern_test.go (38 lines changed)

weed/mq/kafka/protocol/fetch.go (28 lines changed)

@@ -1150,7 +1150,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	// Validate that the unmarshaled RecordValue is actually a valid RecordValue
 	// Protobuf unmarshal is lenient and can succeed with garbage data for random bytes
 	// We need to check if this looks like a real RecordValue or just random bytes
-	if !h.isValidRecordValue(recordValue) {
+	if !h.isValidRecordValue(recordValue, recordValueBytes) {
 		// Not a valid RecordValue - return raw bytes as-is
 		return recordValueBytes
 	}
@@ -1168,7 +1168,8 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 }

 // isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes
-func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue) bool {
+// This performs a roundtrip test: marshal the RecordValue and check if it produces similar output
+func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue, originalBytes []byte) bool {
 	// Empty or nil Fields means not a valid RecordValue
 	if recordValue == nil || recordValue.Fields == nil || len(recordValue.Fields) == 0 {
 		return false
@@ -1194,6 +1195,29 @@ func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue) bool {
 		}
 	}

+	// Roundtrip check: If this is a real RecordValue, marshaling it back should produce
+	// similar-sized output. Random bytes that accidentally parse as protobuf will typically
+	// produce very different output when marshaled back.
+	remarshaled, err := proto.Marshal(recordValue)
+	if err != nil {
+		return false
+	}
+
+	// Check if the sizes are reasonably similar (within 50% tolerance)
+	// Real RecordValue will have similar size, random bytes will be very different
+	originalSize := len(originalBytes)
+	remarshaledSize := len(remarshaled)
+	if originalSize == 0 {
+		return false
+	}
+
+	// Calculate size ratio - should be close to 1.0 for real RecordValue
+	ratio := float64(remarshaledSize) / float64(originalSize)
+	if ratio < 0.5 || ratio > 2.0 {
+		// Size differs too much - this is likely random bytes parsed as protobuf
+		return false
+	}
+
 	return true
 }
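The roundtrip check added above can be read as a standalone idea: unmarshal the candidate bytes, marshal the result back, and reject the value when the remarshaled size strays outside the 50% tolerance. Below is a minimal sketch of that idea, not code from this commit; the schema_pb import path and the helper name looksLikeRecordValue are assumptions for illustration.

    package main

    import (
    	"fmt"

    	"google.golang.org/protobuf/proto"

    	// Import path assumed for illustration; the diff only shows the schema_pb alias.
    	schema_pb "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    )

    // looksLikeRecordValue restates the roundtrip test: real RecordValue bytes
    // remarshal to roughly the same size, random bytes usually do not.
    func looksLikeRecordValue(original []byte) bool {
    	rv := &schema_pb.RecordValue{}
    	if err := proto.Unmarshal(original, rv); err != nil {
    		return false
    	}
    	if len(rv.Fields) == 0 || len(original) == 0 {
    		return false
    	}
    	remarshaled, err := proto.Marshal(rv)
    	if err != nil {
    		return false
    	}
    	ratio := float64(len(remarshaled)) / float64(len(original))
    	return ratio >= 0.5 && ratio <= 2.0
    }

    func main() {
    	// Random bytes rarely survive both the field check and the size-ratio check.
    	fmt.Println(looksLikeRecordValue([]byte{0xde, 0xad, 0xbe, 0xef}))
    }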

weed/mq/kafka/protocol/metadata_blocking_test.go (12 lines changed)

@@ -199,6 +199,10 @@ func (h *FastMockHandler) SetProtocolHandler(handler integration.ProtocolHandler
 	// No-op
 }

+func (h *FastMockHandler) InvalidateTopicExistsCache(topic string) {
+	// No-op for mock
+}
+
 func (h *FastMockHandler) Close() error {
 	return nil
 }
@@ -270,6 +274,10 @@ func (h *BlockingMockHandler) SetProtocolHandler(handler integration.ProtocolHan
 	// No-op
 }

+func (h *BlockingMockHandler) InvalidateTopicExistsCache(topic string) {
+	// No-op for mock
+}
+
 func (h *BlockingMockHandler) Close() error {
 	return nil
 }
@@ -356,6 +364,10 @@ func (h *TimeoutAwareMockHandler) SetProtocolHandler(handler integration.Protoco
 	// No-op
 }

+func (h *TimeoutAwareMockHandler) InvalidateTopicExistsCache(topic string) {
+	// No-op for mock
+}
+
 func (h *TimeoutAwareMockHandler) Close() error {
 	return nil
 }

weed/mq/kafka/protocol/offset_fetch_pattern_test.go (38 lines changed)

@@ -1,12 +1,9 @@
 package protocol

 import (
-	"context"
 	"fmt"
 	"testing"
 	"time"
-
-	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
 )

 // TestOffsetCommitFetchPattern verifies the critical pattern:
@@ -22,11 +19,11 @@ func TestOffsetCommitFetchPattern(t *testing.T) {
 	// Setup
 	const (
-		topic = "test-topic"
-		partition = int32(0)
-		messageCount = 1000
-		batchSize = 50
-		groupID = "test-group"
+		topic        = "test-topic"
+		partition    = int32(0)
+		messageCount = 1000
+		batchSize    = 50
+		groupID      = "test-group"
 	)

 	// Mock store for offsets
@@ -107,14 +104,10 @@ func TestOffsetFetchAfterCommit(t *testing.T) {
 	}

 	type FetchResponse struct {
-		records []byte
+		records    []byte
 		nextOffset int64
 	}

-	// Track all fetch requests
-	fetchRequests := []FetchRequest{}
-	fetchResponses := []FetchResponse{}
-
 	// Simulate: Commit offset 163, then fetch offset 164
 	committedOffset := int64(163)
 	nextFetchOffset := committedOffset + 1
@@ -123,7 +116,7 @@
 	// This is where consumers are getting stuck!
 	// They commit offset 163, then fetch 164+, but get empty response
 	// Expected: Fetch(164) returns records starting from offset 164
 	// Actual Bug: Fetch(164) returns empty, consumer stops fetching
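The pattern this test pins down is simple arithmetic on the consumer side: after committing offset N, the next fetch starts at N+1, and an empty response is only legitimate once that offset has reached the high-water mark. A small self-contained sketch of that contract (illustrative helper names, not the handler's API):

    package main

    import "fmt"

    // nextFetchOffset and expectRecords are illustrative helpers that restate
    // the contract described in the test above; they are not part of the handler.
    func nextFetchOffset(committed int64) int64 {
    	return committed + 1
    }

    func expectRecords(fetchOffset, highWaterMark int64) bool {
    	// An empty fetch is only correct once the consumer has caught up to the HWM.
    	return fetchOffset < highWaterMark
    }

    func main() {
    	committed := int64(163)
    	hwm := int64(1000) // messages 0-999 exist
    	next := nextFetchOffset(committed)
    	fmt.Printf("fetch at %d, records expected: %v\n", next, expectRecords(next, hwm))
    	// Prints: fetch at 164, records expected: true
    }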
@@ -144,9 +137,9 @@ func TestOffsetPersistencePattern(t *testing.T) {
 	t.Run("OffsetRecovery", func(t *testing.T) {
 		const (
-			groupID = "test-group"
-			topic = "test-topic"
-			partition = int32(0)
+			groupID   = "test-group"
+			topic     = "test-topic"
+			partition = int32(0)
 		)

 		offsetStore := make(map[string]int64)
@@ -202,7 +195,7 @@ func TestOffsetCommitConsistency(t *testing.T) {
 	for _, commit := range commits {
 		key := fmt.Sprintf("%s/%s/%d", commit.Group, commit.Topic, commit.Partition)
 		t.Logf("Committing %s at offset %d", key, commit.Offset)

 		// Verify offset is correctly persisted
 		// (In real test, would read from SMQ storage)
 	}
@@ -218,9 +211,9 @@ func TestFetchEmptyPartitionHandling(t *testing.T) {
 	t.Run("EmptyPartitionBehavior", func(t *testing.T) {
 		const (
-			topic = "test-topic"
-			partition = int32(0)
-			lastOffset = int64(999) // Messages 0-999 exist
+			topic      = "test-topic"
+			partition  = int32(0)
+			lastOffset = int64(999) // Messages 0-999 exist
 		)

 		// Test 1: Fetch at HWM should return empty
@@ -248,13 +241,10 @@ func TestLongPollWithOffsetCommit(t *testing.T) {
 	// This was bug 8969b4509
 	const maxWaitTime = 5 * time.Second

-	start := time.Now()
-
 	// Simulate long-poll wait (no data available)
 	time.Sleep(100 * time.Millisecond) // Broker waits up to maxWaitTime
-	elapsed := time.Since(start)

 	// throttleTimeMs should be 0 (NOT elapsed duration!)
 	throttleTimeMs := int32(0) // CORRECT
 	// throttleTimeMs := int32(elapsed / time.Millisecond) // WRONG (previous bug)
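The comment trail above records the earlier bug: the handler reported the long-poll wait as throttle time. In the Kafka protocol, throttle_time_ms signals quota throttling, so a fetch that merely waited for data must still report 0. A minimal sketch of that distinction (illustrative helper, not the handler's code):

    package main

    import (
    	"fmt"
    	"time"
    )

    // fetchThrottleMs reports only quota throttling; the time spent parked in a
    // long poll waiting for data is deliberately ignored.
    func fetchThrottleMs(longPollWait, quotaThrottle time.Duration) int32 {
    	_ = longPollWait // not an error condition, not throttling
    	return int32(quotaThrottle / time.Millisecond)
    }

    func main() {
    	waited := 100 * time.Millisecond // broker waited for data, as in the test above
    	fmt.Println(fetchThrottleMs(waited, 0)) // 0 — waiting is not throttling
    }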
