Browse Source

purge unused

pull/7329/head
chrislu 7 days ago
parent
commit
0cbc5e906e
  1. 57
      weed/mq/kafka/protocol/consumer_group_metadata.go
  2. 12
      weed/mq/kafka/protocol/errors.go
  3. 62
      weed/mq/kafka/protocol/handler.go
  4. 69
      weed/mq/kafka/protocol/logging.go

57
weed/mq/kafka/protocol/consumer_group_metadata.go

@ -4,7 +4,6 @@ import (
"encoding/binary"
"fmt"
"net"
"strings"
"sync"
)
@ -146,42 +145,6 @@ func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*Consu
return result, nil
}
// GenerateConsumerProtocolMetadata creates protocol metadata for a consumer
// subscription in the Kafka consumer-protocol wire format:
// version (int16, always 1) | topic count (int32) | length-prefixed topic
// names (int16 + bytes each) | user data (int32 length + raw bytes).
func GenerateConsumerProtocolMetadata(topics []string, userData []byte) []byte {
	// Compute the exact output size up front so we can fill a single buffer.
	total := 2 + 4 + 4 + len(userData) // version + topic count + user-data length
	for _, name := range topics {
		total += 2 + len(name) // per-topic length prefix + name bytes
	}

	buf := make([]byte, total)
	off := 0

	// Version (2 bytes): always version 1, big-endian.
	binary.BigEndian.PutUint16(buf[off:], 1)
	off += 2

	// Topic count (4 bytes).
	binary.BigEndian.PutUint32(buf[off:], uint32(len(topics)))
	off += 4

	// Topic names, each prefixed with a 2-byte length.
	for _, name := range topics {
		binary.BigEndian.PutUint16(buf[off:], uint16(len(name)))
		off += 2
		off += copy(buf[off:], name)
	}

	// User data: 4-byte length followed by the payload itself.
	binary.BigEndian.PutUint32(buf[off:], uint32(len(userData)))
	off += 4
	copy(buf[off:], userData)

	return buf
}
// ValidateAssignmentStrategy checks if an assignment strategy is supported
func ValidateAssignmentStrategy(strategy string) bool {
supportedStrategies := map[string]bool{
@ -273,26 +236,6 @@ func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) stri
return "range"
}
// SanitizeConsumerGroupID validates and sanitizes a consumer group ID.
//
// The ID is first stripped of leading/trailing whitespace, then validated:
// it must be non-empty, at most 255 bytes long, and contain no ASCII control
// characters (0x00-0x1F or DEL). The trimmed ID is returned on success.
func SanitizeConsumerGroupID(groupID string) (string, error) {
	// Trim BEFORE validating. The previous order validated the raw input and
	// trimmed last, so a whitespace-only ID passed the empty check and was
	// returned as "" with a nil error, and the length/character checks were
	// applied to whitespace that was about to be discarded.
	trimmed := strings.TrimSpace(groupID)

	if len(trimmed) == 0 {
		return "", fmt.Errorf("empty group ID")
	}
	if len(trimmed) > 255 {
		return "", fmt.Errorf("group ID too long: %d characters (max 255)", len(trimmed))
	}
	// Basic validation: reject control characters and DEL.
	for _, char := range trimmed {
		if char < 32 || char == 127 {
			return "", fmt.Errorf("group ID contains invalid characters")
		}
	}
	return trimmed, nil
}
// ProtocolMetadataDebugInfo returns debug information about protocol metadata
type ProtocolMetadataDebugInfo struct {
Strategy string

12
weed/mq/kafka/protocol/errors.go

@ -3,7 +3,6 @@ package protocol
import (
"context"
"encoding/binary"
"fmt"
"net"
"time"
)
@ -361,14 +360,3 @@ func HandleTimeoutError(err error, operation string) int16 {
return ClassifyNetworkError(err)
}
// SafeFormatError safely formats error messages to avoid information leakage
func SafeFormatError(err error) string {
if err == nil {
return ""
}
// For production, we might want to sanitize error messages
// For now, return the full error for debugging
return fmt.Sprintf("Error: %v", err)
}

62
weed/mq/kafka/protocol/handler.go

@ -206,11 +206,6 @@ func (h *Handler) getTopicSchemaFormat(topic string) string {
return "" // Empty string means schemaless or format unknown
}
// stringPtr returns a pointer to a string holding s, for APIs that take
// optional *string fields.
func stringPtr(s string) *string {
	p := new(string)
	*p = s
	return p
}
// Handler processes Kafka protocol requests from clients using SeaweedMQ
type Handler struct {
// SeaweedMQ integration
@ -365,7 +360,7 @@ func (h *Handler) Close() error {
// Close broker client if present
if h.brokerClient != nil {
if err := h.brokerClient.Close(); err != nil {
Warning("Failed to close broker client: %v", err)
glog.Warningf("Failed to close broker client: %v", err)
}
}
@ -376,17 +371,6 @@ func (h *Handler) Close() error {
return nil
}
// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
//
// NOTE(review): intentionally a no-op — per the body comment, storage has
// moved into the SeaweedMQ handler; this stub presumably remains for API
// compatibility with existing callers. Confirm before removing.
func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
	// Record batch storage is now handled by the SeaweedMQ handler
}
// GetRecordBatch retrieves a stored record batch that contains the requested offset
//
// NOTE(review): always reports a cache miss (nil, false) — per the body
// comment, retrieval has moved into the SeaweedMQ handler; this stub
// presumably remains for API compatibility. Confirm before removing.
func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
	// Record batch retrieval is now handled by the SeaweedMQ handler
	return nil, false
}
// SetSMQBrokerAddresses updates the SMQ broker addresses used in Metadata responses
func (h *Handler) SetSMQBrokerAddresses(brokerAddresses []string) {
h.smqBrokerAddresses = brokerAddresses
@ -519,7 +503,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Close the per-connection broker client
if connBrokerClient != nil {
if closeErr := connBrokerClient.Close(); closeErr != nil {
Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr)
glog.Errorf("[%s] Error closing BrokerClient: %v", connectionID, closeErr)
}
}
// Remove connection context from map
@ -591,12 +575,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Send this response
if readyResp.err != nil {
Error("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err)
glog.Errorf("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err)
} else {
glog.V(2).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response))
if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil {
glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr)
Error("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr)
glog.Errorf("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr)
correlationQueueMu.Unlock()
return
}
@ -1112,7 +1096,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody)
default:
Warning("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID)
glog.Warningf("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID)
err = fmt.Errorf("unsupported API key: %d (version %d)", req.apiKey, req.apiVersion)
}
@ -2883,7 +2867,7 @@ func (h *Handler) handleDescribeConfigs(correlationID uint32, apiVersion uint16,
// Parse request to extract resources
resources, err := h.parseDescribeConfigsRequest(requestBody, apiVersion)
if err != nil {
Error("DescribeConfigs parsing error: %v", err)
glog.Errorf("DescribeConfigs parsing error: %v", err)
return nil, fmt.Errorf("failed to parse DescribeConfigs request: %w", err)
}
@ -3122,40 +3106,6 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32,
return nil
}
// hexDump formats bytes as a classic hex dump: a 4-digit hex offset, up to
// 16 hex byte values (with an extra gap after the 8th column), and an ASCII
// column where non-printable bytes are shown as '.'.
func hexDump(data []byte) string {
	var b strings.Builder
	for base := 0; base < len(data); base += 16 {
		// Line offset.
		b.WriteString(fmt.Sprintf("%04x ", base))
		// Hex columns; missing bytes on the final line are padded so the
		// ASCII column stays aligned.
		for col := 0; col < 16; col++ {
			if base+col < len(data) {
				b.WriteString(fmt.Sprintf("%02x ", data[base+col]))
			} else {
				b.WriteString("   ")
			}
			// Extra gap between the two 8-byte halves.
			if col == 7 {
				b.WriteString(" ")
			}
		}
		// ASCII representation of the bytes on this line.
		b.WriteString(" |")
		for col := 0; col < 16 && base+col < len(data); col++ {
			c := data[base+col]
			if c >= 32 && c < 127 {
				b.WriteByte(c)
			} else {
				b.WriteByte('.')
			}
		}
		b.WriteString("|\n")
	}
	return b.String()
}
// writeResponseWithCorrelationID is deprecated - use writeResponseWithHeader instead
// Kept for compatibility with direct callers that don't have API info
func (h *Handler) writeResponseWithCorrelationID(w *bufio.Writer, correlationID uint32, responseBody []byte, timeout time.Duration) error {

69
weed/mq/kafka/protocol/logging.go

@ -1,69 +0,0 @@
package protocol
import (
"log"
"os"
)
// Logger provides structured logging for Kafka protocol operations, with one
// underlying *log.Logger per severity level.
type Logger struct {
	debug   *log.Logger
	info    *log.Logger
	warning *log.Logger
	error   *log.Logger
}

// NewLogger creates a new logger instance. Debug, info, and warning output go
// to stdout; errors go to stderr. Debug and error lines additionally carry
// the file:line of the call site (log.Lshortfile).
func NewLogger() *Logger {
	l := &Logger{}
	l.debug = log.New(os.Stdout, "[KAFKA-DEBUG] ", log.LstdFlags|log.Lshortfile)
	l.info = log.New(os.Stdout, "[KAFKA-INFO] ", log.LstdFlags)
	l.warning = log.New(os.Stdout, "[KAFKA-WARN] ", log.LstdFlags)
	l.error = log.New(os.Stderr, "[KAFKA-ERROR] ", log.LstdFlags|log.Lshortfile)
	return l
}

// Debug logs debug messages; output is suppressed unless the KAFKA_DEBUG
// environment variable is set to a non-empty value.
func (l *Logger) Debug(format string, args ...interface{}) {
	if os.Getenv("KAFKA_DEBUG") == "" {
		return
	}
	l.debug.Printf(format, args...)
}

// Info logs informational messages.
func (l *Logger) Info(format string, args ...interface{}) {
	l.info.Printf(format, args...)
}

// Warning logs warning messages.
func (l *Logger) Warning(format string, args ...interface{}) {
	l.warning.Printf(format, args...)
}

// Error logs error messages.
func (l *Logger) Error(format string, args ...interface{}) {
	l.error.Printf(format, args...)
}
// Global logger instance
//
// NOTE(review): shared mutable package-level state; every helper function
// below routes through this single instance.
var logger = NewLogger()

// Debug logs debug messages using the global logger
// (Logger.Debug gates output on the KAFKA_DEBUG environment variable).
func Debug(format string, args ...interface{}) {
	logger.Debug(format, args...)
}

// Info logs informational messages using the global logger
func Info(format string, args ...interface{}) {
	logger.Info(format, args...)
}

// Warning logs warning messages using the global logger
func Warning(format string, args ...interface{}) {
	logger.Warning(format, args...)
}

// Error logs error messages using the global logger
func Error(format string, args ...interface{}) {
	logger.Error(format, args...)
}
Loading…
Cancel
Save