Browse Source

fix: Correct Kafka error codes - UNKNOWN_SERVER_ERROR = -1, OFFSET_OUT_OF_RANGE = 1

Phase 13: CRITICAL BUG FIX - Error Code Mismatch

Problem:
  Producer CreateTopic calls were failing with confusing error:
    'kafka server: The requested offset is outside the range of offsets...'
  But the real error was topic creation failure!

Root Cause:
  SeaweedFS had WRONG error code mappings:
    ErrorCodeUnknownServerError = 1  ← WRONG!
    ErrorCodeOffsetOutOfRange = 2    ← WRONG!

  Official Kafka protocol:
    -1 = UNKNOWN_SERVER_ERROR
     1 = OFFSET_OUT_OF_RANGE

  When CreateTopics handler returned errCode=1 for topic creation failure,
  Sarama client interpreted it as OFFSET_OUT_OF_RANGE, causing massive confusion!

The Flow:
  1. Producer tries to create loadtest-topic-2
  2. CreateTopics handler fails (schema fetch error), returns errCode=1
  3. Sarama interprets errCode=1 as OFFSET_OUT_OF_RANGE (not UNKNOWN_SERVER_ERROR!)
  4. Producer logs: 'The requested offset is outside the range...'
  5. Producer continues anyway (only warns on non-TOPIC_ALREADY_EXISTS errors)
  6. Consumer tries to consume from non-existent topic-2
  7. Gets 'topic does not exist' → rebalances → starts from offset 0 → DUPLICATES!

Fix:
  1. Corrected error code constants:
     ErrorCodeUnknownServerError = -1 (was 1)
     ErrorCodeOffsetOutOfRange = 1 (was 2)
  2. Updated all error handlers to use 0xFFFF (uint16 representation of -1)
  3. Now topic creation failures return proper UNKNOWN_SERVER_ERROR

Expected Result:
  - CreateTopic failures will be properly reported
  - Producers will see correct error messages
  - No more confusing OFFSET_OUT_OF_RANGE errors during topic creation
  - Should eliminate topic persistence race causing duplicates
pull/7329/head
chrislu 2 months ago
parent
commit
23316d4d13
  1. 4
      weed/mq/kafka/protocol/errors.go
  2. 8
      weed/mq/kafka/protocol/handler.go
  3. 6
      weed/mq/kafka/protocol/produce.go

4
weed/mq/kafka/protocol/errors.go

@ -14,8 +14,8 @@ const (
ErrorCodeNone int16 = 0 ErrorCodeNone int16 = 0
// General server errors // General server errors
ErrorCodeUnknownServerError int16 = 1
ErrorCodeOffsetOutOfRange int16 = 2
ErrorCodeUnknownServerError int16 = -1
ErrorCodeOffsetOutOfRange int16 = 1
ErrorCodeCorruptMessage int16 = 3 // Also UNKNOWN_TOPIC_OR_PARTITION ErrorCodeCorruptMessage int16 = 3 // Also UNKNOWN_TOPIC_OR_PARTITION
ErrorCodeUnknownTopicOrPartition int16 = 3 ErrorCodeUnknownTopicOrPartition int16 = 3
ErrorCodeInvalidFetchSize int16 = 4 ErrorCodeInvalidFetchSize int16 = 4

8
weed/mq/kafka/protocol/handler.go

@ -2342,7 +2342,7 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by
} else { } else {
// Use schema-aware topic creation // Use schema-aware topic creation
if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil { if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil {
errCode = 1 // UNKNOWN_SERVER_ERROR
errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
} }
} }
eb := make([]byte, 2) eb := make([]byte, 2)
@ -2472,7 +2472,7 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt
} else { } else {
// Create the topic in SeaweedMQ with schema support // Create the topic in SeaweedMQ with schema support
if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil { if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
} }
} }
@ -2747,7 +2747,7 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint
} else { } else {
// Use corrected values for error checking and topic creation with schema support // Use corrected values for error checking and topic creation with schema support
if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil { if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil {
errCode = 1 // UNKNOWN_SERVER_ERROR
errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
} }
} }
eb := make([]byte, 2) eb := make([]byte, 2)
@ -2879,7 +2879,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
} else { } else {
// Delete the topic from SeaweedMQ // Delete the topic from SeaweedMQ
if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil { if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
errorMessage = err.Error() errorMessage = err.Error()
} }
} }

6
weed/mq/kafka/protocol/produce.go

@ -159,7 +159,7 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
if h.isSchemaValidationError(err) { if h.isSchemaValidationError(err) {
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
} }
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
} else { } else {
baseOffset = offset baseOffset = offset
} }
@ -732,7 +732,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if h.isSchemaValidationError(prodErr) { if h.isSchemaValidationError(prodErr) {
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
} }
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
break break
} }
@ -751,7 +751,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
for idx, kv := range records { for idx, kv := range records {
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
if prodErr != nil { if prodErr != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
break break
} }
if idx == 0 { if idx == 0 {

Loading…
Cancel
Save