Browse Source

fix tests

pull/7231/head
chrislu 2 months ago
parent
commit
ba1599b1e9
  1. 17
      weed/mq/kafka/protocol/fetch.go
  2. 45
      weed/mq/kafka/protocol/handler.go

17
weed/mq/kafka/protocol/fetch.go

@ -166,26 +166,15 @@ type FetchPartition struct {
// parseFetchRequest parses a Kafka Fetch request
func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*FetchRequest, error) {
if len(requestBody) < 16 {
if len(requestBody) < 12 {
return nil, fmt.Errorf("fetch request too short: %d bytes", len(requestBody))
}
offset := 0
request := &FetchRequest{}
// Skip client_id (string) - read length first
if offset+2 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for client_id length")
}
clientIDLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if clientIDLength >= 0 {
if offset+clientIDLength > len(requestBody) {
return nil, fmt.Errorf("insufficient data for client_id")
}
offset += clientIDLength
}
// NOTE: client_id is already handled by HandleConn and stripped from requestBody
// Request body starts directly with fetch-specific fields
// Replica ID (4 bytes)
if offset+4 > len(requestBody) {

45
weed/mq/kafka/protocol/handler.go

@ -73,6 +73,7 @@ func NewHandler() *Handler {
brokerPort: 9092,
seaweedMQHandler: &basicSeaweedMQHandler{
topics: make(map[string]bool),
ledgers: make(map[string]*offset.Ledger),
},
}
}
@ -94,6 +95,8 @@ func NewTestHandler() *Handler {
// basicSeaweedMQHandler is a minimal in-memory implementation for basic Kafka functionality
type basicSeaweedMQHandler struct {
topics map[string]bool
ledgers map[string]*offset.Ledger
mu sync.RWMutex
}
// testSeaweedMQHandler is a minimal mock implementation for testing
@ -127,15 +130,51 @@ func (b *basicSeaweedMQHandler) DeleteTopic(topic string) error {
}
func (b *basicSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
return offset.NewLedger()
b.mu.Lock()
defer b.mu.Unlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := b.ledgers[key]; exists {
return ledger
}
// Create new ledger
ledger := offset.NewLedger()
b.ledgers[key] = ledger
// Also create the topic if it doesn't exist
b.topics[topic] = true
return ledger
}
func (b *basicSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
return offset.NewLedger()
b.mu.RLock()
defer b.mu.RUnlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := b.ledgers[key]; exists {
return ledger
}
// Return nil if ledger doesn't exist (topic doesn't exist)
return nil
}
func (b *basicSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
return 1, nil // Return offset 1 to simulate successful produce
// Store the record in the ledger
ledger := b.GetOrCreateLedger(topicName, partitionID)
// Assign an offset and append the record
offset := ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
size := int32(len(value))
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
return 0, fmt.Errorf("failed to append record: %w", err)
}
return offset, nil
}
func (b *basicSeaweedMQHandler) Close() error {

Loading…
Cancel
Save