diff --git a/weed/mq/kafka/protocol/errors.go b/weed/mq/kafka/protocol/errors.go index 116122f0a..93bc85c80 100644 --- a/weed/mq/kafka/protocol/errors.go +++ b/weed/mq/kafka/protocol/errors.go @@ -14,8 +14,8 @@ const ( ErrorCodeNone int16 = 0 // General server errors - ErrorCodeUnknownServerError int16 = 1 - ErrorCodeOffsetOutOfRange int16 = 2 + ErrorCodeUnknownServerError int16 = -1 + ErrorCodeOffsetOutOfRange int16 = 1 ErrorCodeCorruptMessage int16 = 3 // Also UNKNOWN_TOPIC_OR_PARTITION ErrorCodeUnknownTopicOrPartition int16 = 3 ErrorCodeInvalidFetchSize int16 = 4 diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 8aeb3ad96..2768793d2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -2342,7 +2342,7 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by } else { // Use schema-aware topic creation if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil { - errCode = 1 // UNKNOWN_SERVER_ERROR + errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } eb := make([]byte, 2) @@ -2472,7 +2472,7 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt } else { // Create the topic in SeaweedMQ with schema support if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } @@ -2747,7 +2747,7 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint } else { // Use corrected values for error checking and topic creation with schema support if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil { - errCode = 1 // UNKNOWN_SERVER_ERROR + errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } eb := make([]byte, 2) @@ -2879,7 +2879,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( } else { // Delete the topic from SeaweedMQ if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) errorMessage = err.Error() } } diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 73cdb46cb..0c806f807 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -159,7 +159,7 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a if h.isSchemaValidationError(err) { time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures } - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } else { baseOffset = offset } @@ -732,7 +732,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if h.isSchemaValidationError(prodErr) { time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures } - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) break } @@ -751,7 +751,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, for idx, kv := range records { offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) if prodErr != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) break } if idx == 0 {