From 23316d4d13e6922d3a3dc7e537b8c5b2a6b434c6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 15:43:38 -0700 Subject: [PATCH] fix: Correct Kafka error codes - UNKNOWN_SERVER_ERROR = -1, OFFSET_OUT_OF_RANGE = 1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 13: CRITICAL BUG FIX - Error Code Mismatch Problem: Producer CreateTopic calls were failing with a confusing error: 'kafka server: The requested offset is outside the range of offsets...' But the real error was a topic creation failure! Root Cause: SeaweedFS had WRONG error code mappings: ErrorCodeUnknownServerError = 1 ← WRONG! ErrorCodeOffsetOutOfRange = 2 ← WRONG! Official Kafka protocol: -1 = UNKNOWN_SERVER_ERROR 1 = OFFSET_OUT_OF_RANGE When the CreateTopics handler returned errCode=1 for a topic creation failure, the Sarama client interpreted it as OFFSET_OUT_OF_RANGE, causing massive confusion! The Flow: 1. Producer tries to create loadtest-topic-2 2. CreateTopics handler fails (schema fetch error), returns errCode=1 3. Sarama interprets errCode=1 as OFFSET_OUT_OF_RANGE (not UNKNOWN_SERVER_ERROR!) 4. Producer logs: 'The requested offset is outside the range...' 5. Producer continues anyway (only warns on non-TOPIC_ALREADY_EXISTS errors) 6. Consumer tries to consume from the non-existent loadtest-topic-2 7. Gets 'topic does not exist' → rebalances → starts from offset 0 → DUPLICATES! Fix: 1. Corrected error code constants: ErrorCodeUnknownServerError = -1 (was 1) ErrorCodeOffsetOutOfRange = 1 (was 2) 2. Updated the CreateTopics, DeleteTopics and Produce error handlers to use 0xFFFF (uint16 representation of -1) 3. 
Now topic creation failures return proper UNKNOWN_SERVER_ERROR Expected Result: - CreateTopic failures will be properly reported - Producers will see correct error messages - No more confusing OFFSET_OUT_OF_RANGE errors during topic creation - Should eliminate topic persistence race causing duplicates --- weed/mq/kafka/protocol/errors.go | 4 ++-- weed/mq/kafka/protocol/handler.go | 8 ++++---- weed/mq/kafka/protocol/produce.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/weed/mq/kafka/protocol/errors.go b/weed/mq/kafka/protocol/errors.go index 116122f0a..93bc85c80 100644 --- a/weed/mq/kafka/protocol/errors.go +++ b/weed/mq/kafka/protocol/errors.go @@ -14,8 +14,8 @@ const ( ErrorCodeNone int16 = 0 // General server errors - ErrorCodeUnknownServerError int16 = 1 - ErrorCodeOffsetOutOfRange int16 = 2 + ErrorCodeUnknownServerError int16 = -1 + ErrorCodeOffsetOutOfRange int16 = 1 ErrorCodeCorruptMessage int16 = 3 // Also UNKNOWN_TOPIC_OR_PARTITION ErrorCodeUnknownTopicOrPartition int16 = 3 ErrorCodeInvalidFetchSize int16 = 4 diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 8aeb3ad96..2768793d2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -2342,7 +2342,7 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by } else { // Use schema-aware topic creation if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil { - errCode = 1 // UNKNOWN_SERVER_ERROR + errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } eb := make([]byte, 2) @@ -2472,7 +2472,7 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt } else { // Create the topic in SeaweedMQ with schema support if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } @@ -2747,7 +2747,7 @@ func (h 
*Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint } else { // Use corrected values for error checking and topic creation with schema support if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil { - errCode = 1 // UNKNOWN_SERVER_ERROR + errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } eb := make([]byte, 2) @@ -2879,7 +2879,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( } else { // Delete the topic from SeaweedMQ if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) errorMessage = err.Error() } } diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 73cdb46cb..0c806f807 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -159,7 +159,7 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a if h.isSchemaValidationError(err) { time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures } - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } else { baseOffset = offset } @@ -732,7 +732,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if h.isSchemaValidationError(prodErr) { time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures } - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) break } @@ -751,7 +751,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, for idx, kv := range records { offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) if prodErr != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) break } if idx == 0 {