From ba73939ca2510b0eed3797064b80ec3212a82b58 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 12 Sep 2025 23:02:10 -0700
Subject: [PATCH] fixes

---
 weed/mq/kafka/gateway/server.go   |  22 +++++--
 weed/mq/kafka/protocol/produce.go | 103 +++++++++---------------------
 weed/mq/offset/manager.go         |  90 +++++++++++++++-----------
 3 files changed, 99 insertions(+), 116 deletions(-)

diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go
index 3ea73f365..a839081f8 100644
--- a/weed/mq/kafka/gateway/server.go
+++ b/weed/mq/kafka/gateway/server.go
@@ -49,8 +49,9 @@ func resolveAdvertisedAddress() string {
 
 type Options struct {
 	Listen     string
-	Masters    string // SeaweedFS master servers (required)
+	Masters    string // SeaweedFS master servers (required for production mode)
 	FilerGroup string // filer group name (optional)
+	TestMode   bool   // Use in-memory handler for testing (optional)
 }
 
 type Server struct {
@@ -65,12 +66,21 @@ func NewServer(opts Options) *Server {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	// Create broker-based SeaweedMQ handler
-	handler, err := protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
-	if err != nil {
-		glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err)
+	var handler *protocol.Handler
+	var err error
+
+	if opts.TestMode || opts.Masters == "" {
+		// Use in-memory handler for testing or when no masters are configured
+		handler = protocol.NewHandler()
+		glog.V(1).Infof("Created Kafka gateway with in-memory handler for testing")
+	} else {
+		// Create broker-based SeaweedMQ handler for production
+		handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
+		if err != nil {
+			glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err)
+		}
+		glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
 	}
-	glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
 
 	return &Server{
 		opts:    opts,
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index 722ce0f34..7b687d32f 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -330,60 +330,36 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 	// - Request: transactional_id field (nullable string) at the beginning
 	// - Response: throttle_time_ms field at the end (v1+)
 
-	// Parse Produce v7 request format based on actual Sarama request
-	// Format: client_id(STRING) + transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY)
+	// Parse Produce v7 request format (client_id is already handled by HandleConn)
+	// Format: transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY)
 	offset := 0
 
-	// Parse client_id (STRING: 2 bytes length + data)
+	// Parse transactional_id (NULLABLE_STRING: 2 bytes length + data, -1 = null)
 	if len(requestBody) < 2 {
-		return nil, fmt.Errorf("Produce v%d request too short for client_id", apiVersion)
-	}
-	clientIDLen := binary.BigEndian.Uint16(requestBody[offset : offset+2])
-	offset += 2
-
-	if len(requestBody) < offset+int(clientIDLen) {
-		return nil, fmt.Errorf("Produce v%d request client_id too short", apiVersion)
+		return nil, fmt.Errorf("Produce v%d request too short for transactional_id", apiVersion)
 	}
-	clientID := string(requestBody[offset : offset+int(clientIDLen)])
-	offset += int(clientIDLen)
-	fmt.Printf("DEBUG: Produce v%d - client_id: %s\n", apiVersion, clientID)
-
 	// Parse transactional_id (NULLABLE_STRING: 2 bytes length + data, -1 = null)
 	var transactionalID string = "null"
-	baseTxOffset := offset
-	if len(requestBody) >= offset+2 {
-		possibleLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
-		consumedTx := false
-		if possibleLen == -1 {
-			// consume just the length
-			offset += 2
-			consumedTx = true
-		} else if possibleLen >= 0 && len(requestBody) >= offset+2+int(possibleLen)+6 {
-			// There is enough room for a string and acks/timeout after it
-			offset += 2
-			if int(possibleLen) > 0 {
-				if len(requestBody) < offset+int(possibleLen) {
-					return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
-				}
-				transactionalID = string(requestBody[offset : offset+int(possibleLen)])
-				offset += int(possibleLen)
-			}
-			consumedTx = true
+	txIDLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
+	offset += 2
+
+	if txIDLen == -1 {
+		// null transactional_id
+		transactionalID = "null"
+		fmt.Printf("DEBUG: Produce v%d - transactional_id: null\n", apiVersion)
+	} else if txIDLen >= 0 {
+		if len(requestBody) < offset+int(txIDLen) {
+			return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
 		}
-		// Tentatively consumed transactional_id; we'll validate later and may revert
-		_ = consumedTx
+		transactionalID = string(requestBody[offset : offset+int(txIDLen)])
+		offset += int(txIDLen)
+		fmt.Printf("DEBUG: Produce v%d - transactional_id: %s\n", apiVersion, transactionalID)
 	}
-	fmt.Printf("DEBUG: Produce v%d - transactional_id: %s\n", apiVersion, transactionalID)
 
 	// Parse acks (INT16) and timeout_ms (INT32)
 	if len(requestBody) < offset+6 {
-		// If transactional_id was mis-parsed, revert and try without it
-		offset = baseTxOffset
-		transactionalID = "null"
-		if len(requestBody) < offset+6 {
-			return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
-		}
+		return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
 	}
 
 	acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
@@ -393,37 +369,20 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 	fmt.Printf("DEBUG: Produce v%d - acks: %d, timeout: %d\n", apiVersion, acks, timeout)
 
-	// Parse topics array
-	if len(requestBody) < offset+4 {
-		// Fallback: treat transactional_id as absent if this seems invalid
-		offset = baseTxOffset
-		transactionalID = "null"
-		if len(requestBody) < offset+6 {
-			return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
-		}
-		acks = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
-		offset += 2
-		timeout = binary.BigEndian.Uint32(requestBody[offset : offset+4])
-		offset += 4
+	// If acks=0, fire-and-forget - return empty response immediately
+	if acks == 0 {
+		fmt.Printf("DEBUG: Produce v%d - acks=0, returning empty response (fire-and-forget)\n", apiVersion)
+		return []byte{}, nil
 	}
+	fmt.Printf("DEBUG: Produce v%d - acks=%d, will process and return response\n", apiVersion, acks)
+
 	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
 
-	// If topicsCount is implausible, revert transactional_id consumption and re-parse once
+	// If topicsCount is implausible, there might be a parsing issue
 	if topicsCount > 1000 {
-		// revert
-		offset = baseTxOffset
-		transactionalID = "null"
-		acks = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
-		offset += 2
-		timeout = binary.BigEndian.Uint32(requestBody[offset : offset+4])
-		offset += 4
-		if len(requestBody) < offset+4 {
-			return nil, fmt.Errorf("Produce v%d request missing topics count", apiVersion)
-		}
-		topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4])
-		offset += 4
+		return nil, fmt.Errorf("Produce v%d request has implausible topics count: %d", apiVersion, topicsCount)
 	}
 
 	fmt.Printf("DEBUG: Produce v%d - topics count: %d\n", apiVersion, topicsCount)
@@ -573,17 +532,15 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 		}
 	}
 
-	// If acks=0, fire-and-forget - return empty response per Kafka spec
-	if acks == 0 {
-		return []byte{}, nil
-	}
-
 	// Append throttle_time_ms at the END for v1+
 	if apiVersion >= 1 {
 		response = append(response, 0, 0, 0, 0)
 	}
 
-	fmt.Printf("DEBUG: Produce v%d response: %d bytes\n", apiVersion, len(response))
+	fmt.Printf("DEBUG: Produce v%d response: %d bytes (acks=%d)\n", apiVersion, len(response), acks)
+	if len(response) < 20 {
+		fmt.Printf("DEBUG: Produce v%d response hex: %x\n", apiVersion, response)
+	}
 
 	return response, nil
 }
diff --git a/weed/mq/offset/manager.go b/weed/mq/offset/manager.go
index 9d268dc96..0fd6b969e 100644
--- a/weed/mq/offset/manager.go
+++ b/weed/mq/offset/manager.go
@@ -13,21 +13,21 @@ type PartitionOffsetManager struct {
 	mu         sync.RWMutex
 	partition  *schema_pb.Partition
 	nextOffset int64
-	
+
 	// Checkpointing for recovery
 	lastCheckpoint     int64
 	checkpointInterval int64
-	storage            OffsetStorage
+	storage            OffsetStorage
 }
 
 // OffsetStorage interface for persisting offset state
 type OffsetStorage interface {
 	// SaveCheckpoint persists the current offset state for recovery
 	SaveCheckpoint(partition *schema_pb.Partition, offset int64) error
-	
+
 	// LoadCheckpoint retrieves the last saved offset state
 	LoadCheckpoint(partition *schema_pb.Partition) (int64, error)
-	
+
 	// GetHighestOffset scans storage to find the highest assigned offset
 	GetHighestOffset(partition *schema_pb.Partition) (int64, error)
 }
@@ -36,48 +36,64 @@ func NewPartitionOffsetManager(partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
 	manager := &PartitionOffsetManager{
 		partition:          partition,
-		checkpointInterval: 100, // Checkpoint every 100 offsets
-		storage:            storage,
+		checkpointInterval: 1, // Checkpoint every offset for immediate persistence
+		storage:            storage,
 	}
-	
+
 	// Recover offset state
 	if err := manager.recover(); err != nil {
 		return nil, fmt.Errorf("failed to recover offset state: %w", err)
 	}
-	
+
 	return manager, nil
 }
 
 // AssignOffset assigns the next sequential offset
 func (m *PartitionOffsetManager) AssignOffset() int64 {
+	var shouldCheckpoint bool
+	var checkpointOffset int64
+
 	m.mu.Lock()
-	defer m.mu.Unlock()
-
 	offset := m.nextOffset
 	m.nextOffset++
-	
-	// Checkpoint periodically
+
+	// Check if we should checkpoint (but don't do it inside the lock)
 	if offset-m.lastCheckpoint >= m.checkpointInterval {
-		go m.checkpoint(offset)
+		shouldCheckpoint = true
+		checkpointOffset = offset
 	}
-	
+	m.mu.Unlock()
+
+	// Checkpoint outside the lock to avoid deadlock
+	if shouldCheckpoint {
+		m.checkpoint(checkpointOffset)
+	}
+
 	return offset
 }
 
 // AssignOffsets assigns a batch of sequential offsets
 func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
+	var shouldCheckpoint bool
+	var checkpointOffset int64
+
 	m.mu.Lock()
-	defer m.mu.Unlock()
-
 	baseOffset = m.nextOffset
 	lastOffset = m.nextOffset + count - 1
 	m.nextOffset += count
-	
-	// Checkpoint if needed
+
+	// Check if we should checkpoint (but don't do it inside the lock)
 	if lastOffset-m.lastCheckpoint >= m.checkpointInterval {
-		go m.checkpoint(lastOffset)
+		shouldCheckpoint = true
+		checkpointOffset = lastOffset
+	}
+	m.mu.Unlock()
+
+	// Checkpoint outside the lock to avoid deadlock
+	if shouldCheckpoint {
+		m.checkpoint(checkpointOffset)
 	}
-	
+
 	return baseOffset, lastOffset
 }
@@ -97,17 +113,17 @@ func (m *PartitionOffsetManager) GetHighWaterMark() int64 {
 func (m *PartitionOffsetManager) recover() error {
 	var checkpointOffset int64 = -1
 	var highestOffset int64 = -1
-	
+
 	// Try to load checkpoint
 	if offset, err := m.storage.LoadCheckpoint(m.partition); err == nil && offset >= 0 {
 		checkpointOffset = offset
 	}
-	
+
 	// Try to scan storage for highest offset
 	if offset, err := m.storage.GetHighestOffset(m.partition); err == nil && offset >= 0 {
 		highestOffset = offset
 	}
-	
+
 	// Use the higher of checkpoint or storage scan
 	if checkpointOffset >= 0 && highestOffset >= 0 {
 		if highestOffset > checkpointOffset {
@@ -128,7 +144,7 @@ func (m *PartitionOffsetManager) recover() error {
 		m.nextOffset = 0
 		m.lastCheckpoint = -1
 	}
-	
+
 	return nil
 }
@@ -139,7 +155,7 @@ func (m *PartitionOffsetManager) checkpoint(offset int64) {
 		fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err)
 		return
 	}
-	
+
 	m.mu.Lock()
 	m.lastCheckpoint = offset
 	m.mu.Unlock()
@@ -163,29 +179,29 @@ func NewPartitionOffsetRegistry(storage OffsetStorage) *PartitionOffsetRegistry
 // GetManager returns the offset manager for a partition, creating it if needed
 func (r *PartitionOffsetRegistry) GetManager(partition *schema_pb.Partition) (*PartitionOffsetManager, error) {
 	key := partitionKey(partition)
-	
+
 	r.mu.RLock()
 	manager, exists := r.managers[key]
 	r.mu.RUnlock()
-	
+
 	if exists {
 		return manager, nil
 	}
-	
+
 	// Create new manager
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	
+
 	// Double-check after acquiring write lock
 	if manager, exists := r.managers[key]; exists {
 		return manager, nil
 	}
-	
+
 	manager, err := NewPartitionOffsetManager(partition, r.storage)
 	if err != nil {
 		return nil, err
 	}
-	
+
 	r.managers[key] = manager
 	return manager, nil
 }
@@ -196,7 +212,7 @@ func (r *PartitionOffsetRegistry) AssignOffset(partition *schema_pb.Partition) (
 	if err != nil {
 		return 0, err
 	}
-	
+
 	return manager.AssignOffset(), nil
 }
@@ -206,7 +222,7 @@ func (r *PartitionOffsetRegistry) AssignOffsets(partition *schema_pb.Partition,
 	if err != nil {
 		return 0, 0, err
 	}
-	
+
 	baseOffset, lastOffset = manager.AssignOffsets(count)
 	return baseOffset, lastOffset, nil
 }
@@ -217,13 +233,13 @@ func (r *PartitionOffsetRegistry) GetHighWaterMark(partition *schema_pb.Partitio
 	if err != nil {
 		return 0, err
 	}
-	
+
 	return manager.GetHighWaterMark(), nil
 }
 
 // partitionKey generates a unique key for a partition
 func partitionKey(partition *schema_pb.Partition) string {
-	return fmt.Sprintf("ring:%d:range:%d-%d:time:%d", 
+	return fmt.Sprintf("ring:%d:range:%d-%d:time:%d",
 		partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs)
 }
@@ -268,7 +284,7 @@ func (a *OffsetAssigner) AssignSingleOffset(partition *schema_pb.Partition) *Ass
 	if err != nil {
 		return &AssignmentResult{Error: err}
 	}
-	
+
 	return &AssignmentResult{
 		Assignment: &OffsetAssignment{
 			Offset:    offset,
@@ -284,7 +300,7 @@ func (a *OffsetAssigner) AssignBatchOffsets(partition *schema_pb.Partition, coun
 	if err != nil {
 		return &AssignmentResult{Error: err}
 	}
-	
+
 	return &AssignmentResult{
 		Batch: &BatchOffsetAssignment{
 			BaseOffset: baseOffset,
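
Usage note (reviewer sketch, not part of the patch): the server.go hunk above adds a TestMode flag to gateway.Options so NewServer falls back to the in-memory protocol.NewHandler() instead of requiring SeaweedFS masters. A minimal test-style sketch of how that option might be exercised is below; the module import path and the test scaffolding are assumptions, only Options, NewServer, TestMode, Listen, and Masters appear in the diff.

	package gateway_test

	import (
		"testing"

		"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
	)

	func TestNewServerInMemory(t *testing.T) {
		// TestMode (or an empty Masters value) selects the in-memory Kafka
		// protocol handler, so no SeaweedFS masters are needed for this test.
		srv := gateway.NewServer(gateway.Options{
			Listen:   "127.0.0.1:0",
			TestMode: true,
		})
		if srv == nil {
			t.Fatal("expected NewServer to return a gateway instance")
		}
	}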