From ba1599b1e9ee201ef34ca070d934a8761850e070 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Sat, 13 Sep 2025 08:31:16 -0700
Subject: [PATCH] fix tests

---
 weed/mq/kafka/protocol/fetch.go   | 17 ++---------
 weed/mq/kafka/protocol/handler.go | 49 +++++++++++++++++++++++++++----
 2 files changed, 47 insertions(+), 19 deletions(-)

diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index b1b3517d2..31325e2ad 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -166,26 +166,15 @@ type FetchPartition struct {
 
 // parseFetchRequest parses a Kafka Fetch request
 func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*FetchRequest, error) {
-	if len(requestBody) < 16 {
+	if len(requestBody) < 12 {
 		return nil, fmt.Errorf("fetch request too short: %d bytes", len(requestBody))
 	}
 
 	offset := 0
 	request := &FetchRequest{}
 
-	// Skip client_id (string) - read length first
-	if offset+2 > len(requestBody) {
-		return nil, fmt.Errorf("insufficient data for client_id length")
-	}
-	clientIDLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
-	offset += 2
-
-	if clientIDLength >= 0 {
-		if offset+clientIDLength > len(requestBody) {
-			return nil, fmt.Errorf("insufficient data for client_id")
-		}
-		offset += clientIDLength
-	}
+	// NOTE: client_id is already handled by HandleConn and stripped from requestBody
+	// Request body starts directly with fetch-specific fields
 
 	// Replica ID (4 bytes)
 	if offset+4 > len(requestBody) {
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 266252b12..e29ec3770 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -72,7 +72,8 @@ func NewHandler() *Handler {
 		brokerHost: "localhost",
 		brokerPort: 9092,
 		seaweedMQHandler: &basicSeaweedMQHandler{
-			topics: make(map[string]bool),
+			topics:  make(map[string]bool),
+			ledgers: make(map[string]*offset.Ledger),
 		},
 	}
 }
@@ -93,7 +94,9 @@ func NewTestHandler() *Handler {
 
 // basicSeaweedMQHandler is a minimal in-memory implementation for basic Kafka functionality
 type basicSeaweedMQHandler struct {
-	topics map[string]bool
+	topics  map[string]bool
+	ledgers map[string]*offset.Ledger
+	mu      sync.RWMutex
 }
 
 // testSeaweedMQHandler is a minimal mock implementation for testing
@@ -127,15 +130,51 @@ func (b *basicSeaweedMQHandler) DeleteTopic(topic string) error {
 }
 
 func (b *basicSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
-	return offset.NewLedger()
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	key := fmt.Sprintf("%s-%d", topic, partition)
+	if ledger, exists := b.ledgers[key]; exists {
+		return ledger
+	}
+
+	// Create new ledger
+	ledger := offset.NewLedger()
+	b.ledgers[key] = ledger
+
+	// Also create the topic if it doesn't exist
+	b.topics[topic] = true
+
+	return ledger
 }
 
 func (b *basicSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
-	return offset.NewLedger()
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	key := fmt.Sprintf("%s-%d", topic, partition)
+	if ledger, exists := b.ledgers[key]; exists {
+		return ledger
+	}
+
+	// Return nil if ledger doesn't exist (topic doesn't exist)
+	return nil
 }
 
 func (b *basicSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
-	return 1, nil // Return offset 1 to simulate successful produce
+	// Store the record in the ledger
+	ledger := b.GetOrCreateLedger(topicName, partitionID)
+
+	// Assign an offset and append the record
+	offset := ledger.AssignOffsets(1)
+	timestamp := time.Now().UnixNano()
+	size := int32(len(value))
+
+	if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
+		return 0, fmt.Errorf("failed to append record: %w", err)
+	}
+
+	return offset, nil
 }
 
 func (b *basicSeaweedMQHandler) Close() error {
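
Reviewer note (not part of the patch): the minimum-length check in parseFetchRequest drops from 16 to 12 because HandleConn now strips the shared request header, including client_id, before dispatch, so the body begins directly at replica_id. The sketch below illustrates where the 12 bytes come from, assuming the Fetch v0-v3 prefix layout of replica_id(4) + max_wait_ms(4) + min_bytes(4); the helper name is hypothetical.

package protocol

import "encoding/binary"

// buildMinimalFetchBody is a hypothetical helper showing the smallest
// well-formed Fetch body prefix after header stripping: 12 bytes, with
// the topics array following in a real request.
func buildMinimalFetchBody() []byte {
	body := make([]byte, 0, 12)
	body = binary.BigEndian.AppendUint32(body, 0xFFFFFFFF) // replica_id = -1 (regular consumer)
	body = binary.BigEndian.AppendUint32(body, 500)        // max_wait_ms
	body = binary.BigEndian.AppendUint32(body, 1)          // min_bytes
	return body
}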
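
Reviewer note (not part of the patch): with basicSeaweedMQHandler now backed by per-topic-partition ledgers, produced offsets should be sequential and GetLedger should return nil for unknown topics instead of a fresh ledger. A minimal regression-test sketch, assuming offset.Ledger assigns offsets starting at 0 and the import path weed/mq/kafka/offset:

package protocol

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" // assumed import path
)

// TestBasicHandlerLedgerOffsets is a hypothetical test exercising the
// ledger-backed stubs added by this patch.
func TestBasicHandlerLedgerOffsets(t *testing.T) {
	mq := &basicSeaweedMQHandler{
		topics:  make(map[string]bool),
		ledgers: make(map[string]*offset.Ledger),
	}

	// Unknown topic: GetLedger must return nil rather than a fresh ledger.
	if mq.GetLedger("topic-a", 0) != nil {
		t.Fatal("expected nil ledger for unknown topic")
	}

	// Offsets are sequential per topic-partition, not the old hardcoded 1.
	for want := int64(0); want < 3; want++ {
		got, err := mq.ProduceRecord("topic-a", 0, nil, []byte("v"))
		if err != nil {
			t.Fatalf("ProduceRecord: %v", err)
		}
		if got != want {
			t.Fatalf("offset = %d, want %d", got, want)
		}
	}

	// Producing registers both the topic and its ledger.
	if mq.GetLedger("topic-a", 0) == nil {
		t.Fatal("expected ledger after produce")
	}
}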