diff --git a/weed/mq/kafka/protocol/consumer_group_metadata.go b/weed/mq/kafka/protocol/consumer_group_metadata.go index f0c20a312..dd84e6675 100644 --- a/weed/mq/kafka/protocol/consumer_group_metadata.go +++ b/weed/mq/kafka/protocol/consumer_group_metadata.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "fmt" "net" - "strings" "sync" ) @@ -146,42 +145,6 @@ func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*Consu return result, nil } -// GenerateConsumerProtocolMetadata creates protocol metadata for a consumer subscription -func GenerateConsumerProtocolMetadata(topics []string, userData []byte) []byte { - // Calculate total size needed - size := 2 + 4 + 4 // version + topics_count + user_data_length - for _, topic := range topics { - size += 2 + len(topic) // topic_name_length + topic_name - } - size += len(userData) - - metadata := make([]byte, 0, size) - - // Version (2 bytes) - use version 1 - metadata = append(metadata, 0, 1) - - // Topics count (4 bytes) - topicsCount := make([]byte, 4) - binary.BigEndian.PutUint32(topicsCount, uint32(len(topics))) - metadata = append(metadata, topicsCount...) - - // Topics (string array) - for _, topic := range topics { - topicLen := make([]byte, 2) - binary.BigEndian.PutUint16(topicLen, uint16(len(topic))) - metadata = append(metadata, topicLen...) - metadata = append(metadata, []byte(topic)...) - } - - // UserData length and data (4 bytes + data) - userDataLen := make([]byte, 4) - binary.BigEndian.PutUint32(userDataLen, uint32(len(userData))) - metadata = append(metadata, userDataLen...) - metadata = append(metadata, userData...) - - return metadata -} - // ValidateAssignmentStrategy checks if an assignment strategy is supported func ValidateAssignmentStrategy(strategy string) bool { supportedStrategies := map[string]bool{ @@ -273,26 +236,6 @@ func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) stri return "range" } -// SanitizeConsumerGroupID validates and sanitizes consumer group ID -func SanitizeConsumerGroupID(groupID string) (string, error) { - if len(groupID) == 0 { - return "", fmt.Errorf("empty group ID") - } - - if len(groupID) > 255 { - return "", fmt.Errorf("group ID too long: %d characters (max 255)", len(groupID)) - } - - // Basic validation: no control characters - for _, char := range groupID { - if char < 32 || char == 127 { - return "", fmt.Errorf("group ID contains invalid characters") - } - } - - return strings.TrimSpace(groupID), nil -} - // ProtocolMetadataDebugInfo returns debug information about protocol metadata type ProtocolMetadataDebugInfo struct { Strategy string diff --git a/weed/mq/kafka/protocol/errors.go b/weed/mq/kafka/protocol/errors.go index df8f11630..116122f0a 100644 --- a/weed/mq/kafka/protocol/errors.go +++ b/weed/mq/kafka/protocol/errors.go @@ -3,7 +3,6 @@ package protocol import ( "context" "encoding/binary" - "fmt" "net" "time" ) @@ -361,14 +360,3 @@ func HandleTimeoutError(err error, operation string) int16 { return ClassifyNetworkError(err) } - -// SafeFormatError safely formats error messages to avoid information leakage -func SafeFormatError(err error) string { - if err == nil { - return "" - } - - // For production, we might want to sanitize error messages - // For now, return the full error for debugging - return fmt.Sprintf("Error: %v", err) -} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 4cf1e1e8c..4dd94baf6 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -206,11 +206,6 @@ func (h *Handler) getTopicSchemaFormat(topic string) string { return "" // Empty string means schemaless or format unknown } -// stringPtr returns a pointer to the given string -func stringPtr(s string) *string { - return &s -} - // Handler processes Kafka protocol requests from clients using SeaweedMQ type Handler struct { // SeaweedMQ integration @@ -365,7 +360,7 @@ func (h *Handler) Close() error { // Close broker client if present if h.brokerClient != nil { if err := h.brokerClient.Close(); err != nil { - Warning("Failed to close broker client: %v", err) + glog.Warningf("Failed to close broker client: %v", err) } } @@ -376,17 +371,6 @@ func (h *Handler) Close() error { return nil } -// StoreRecordBatch stores a record batch for later retrieval during Fetch operations -func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) { - // Record batch storage is now handled by the SeaweedMQ handler -} - -// GetRecordBatch retrieves a stored record batch that contains the requested offset -func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) { - // Record batch retrieval is now handled by the SeaweedMQ handler - return nil, false -} - // SetSMQBrokerAddresses updates the SMQ broker addresses used in Metadata responses func (h *Handler) SetSMQBrokerAddresses(brokerAddresses []string) { h.smqBrokerAddresses = brokerAddresses @@ -519,7 +503,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Close the per-connection broker client if connBrokerClient != nil { if closeErr := connBrokerClient.Close(); closeErr != nil { - Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr) + glog.Errorf("[%s] Error closing BrokerClient: %v", connectionID, closeErr) } } // Remove connection context from map @@ -591,12 +575,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Send this response if readyResp.err != nil { - Error("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err) + glog.Errorf("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err) } else { glog.V(2).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response)) if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil { glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr) - Error("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr) + glog.Errorf("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr) correlationQueueMu.Unlock() return } @@ -1112,7 +1096,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody) default: - Warning("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID) + glog.Warningf("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID) err = fmt.Errorf("unsupported API key: %d (version %d)", req.apiKey, req.apiVersion) } @@ -2883,7 +2867,7 @@ func (h *Handler) handleDescribeConfigs(correlationID uint32, apiVersion uint16, // Parse request to extract resources resources, err := h.parseDescribeConfigsRequest(requestBody, apiVersion) if err != nil { - Error("DescribeConfigs parsing error: %v", err) + glog.Errorf("DescribeConfigs parsing error: %v", err) return nil, fmt.Errorf("failed to parse DescribeConfigs request: %w", err) } @@ -3122,40 +3106,6 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32, return nil } -// hexDump formats bytes as a hex dump with ASCII representation -func hexDump(data []byte) string { - var result strings.Builder - for i := 0; i < len(data); i += 16 { - // Offset - result.WriteString(fmt.Sprintf("%04x ", i)) - - // Hex bytes - for j := 0; j < 16; j++ { - if i+j < len(data) { - result.WriteString(fmt.Sprintf("%02x ", data[i+j])) - } else { - result.WriteString(" ") - } - if j == 7 { - result.WriteString(" ") - } - } - - // ASCII representation - result.WriteString(" |") - for j := 0; j < 16 && i+j < len(data); j++ { - b := data[i+j] - if b >= 32 && b < 127 { - result.WriteByte(b) - } else { - result.WriteByte('.') - } - } - result.WriteString("|\n") - } - return result.String() -} - // writeResponseWithCorrelationID is deprecated - use writeResponseWithHeader instead // Kept for compatibility with direct callers that don't have API info func (h *Handler) writeResponseWithCorrelationID(w *bufio.Writer, correlationID uint32, responseBody []byte, timeout time.Duration) error { diff --git a/weed/mq/kafka/protocol/logging.go b/weed/mq/kafka/protocol/logging.go deleted file mode 100644 index ccc4579be..000000000 --- a/weed/mq/kafka/protocol/logging.go +++ /dev/null @@ -1,69 +0,0 @@ -package protocol - -import ( - "log" - "os" -) - -// Logger provides structured logging for Kafka protocol operations -type Logger struct { - debug *log.Logger - info *log.Logger - warning *log.Logger - error *log.Logger -} - -// NewLogger creates a new logger instance -func NewLogger() *Logger { - return &Logger{ - debug: log.New(os.Stdout, "[KAFKA-DEBUG] ", log.LstdFlags|log.Lshortfile), - info: log.New(os.Stdout, "[KAFKA-INFO] ", log.LstdFlags), - warning: log.New(os.Stdout, "[KAFKA-WARN] ", log.LstdFlags), - error: log.New(os.Stderr, "[KAFKA-ERROR] ", log.LstdFlags|log.Lshortfile), - } -} - -// Debug logs debug messages (only in debug mode) -func (l *Logger) Debug(format string, args ...interface{}) { - if os.Getenv("KAFKA_DEBUG") != "" { - l.debug.Printf(format, args...) - } -} - -// Info logs informational messages -func (l *Logger) Info(format string, args ...interface{}) { - l.info.Printf(format, args...) -} - -// Warning logs warning messages -func (l *Logger) Warning(format string, args ...interface{}) { - l.warning.Printf(format, args...) -} - -// Error logs error messages -func (l *Logger) Error(format string, args ...interface{}) { - l.error.Printf(format, args...) -} - -// Global logger instance -var logger = NewLogger() - -// Debug logs debug messages using the global logger -func Debug(format string, args ...interface{}) { - logger.Debug(format, args...) -} - -// Info logs informational messages using the global logger -func Info(format string, args ...interface{}) { - logger.Info(format, args...) -} - -// Warning logs warning messages using the global logger -func Warning(format string, args ...interface{}) { - logger.Warning(format, args...) -} - -// Error logs error messages using the global logger -func Error(format string, args ...interface{}) { - logger.Error(format, args...) -}