Browse Source

fix kafka tests

pull/7231/head
chrislu 3 months ago
parent
commit
4f0dd2e527
  1. 4
      weed/mq/kafka/protocol/fetch.go
  2. 20
      weed/mq/kafka/protocol/fetch_test.go

4
weed/mq/kafka/protocol/fetch.go

@ -264,8 +264,8 @@ func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*Fet
request.Topics[i].Partitions[j].PartitionID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) request.Topics[i].Partitions[j].PartitionID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
offset += 4 offset += 4
// Current leader epoch (4 bytes) - only in v9+
if apiVersion >= 9 {
// Current leader epoch (4 bytes) - only in v5+
if apiVersion >= 5 {
if offset+4 > len(requestBody) { if offset+4 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for current leader epoch") return nil, fmt.Errorf("insufficient data for current leader epoch")
} }

20
weed/mq/kafka/protocol/fetch_test.go

@ -24,13 +24,10 @@ func TestHandler_handleFetch(t *testing.T) {
ledger.AppendRecord(baseOffset+2, currentTime+2000, 150) ledger.AppendRecord(baseOffset+2, currentTime+2000, 150)
// Build a Fetch request // Build a Fetch request
clientID := "test-consumer"
requestBody := make([]byte, 0, 256) requestBody := make([]byte, 0, 256)
// Client ID
requestBody = append(requestBody, 0, byte(len(clientID)))
requestBody = append(requestBody, []byte(clientID)...)
// NOTE: client_id is handled by HandleConn and stripped before reaching handler
// Start directly with Fetch-specific fields
// Replica ID (-1 for consumer) // Replica ID (-1 for consumer)
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF)
@ -166,14 +163,12 @@ func TestHandler_handleFetch_UnknownTopic(t *testing.T) {
correlationID := uint32(777) correlationID := uint32(777)
// Build Fetch request for non-existent topic (don't create it) // Build Fetch request for non-existent topic (don't create it)
clientID := "test-consumer"
topicName := "non-existent-topic" topicName := "non-existent-topic"
requestBody := make([]byte, 0, 128) requestBody := make([]byte, 0, 128)
// Client ID
requestBody = append(requestBody, 0, byte(len(clientID)))
requestBody = append(requestBody, []byte(clientID)...)
// NOTE: client_id is handled by HandleConn and stripped before reaching handler
// Start directly with Fetch-specific fields
// Standard Fetch parameters // Standard Fetch parameters
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID
@ -228,13 +223,10 @@ func TestHandler_handleFetch_EmptyPartition(t *testing.T) {
_ = ledger // ledger exists but is empty _ = ledger // ledger exists but is empty
// Build Fetch request // Build Fetch request
clientID := "test-consumer"
requestBody := make([]byte, 0, 128) requestBody := make([]byte, 0, 128)
// Client ID
requestBody = append(requestBody, 0, byte(len(clientID)))
requestBody = append(requestBody, []byte(clientID)...)
// NOTE: client_id is handled by HandleConn and stripped before reaching handler
// Start directly with Fetch-specific fields
// Standard parameters // Standard parameters
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID

Loading…
Cancel
Save