package protocol

import (
	"bufio"
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
)

// TopicInfo holds basic information about a topic
type TopicInfo struct {
	Name       string
	Partitions int32
	CreatedAt  int64
}

// TopicPartitionKey uniquely identifies a topic partition
type TopicPartitionKey struct {
	Topic     string
	Partition int32
}

// SeaweedMQHandlerInterface defines the interface for SeaweedMQ integration
type SeaweedMQHandlerInterface interface {
	TopicExists(topic string) bool
	ListTopics() []string
	CreateTopic(topic string, partitions int32) error
	DeleteTopic(topic string) error
	GetOrCreateLedger(topic string, partition int32) *offset.Ledger
	GetLedger(topic string, partition int32) *offset.Ledger
	ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
	Close() error
}

// Handler processes Kafka protocol requests from clients using SeaweedMQ
type Handler struct {
	// SeaweedMQ integration
	seaweedMQHandler SeaweedMQHandlerInterface

	// SMQ offset storage for consumer group offsets
	smqOffsetStorage *offset.SMQOffsetStorage

	// Consumer group coordination
	groupCoordinator *consumer.GroupCoordinator

	// Schema management (optional, for schematized topics)
	schemaManager *schema.Manager
	useSchema     bool
	brokerClient  *schema.BrokerClient

	// Dynamic broker address for Metadata responses
	brokerHost string
	brokerPort int
}

// NewHandler creates a basic Kafka handler with in-memory storage.
// For production use with persistent storage, use NewSeaweedMQBrokerHandler instead.
func NewHandler() *Handler {
	return &Handler{
		groupCoordinator: consumer.NewGroupCoordinator(),
		brokerHost:       "localhost",
		brokerPort:       9092,
		seaweedMQHandler: &basicSeaweedMQHandler{
			topics:   make(map[string]bool),
			ledgers:  make(map[string]*offset.Ledger),
			messages: make(map[string]map[int32]map[int64]*MessageRecord),
		},
	}
}

// NewTestHandler creates a handler for testing purposes without requiring SeaweedMQ masters.
// This should ONLY be used in tests.
func NewTestHandler() *Handler {
	return &Handler{
		groupCoordinator: consumer.NewGroupCoordinator(),
		brokerHost:       "localhost",
		brokerPort:       9092,
		seaweedMQHandler: &testSeaweedMQHandler{
			topics:  make(map[string]bool),
			ledgers: make(map[string]*offset.Ledger),
		},
	}
}

// MessageRecord represents a stored message
type MessageRecord struct {
	Key       []byte
	Value     []byte
	Timestamp int64
}

// basicSeaweedMQHandler is a minimal in-memory implementation for basic Kafka functionality
type basicSeaweedMQHandler struct {
	topics  map[string]bool
	ledgers map[string]*offset.Ledger
	// messages stores actual message content indexed by topic -> partition -> offset
	messages map[string]map[int32]map[int64]*MessageRecord
	mu       sync.RWMutex
}

// testSeaweedMQHandler is a minimal mock implementation for testing
type testSeaweedMQHandler struct {
	topics  map[string]bool
	ledgers map[string]*offset.Ledger
	mu      sync.RWMutex
}

// basicSeaweedMQHandler implementation

func (b *basicSeaweedMQHandler) TopicExists(topic string) bool {
	// Guard map access: these maps are also written under b.mu elsewhere
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.topics[topic]
}

func (b *basicSeaweedMQHandler) ListTopics() []string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	topics := make([]string, 0, len(b.topics))
	for topic := range b.topics {
		topics = append(topics, topic)
	}
	return topics
}

func (b *basicSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.topics[topic] = true
	return nil
}
func (b *basicSeaweedMQHandler) DeleteTopic(topic string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.topics, topic)
	return nil
}

func (b *basicSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
	b.mu.Lock()
	defer b.mu.Unlock()

	key := fmt.Sprintf("%s-%d", topic, partition)
	if ledger, exists := b.ledgers[key]; exists {
		return ledger
	}

	// Create a new ledger, and also create the topic if it doesn't exist
	ledger := offset.NewLedger()
	b.ledgers[key] = ledger
	b.topics[topic] = true
	return ledger
}

func (b *basicSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
	b.mu.RLock()
	defer b.mu.RUnlock()

	key := fmt.Sprintf("%s-%d", topic, partition)
	if ledger, exists := b.ledgers[key]; exists {
		return ledger
	}
	// Return nil if the ledger doesn't exist (topic doesn't exist)
	return nil
}

func (b *basicSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
	// Get or create the ledger first (this acquires and releases the lock)
	ledger := b.GetOrCreateLedger(topicName, partitionID)

	// Now acquire the lock for the rest of the operation
	b.mu.Lock()
	defer b.mu.Unlock()

	// Assign an offset and append the record
	offset := ledger.AssignOffsets(1)
	timestamp := time.Now().UnixNano()
	size := int32(len(value))
	if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
		return 0, fmt.Errorf("failed to append record: %w", err)
	}

	// Store the actual message content
	if b.messages[topicName] == nil {
		b.messages[topicName] = make(map[int32]map[int64]*MessageRecord)
	}
	if b.messages[topicName][partitionID] == nil {
		b.messages[topicName][partitionID] = make(map[int64]*MessageRecord)
	}

	// Copy key and value to avoid referencing the caller's slices
	keyCopy := make([]byte, len(key))
	copy(keyCopy, key)
	valueCopy := make([]byte, len(value))
	copy(valueCopy, value)

	b.messages[topicName][partitionID][offset] = &MessageRecord{
		Key:       keyCopy,
		Value:     valueCopy,
		Timestamp: timestamp,
	}

	return offset, nil
}

// GetStoredMessages retrieves stored messages for a topic-partition from a given offset
func (b *basicSeaweedMQHandler) GetStoredMessages(topicName string, partitionID int32, fromOffset int64, maxMessages int) []*MessageRecord {
	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.messages[topicName] == nil || b.messages[topicName][partitionID] == nil {
		return nil
	}

	partitionMessages := b.messages[topicName][partitionID]
	var result []*MessageRecord

	// Collect consecutive messages starting from fromOffset
	for offset := fromOffset; offset < fromOffset+int64(maxMessages); offset++ {
		if msg, exists := partitionMessages[offset]; exists {
			result = append(result, msg)
		} else {
			// No more consecutive messages
			break
		}
	}

	return result
}

func (b *basicSeaweedMQHandler) Close() error {
	return nil
}
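// The sketch below is illustrative only (not called anywhere): a produce/read
// round-trip against the in-memory handler, assuming the offset.Ledger API used
// above (AssignOffsets/AppendRecord). It shows how ledgers are keyed by
// "topic-partition" while message bodies live in the nested messages map.
//
//	h := &basicSeaweedMQHandler{
//		topics:   make(map[string]bool),
//		ledgers:  make(map[string]*offset.Ledger),
//		messages: make(map[string]map[int32]map[int64]*MessageRecord),
//	}
//	off, _ := h.ProduceRecord("demo", 0, []byte("k"), []byte("v"))
//	msgs := h.GetStoredMessages("demo", 0, off, 1) // one record: key "k", value "v"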
// testSeaweedMQHandler implementation (for tests)

func (t *testSeaweedMQHandler) TopicExists(topic string) bool {
	// Guard map access: these maps are also written under t.mu elsewhere
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.topics[topic]
}

func (t *testSeaweedMQHandler) ListTopics() []string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	var topics []string
	for topic := range t.topics {
		topics = append(topics, topic)
	}
	return topics
}

func (t *testSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.topics[topic] = true
	return nil
}

func (t *testSeaweedMQHandler) DeleteTopic(topic string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.topics, topic)
	return nil
}

func (t *testSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Mark the topic as existing when creating a ledger
	t.topics[topic] = true

	key := fmt.Sprintf("%s-%d", topic, partition)
	if ledger, exists := t.ledgers[key]; exists {
		return ledger
	}

	ledger := offset.NewLedger()
	t.ledgers[key] = ledger
	return ledger
}

func (t *testSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
	t.mu.RLock()
	defer t.mu.RUnlock()

	key := fmt.Sprintf("%s-%d", topic, partition)
	if ledger, exists := t.ledgers[key]; exists {
		return ledger
	}
	// Return nil if the ledger doesn't exist (topic doesn't exist)
	return nil
}

func (t *testSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
	// For testing, actually store the record in the ledger
	ledger := t.GetOrCreateLedger(topicName, partitionID)

	// Assign an offset and append the record
	offset := ledger.AssignOffsets(1)
	timestamp := time.Now().UnixNano()
	size := int32(len(value))
	if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
		return 0, fmt.Errorf("failed to append record: %w", err)
	}

	return offset, nil
}

func (t *testSeaweedMQHandler) Close() error {
	return nil
}

// AddTopicForTesting creates a topic for testing purposes (restored for test compatibility)
func (h *Handler) AddTopicForTesting(topicName string, partitions int32) {
	if h.seaweedMQHandler != nil {
		h.seaweedMQHandler.CreateTopic(topicName, partitions)
	}
}

// NewSeaweedMQHandler creates a new handler with SeaweedMQ agent integration
func NewSeaweedMQHandler(agentAddress string) (*Handler, error) {
	smqHandler, err := integration.NewSeaweedMQHandler(agentAddress)
	if err != nil {
		return nil, err
	}

	return &Handler{
		seaweedMQHandler: smqHandler,
		groupCoordinator: consumer.NewGroupCoordinator(),
		brokerHost:       "localhost",
		brokerPort:       9092,
	}, nil
}

// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration
func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, error) {
	// Set up SeaweedMQ integration
	smqHandler, err := integration.NewSeaweedMQBrokerHandler(masters, filerGroup)
	if err != nil {
		return nil, err
	}

	// Create SMQ offset storage using the first master as the filer address
	masterAddresses := strings.Split(masters, ",")
	filerAddress := masterAddresses[0]
	smqOffsetStorage, err := offset.NewSMQOffsetStorage(filerAddress)
	if err != nil {
		return nil, fmt.Errorf("failed to create SMQ offset storage: %w", err)
	}

	return &Handler{
		seaweedMQHandler: smqHandler,
		smqOffsetStorage: smqOffsetStorage,
		groupCoordinator: consumer.NewGroupCoordinator(),
		brokerHost:       "localhost", // default fallback
		brokerPort:       9092,        // default fallback
	}, nil
}

// Delegate methods to the SeaweedMQ handler

// GetOrCreateLedger delegates to the SeaweedMQ handler
func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
	return h.seaweedMQHandler.GetOrCreateLedger(topic, partition)
}

// GetLedger delegates to the SeaweedMQ handler
func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
	return h.seaweedMQHandler.GetLedger(topic, partition)
}

// Close shuts down the handler and all connections
func (h *Handler) Close() error {
	// Close group coordinator
	if h.groupCoordinator != nil {
		h.groupCoordinator.Close()
	}

	// Close broker client if present
	if h.brokerClient != nil {
		if err := h.brokerClient.Close(); err != nil {
			fmt.Printf("Warning: failed to close broker client: %v\n", err)
		}
	}

	// Close SeaweedMQ handler if present
	if h.seaweedMQHandler != nil {
		return h.seaweedMQHandler.Close()
	}

	return nil
}
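// Hypothetical wiring (illustration only; the addresses are placeholders): the
// caller owns the TCP listener and hands each accepted connection to
// HandleConn, after advertising the externally reachable broker address.
//
//	handler, err := NewSeaweedMQBrokerHandler("master1:9333,master2:9333", "")
//	if err != nil { /* handle error */ }
//	handler.SetBrokerAddress("gateway.example.com", 9092)
//	// for each accepted net.Conn:
//	go handler.HandleConn(ctx, conn)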
// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
	// Record batch storage is now handled by the SeaweedMQ handler
	fmt.Printf("DEBUG: StoreRecordBatch delegated to SeaweedMQ handler - topic:%s, partition:%d, offset:%d\n", topicName, partition, baseOffset)
}

// GetRecordBatch retrieves a stored record batch that contains the requested offset
func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
	// Record batch retrieval is now handled by the SeaweedMQ handler
	fmt.Printf("DEBUG: GetRecordBatch delegated to SeaweedMQ handler - topic:%s, partition:%d, offset:%d\n", topicName, partition, offset)
	return nil, false
}

// getRecordCountFromBatch extracts the record count from a Kafka record batch
func (h *Handler) getRecordCountFromBatch(batch []byte) int32 {
	// Kafka record batch format:
	// base_offset(8) + batch_length(4) + partition_leader_epoch(4) + magic(1) + crc(4) +
	// attributes(2) + last_offset_delta(4) + first_timestamp(8) + max_timestamp(8) +
	// producer_id(8) + producer_epoch(2) + base_sequence(4) + records_count(4) + records...
	// The record count starts at offset 57 (8+4+4+1+4+2+4+8+8+8+2+4 = 57)
	if len(batch) < 61 { // 57 header bytes + 4 bytes for the record count
		return 0
	}

	recordCount := binary.BigEndian.Uint32(batch[57:61])
	return int32(recordCount)
}
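// buildBatchHeaderForCount is a hypothetical helper (illustration only, not used
// by the handler) that shows where records_count sits in the batch layout
// documented above: it writes a zeroed 61-byte header and stamps the count into
// bytes 57..61, which is exactly the slice getRecordCountFromBatch reads.
func buildBatchHeaderForCount(count int32) []byte {
	batch := make([]byte, 61) // 57 bytes of header fields + 4-byte records_count
	binary.BigEndian.PutUint32(batch[57:61], uint32(count))
	return batch
}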
// SetBrokerAddress updates the broker address used in Metadata responses
func (h *Handler) SetBrokerAddress(host string, port int) {
	h.brokerHost = host
	h.brokerPort = port
}

// HandleConn processes a single client connection
func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
	connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr())
	defer func() {
		fmt.Printf("DEBUG: [%s] Connection closing\n", connectionID)
		conn.Close()
	}()

	r := bufio.NewReader(conn)
	w := bufio.NewWriter(conn)
	defer w.Flush()

	for {
		// Check if the context is cancelled
		select {
		case <-ctx.Done():
			fmt.Printf("DEBUG: [%s] Context cancelled, closing connection\n", connectionID)
			return ctx.Err()
		default:
		}

		// Set a read deadline for the connection based on the context
		if deadline, ok := ctx.Deadline(); ok {
			conn.SetReadDeadline(deadline)
		} else {
			// Set a reasonable timeout if no deadline is set
			conn.SetReadDeadline(time.Now().Add(5 * time.Second))
		}

		// Read message size (4 bytes)
		var sizeBytes [4]byte
		if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
			if err == io.EOF {
				fmt.Printf("DEBUG: Client closed connection (clean EOF)\n")
				return nil // clean disconnect
			}
			// Check if the error is due to context cancellation
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				select {
				case <-ctx.Done():
					fmt.Printf("DEBUG: [%s] Read timeout due to context cancellation\n", connectionID)
					return ctx.Err()
				default:
					// Actual timeout; continue with the error below
				}
			}
			fmt.Printf("DEBUG: Error reading message size: %v\n", err)
			return fmt.Errorf("read size: %w", err)
		}

		size := binary.BigEndian.Uint32(sizeBytes[:])
		if size == 0 || size > 1024*1024 { // 1MB limit
			// TODO: Consider making the message size limit configurable.
			// 1MB may be too restrictive for some use cases; Kafka's default
			// max.message.bytes is often higher.
			return fmt.Errorf("invalid message size: %d", size)
		}

		// Read the message
		messageBuf := make([]byte, size)
		if _, err := io.ReadFull(r, messageBuf); err != nil {
			return fmt.Errorf("read message: %w", err)
		}

		// Parse at least the basic header to get the API key and correlation ID
		if len(messageBuf) < 8 {
			return fmt.Errorf("message too short")
		}

		apiKey := binary.BigEndian.Uint16(messageBuf[0:2])
		apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
		correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
		apiName := getAPIName(apiKey)

		// Validate the API version against what we support
		if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
			// Return a proper Kafka error response for the unsupported version
			response, writeErr := h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion)
			if writeErr != nil {
				return fmt.Errorf("build error response: %w", writeErr)
			}

			// Send the error response and continue to the next request
			responseSizeBytes := make([]byte, 4)
			binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))
			w.Write(responseSizeBytes)
			w.Write(response)
			w.Flush()
			continue
		}

		// Strip client_id (nullable STRING) from the header to get the pure request body
		bodyOffset := 8
		if len(messageBuf) < bodyOffset+2 {
			return fmt.Errorf("invalid header: missing client_id length")
		}
		clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2]))
		bodyOffset += 2
		if clientIDLen >= 0 {
			if len(messageBuf) < bodyOffset+int(clientIDLen) {
				return fmt.Errorf("invalid header: client_id truncated")
			}
			// clientID := string(messageBuf[bodyOffset : bodyOffset+int(clientIDLen)])
			bodyOffset += int(clientIDLen)
		} else {
			// client_id is null; nothing to skip
		}

		// TODO: Flexible versions have tagged fields in the header; ignored for now
		requestBody := messageBuf[bodyOffset:]

		// Handle the request based on API key and version
		var response []byte
		var err error

		switch apiKey {
		case 18: // ApiVersions
			response, err = h.handleApiVersions(correlationID)
		case 3: // Metadata
			response, err = h.handleMetadata(correlationID, apiVersion, requestBody)
		case 2: // ListOffsets
			fmt.Printf("DEBUG: *** LISTOFFSETS REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
			response, err = h.handleListOffsets(correlationID, apiVersion, requestBody)
		case 19: // CreateTopics
			response, err = h.handleCreateTopics(correlationID, apiVersion, requestBody)
		case 20: // DeleteTopics
			response, err = h.handleDeleteTopics(correlationID, requestBody)
		case 0: // Produce
			response, err = h.handleProduce(correlationID, apiVersion, requestBody)
		case 1: // Fetch
			fmt.Printf("DEBUG: *** FETCH HANDLER CALLED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
			response, err = h.handleFetch(correlationID, apiVersion, requestBody)
			if err != nil {
				fmt.Printf("DEBUG: Fetch error: %v\n", err)
			} else {
				fmt.Printf("DEBUG: Fetch response hex dump (%d bytes): %x\n", len(response), response)
			}
		case 11: // JoinGroup
			fmt.Printf("DEBUG: *** JOINGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
			response, err = h.handleJoinGroup(correlationID, apiVersion, requestBody)
			if err != nil {
				fmt.Printf("DEBUG: JoinGroup error: %v\n", err)
			} else {
				fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response)
			}
		case 14: // SyncGroup
			fmt.Printf("DEBUG: *** 🎉 SYNCGROUP API CALLED! Version: %d, Correlation: %d ***\n", apiVersion, correlationID)
			response, err = h.handleSyncGroup(correlationID, apiVersion, requestBody)
			if err != nil {
				fmt.Printf("DEBUG: SyncGroup error: %v\n", err)
			} else {
				fmt.Printf("DEBUG: SyncGroup response hex dump (%d bytes): %x\n", len(response), response)
			}
		case 8: // OffsetCommit
			response, err = h.handleOffsetCommit(correlationID, requestBody)
		case 9: // OffsetFetch
			fmt.Printf("DEBUG: *** OFFSETFETCH REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
			response, err = h.handleOffsetFetch(correlationID, apiVersion, requestBody)
			if err != nil {
				fmt.Printf("DEBUG: OffsetFetch error: %v\n", err)
			} else {
				fmt.Printf("DEBUG: OffsetFetch response hex dump (%d bytes): %x\n", len(response), response)
			}
		case 10: // FindCoordinator
			fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
			response, err = h.handleFindCoordinator(correlationID, requestBody)
			if err != nil {
				fmt.Printf("DEBUG: FindCoordinator error: %v\n", err)
			}
		case 12: // Heartbeat
			response, err = h.handleHeartbeat(correlationID, requestBody)
		case 13: // LeaveGroup
			response, err = h.handleLeaveGroup(correlationID, requestBody)
		default:
			fmt.Printf("DEBUG: *** UNSUPPORTED API KEY *** %d (%s) v%d - Correlation: %d\n", apiKey, apiName, apiVersion, correlationID)
			err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion)
		}

		if err != nil {
			return fmt.Errorf("handle request: %w", err)
		}

		// Write response size and data
		responseSizeBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))

		if _, err := w.Write(responseSizeBytes); err != nil {
			return fmt.Errorf("write response size: %w", err)
		}

		if _, err := w.Write(response); err != nil {
			return fmt.Errorf("write response: %w", err)
		}

		if err := w.Flush(); err != nil {
			return fmt.Errorf("flush response: %w", err)
		}

		// Minimal flush logging
		// fmt.Printf("DEBUG: API %d flushed\n", apiKey)
	}
}
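// For reference, a minimal sketch (illustration only) of a well-formed frame
// from the client side; this is the mirror image of what the read loop above
// unpacks: size(4) + api_key(2) + api_version(2) + correlation_id(4) +
// client_id(NULLABLE_STRING) + request body.
//
//	frame := []byte{
//		0, 18, // api_key = 18 (ApiVersions)
//		0, 0, // api_version = 0
//		0, 0, 0, 1, // correlation_id = 1
//		0xFF, 0xFF, // client_id = null
//	}
//	size := make([]byte, 4)
//	binary.BigEndian.PutUint32(size, uint32(len(frame)))
//	conn.Write(append(size, frame...)) // conn is a hypothetical net.Conn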
func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
	// Build the ApiVersions response manually.
	// Response format (v0): correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys
	response := make([]byte, 0, 64)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Error code (0 = no error)
	response = append(response, 0, 0)

	// Number of API keys (newer versions use a compact array; the basic format is used here for simplicity)
	response = append(response, 0, 0, 0, 14) // 14 API keys

	// Each entry: api_key(2) + min_version(2) + max_version(2)

	// API Key 18 (ApiVersions)
	response = append(response, 0, 18) // API key 18
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 3)  // max version 3

	// API Key 3 (Metadata): v0-v7, matching the version-specific handlers below
	response = append(response, 0, 3) // API key 3
	response = append(response, 0, 0) // min version 0
	response = append(response, 0, 7) // max version 7

	// API Key 2 (ListOffsets): limit to v2 (implemented and tested)
	response = append(response, 0, 2) // API key 2
	response = append(response, 0, 0) // min version 0
	response = append(response, 0, 2) // max version 2

	// API Key 19 (CreateTopics)
	response = append(response, 0, 19) // API key 19
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 4)  // max version 4

	// API Key 20 (DeleteTopics)
	response = append(response, 0, 20) // API key 20
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 4)  // max version 4

	// API Key 0 (Produce): support v7 for Sarama compatibility (Kafka 2.1.0)
	response = append(response, 0, 0) // API key 0
	response = append(response, 0, 0) // min version 0
	response = append(response, 0, 7) // max version 7

	// API Key 1 (Fetch): limit to v7 (current handler semantics)
	response = append(response, 0, 1) // API key 1
	response = append(response, 0, 0) // min version 0
	response = append(response, 0, 7) // max version 7

	// API Key 11 (JoinGroup)
	response = append(response, 0, 11) // API key 11
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 7)  // max version 7

	// API Key 14 (SyncGroup)
	response = append(response, 0, 14) // API key 14
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 5)  // max version 5

	// API Key 8 (OffsetCommit): limit to v2 for the current implementation
	response = append(response, 0, 8) // API key 8
	response = append(response, 0, 0) // min version 0
	response = append(response, 0, 2) // max version 2

	// API Key 9 (OffsetFetch): limit to v2 (implemented and tested)
	response = append(response, 0, 9) // API key 9
	response = append(response, 0, 0) // min version 0
	response = append(response, 0, 2) // max version 2

	// API Key 10 (FindCoordinator): limit to v2 (implemented)
	response = append(response, 0, 10) // API key 10
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 2)  // max version 2

	// API Key 12 (Heartbeat)
	response = append(response, 0, 12) // API key 12
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 4)  // max version 4

	// API Key 13 (LeaveGroup)
	response = append(response, 0, 13) // API key 13
	response = append(response, 0, 0)  // min version 0
	response = append(response, 0, 4)  // max version 4

	fmt.Printf("DEBUG: ApiVersions v0 response: %d bytes\n", len(response))
	return response, nil
}
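// decodeApiVersionEntries is a hypothetical decoder (illustration only, not used
// by the handler) for the api_keys array built above; it assumes the same fixed
// 6-byte entries: api_key(2) + min_version(2) + max_version(2).
func decodeApiVersionEntries(entries []byte) map[uint16][2]uint16 {
	out := make(map[uint16][2]uint16)
	for off := 0; off+6 <= len(entries); off += 6 {
		key := binary.BigEndian.Uint16(entries[off : off+2])
		minV := binary.BigEndian.Uint16(entries[off+2 : off+4])
		maxV := binary.BigEndian.Uint16(entries[off+4 : off+6])
		out[key] = [2]uint16{minV, maxV}
	}
	return out
}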
// HandleMetadataV0 implements the Metadata API response in version 0 format.
// v0 response layout:
//
//	correlation_id(4) + brokers(ARRAY) + topics(ARRAY)
//	broker: node_id(4) + host(STRING) + port(4)
//	topic: error_code(2) + name(STRING) + partitions(ARRAY)
//	partition: error_code(2) + partition_id(4) + leader(4) + replicas(ARRAY) + isr(ARRAY)
func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]byte, error) {
	response := make([]byte, 0, 256)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Brokers array length (4 bytes) - 1 broker (this gateway)
	response = append(response, 0, 0, 0, 1)

	// Broker 0: node_id(4) + host(STRING) + port(4)
	response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partition leaders)

	// Use the dynamic broker address set by the server
	host := h.brokerHost
	port := h.brokerPort
	fmt.Printf("DEBUG: Advertising broker (v0) at %s:%d\n", host, port)

	// Host (STRING: 2 bytes length + bytes)
	hostLen := uint16(len(host))
	response = append(response, byte(hostLen>>8), byte(hostLen))
	response = append(response, []byte(host)...)

	// Port (4 bytes)
	portBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(portBytes, uint32(port))
	response = append(response, portBytes...)

	// Parse requested topics (empty means all)
	requestedTopics := h.parseMetadataTopics(requestBody)
	fmt.Printf("DEBUG: 🔍 METADATA v0 REQUEST - Requested: %v (empty=all)\n", requestedTopics)

	// Determine topics to return using the SeaweedMQ handler
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = h.seaweedMQHandler.ListTopics()
	} else {
		for _, name := range requestedTopics {
			if h.seaweedMQHandler.TopicExists(name) {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}

	// Topics array length (4 bytes)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
	response = append(response, topicsCountBytes...)

	// Topic entries
	for _, topicName := range topicsToReturn {
		// error_code(2) = 0
		response = append(response, 0, 0)

		// name (STRING)
		nameBytes := []byte(topicName)
		nameLen := uint16(len(nameBytes))
		response = append(response, byte(nameLen>>8), byte(nameLen))
		response = append(response, nameBytes...)

		// partitions array length (4 bytes) - 1 partition
		response = append(response, 0, 0, 0, 1)

		// partition: error_code(2) + partition_id(4) + leader(4)
		response = append(response, 0, 0)       // error_code
		response = append(response, 0, 0, 0, 0) // partition_id = 0
		response = append(response, 0, 0, 0, 1) // leader = 1 (this broker)

		// replicas: array length(4) + one broker id (1)
		response = append(response, 0, 0, 0, 1)
		response = append(response, 0, 0, 0, 1)

		// isr: array length(4) + one broker id (1)
		response = append(response, 0, 0, 0, 1)
		response = append(response, 0, 0, 0, 1)
	}

	fmt.Printf("DEBUG: Metadata v0 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
	fmt.Printf("DEBUG: *** METADATA v0 RESPONSE DETAILS ***\n")
	fmt.Printf("DEBUG: Response size: %d bytes\n", len(response))
	fmt.Printf("DEBUG: Broker: %s:%d\n", h.brokerHost, h.brokerPort)
	fmt.Printf("DEBUG: Topics: %v\n", topicsToReturn)
	for i, topic := range topicsToReturn {
		fmt.Printf("DEBUG: Topic[%d]: %s (1 partition)\n", i, topic)
	}
	fmt.Printf("DEBUG: *** END METADATA v0 RESPONSE ***\n")

	return response, nil
}
// HandleMetadataV1 implements the Metadata API response in version 1 format.
// Simplified implementation based on the working v0; v1 adds ControllerID
// (after brokers), Rack (for brokers), and IsInternal (for topics).
func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse requested topics (empty means all)
	requestedTopics := h.parseMetadataTopics(requestBody)
	fmt.Printf("DEBUG: 🔍 METADATA v1 REQUEST - Requested: %v (empty=all)\n", requestedTopics)

	// Determine topics to return using the SeaweedMQ handler
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = h.seaweedMQHandler.ListTopics()
	} else {
		for _, name := range requestedTopics {
			if h.seaweedMQHandler.TopicExists(name) {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}

	// Build the response using the same approach as v0, with the v1 additions
	response := make([]byte, 0, 256)

	// Correlation ID (4 bytes)
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Brokers array length (4 bytes) - 1 broker (this gateway)
	response = append(response, 0, 0, 0, 1)

	// Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING)
	response = append(response, 0, 0, 0, 1) // node_id = 1

	// Use the dynamic broker address set by the server
	host := h.brokerHost
	port := h.brokerPort
	fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", host, port)

	// Host (STRING: 2 bytes length + bytes)
	hostLen := uint16(len(host))
	response = append(response, byte(hostLen>>8), byte(hostLen))
	response = append(response, []byte(host)...)

	// Port (4 bytes)
	portBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(portBytes, uint32(port))
	response = append(response, portBytes...)

	// Rack (STRING: 2 bytes length + bytes) - v1 addition, non-nullable empty string
	response = append(response, 0, 0) // empty string

	// ControllerID (4 bytes) - v1 addition
	response = append(response, 0, 0, 0, 1) // controller_id = 1

	// Topics array length (4 bytes)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
	response = append(response, topicsCountBytes...)

	// Topics
	for _, topicName := range topicsToReturn {
		// error_code (2 bytes)
		response = append(response, 0, 0)

		// topic name (STRING: 2 bytes length + bytes)
		topicLen := uint16(len(topicName))
		response = append(response, byte(topicLen>>8), byte(topicLen))
		response = append(response, []byte(topicName)...)

		// is_internal (1 byte) - v1 addition
		response = append(response, 0) // false

		// partitions array length (4 bytes) - 1 partition
		response = append(response, 0, 0, 0, 1)

		// partition 0: error_code(2) + partition_id(4) + leader_id(4) + replicas(ARRAY) + isr(ARRAY)
		response = append(response, 0, 0)       // error_code
		response = append(response, 0, 0, 0, 0) // partition_id = 0
		response = append(response, 0, 0, 0, 1) // leader_id = 1

		// replicas: array length(4) + one broker id (1)
		response = append(response, 0, 0, 0, 1)
		response = append(response, 0, 0, 0, 1)

		// isr: array length(4) + one broker id (1)
		response = append(response, 0, 0, 0, 1)
		response = append(response, 0, 0, 0, 1)
	}

	fmt.Printf("DEBUG: Metadata v1 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
	fmt.Printf("DEBUG: Metadata v1 response size: %d bytes\n", len(response))
	return response, nil
}
// HandleMetadataV2 implements Metadata API v2, which adds the ClusterID field (nullable string).
// v2 response layout: correlation_id(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) +
// controller_id(4) + topics(ARRAY)
func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse requested topics (empty means all)
	requestedTopics := h.parseMetadataTopics(requestBody)
	fmt.Printf("DEBUG: 🔍 METADATA v2 REQUEST - Requested: %v (empty=all)\n", requestedTopics)

	// Determine topics to return using the SeaweedMQ handler
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = h.seaweedMQHandler.ListTopics()
	} else {
		for _, name := range requestedTopics {
			if h.seaweedMQHandler.TopicExists(name) {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}

	var buf bytes.Buffer

	// Correlation ID (4 bytes)
	binary.Write(&buf, binary.BigEndian, correlationID)

	// Brokers array (4 bytes length + brokers)
	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker

	// Broker 0
	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID

	// Host (STRING: 2 bytes length + data)
	host := h.brokerHost
	binary.Write(&buf, binary.BigEndian, int16(len(host)))
	buf.WriteString(host)

	// Port (4 bytes)
	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))

	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
	binary.Write(&buf, binary.BigEndian, int16(0)) // empty string

	// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2 addition; -1 length = null
	binary.Write(&buf, binary.BigEndian, int16(-1)) // null cluster ID

	// ControllerID (4 bytes) - v1+ addition
	binary.Write(&buf, binary.BigEndian, int32(1))

	// Topics array (4 bytes length + topics)
	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))

	for _, topicName := range topicsToReturn {
		// ErrorCode (2 bytes)
		binary.Write(&buf, binary.BigEndian, int16(0))

		// Name (STRING: 2 bytes length + data)
		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
		buf.WriteString(topicName)

		// IsInternal (1 byte) - v1+ addition
		buf.WriteByte(0) // false

		// Partitions array (4 bytes length + partitions)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition

		// Partition 0
		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID

		// ReplicaNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1

		// IsrNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
	}

	response := buf.Bytes()
	fmt.Printf("DEBUG: Advertising broker (v2) at %s:%d\n", h.brokerHost, h.brokerPort)
	fmt.Printf("DEBUG: Metadata v2 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
	return response, nil
}
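// appendKafkaString is a hypothetical helper (illustration only, not used above)
// capturing the classic string encodings these Metadata builders write inline:
// STRING is int16 length + bytes, and NULLABLE_STRING marks null with length -1,
// which is how the v2+ handlers emit the null ClusterID.
func appendKafkaString(buf []byte, s string, null bool) []byte {
	if null {
		return append(buf, 0xFF, 0xFF) // int16 length -1 => null
	}
	buf = append(buf, byte(len(s)>>8), byte(len(s)))
	return append(buf, s...)
}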
// HandleMetadataV3V4 implements Metadata API v3/v4, which add the ThrottleTimeMs field.
// v3/v4 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) +
// cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse requested topics (empty means all)
	requestedTopics := h.parseMetadataTopics(requestBody)

	// Determine topics to return using the SeaweedMQ handler
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = h.seaweedMQHandler.ListTopics()
	} else {
		for _, name := range requestedTopics {
			if h.seaweedMQHandler.TopicExists(name) {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}

	var buf bytes.Buffer

	// Correlation ID (4 bytes)
	binary.Write(&buf, binary.BigEndian, correlationID)

	// ThrottleTimeMs (4 bytes) - v3+ addition
	binary.Write(&buf, binary.BigEndian, int32(0)) // no throttling

	// Brokers array (4 bytes length + brokers)
	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker

	// Broker 0
	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID

	// Host (STRING: 2 bytes length + data)
	host := h.brokerHost
	binary.Write(&buf, binary.BigEndian, int16(len(host)))
	buf.WriteString(host)

	// Port (4 bytes)
	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))

	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
	binary.Write(&buf, binary.BigEndian, int16(0)) // empty string

	// ClusterID (NULLABLE_STRING) - v2+ addition; -1 length = null
	binary.Write(&buf, binary.BigEndian, int16(-1)) // null cluster ID

	// ControllerID (4 bytes) - v1+ addition
	binary.Write(&buf, binary.BigEndian, int32(1))

	// Topics array (4 bytes length + topics)
	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))

	for _, topicName := range topicsToReturn {
		// ErrorCode (2 bytes)
		binary.Write(&buf, binary.BigEndian, int16(0))

		// Name (STRING: 2 bytes length + data)
		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
		buf.WriteString(topicName)

		// IsInternal (1 byte) - v1+ addition
		buf.WriteByte(0) // false

		// Partitions array (4 bytes length + partitions)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition

		// Partition 0
		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID

		// ReplicaNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1

		// IsrNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
	}

	response := buf.Bytes()
	return response, nil
}
// HandleMetadataV5V6 implements Metadata API v5/v6, which add the OfflineReplicas field.
// v5/v6 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) +
// cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) +
// replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse requested topics (empty means all)
	requestedTopics := h.parseMetadataTopics(requestBody)
	fmt.Printf("DEBUG: 🔍 METADATA v5/v6 REQUEST - Requested: %v (empty=all)\n", requestedTopics)

	// Determine topics to return using the SeaweedMQ handler
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = h.seaweedMQHandler.ListTopics()
	} else {
		for _, name := range requestedTopics {
			if h.seaweedMQHandler.TopicExists(name) {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}

	var buf bytes.Buffer

	// Correlation ID (4 bytes)
	binary.Write(&buf, binary.BigEndian, correlationID)

	// ThrottleTimeMs (4 bytes) - v3+ addition
	binary.Write(&buf, binary.BigEndian, int32(0)) // no throttling

	// Brokers array (4 bytes length + brokers)
	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker

	// Broker 0
	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID

	// Host (STRING: 2 bytes length + data)
	host := h.brokerHost
	binary.Write(&buf, binary.BigEndian, int16(len(host)))
	buf.WriteString(host)

	// Port (4 bytes)
	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))

	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
	binary.Write(&buf, binary.BigEndian, int16(0)) // empty string

	// ClusterID (NULLABLE_STRING) - v2+ addition; -1 length = null
	binary.Write(&buf, binary.BigEndian, int16(-1)) // null cluster ID

	// ControllerID (4 bytes) - v1+ addition
	binary.Write(&buf, binary.BigEndian, int32(1))

	// Topics array (4 bytes length + topics)
	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))

	for _, topicName := range topicsToReturn {
		// ErrorCode (2 bytes)
		binary.Write(&buf, binary.BigEndian, int16(0))

		// Name (STRING: 2 bytes length + data)
		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
		buf.WriteString(topicName)

		// IsInternal (1 byte) - v1+ addition
		buf.WriteByte(0) // false

		// Partitions array (4 bytes length + partitions)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition

		// Partition 0
		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID

		// ReplicaNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1

		// IsrNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1

		// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
		binary.Write(&buf, binary.BigEndian, int32(0)) // no offline replicas
	}

	response := buf.Bytes()
	fmt.Printf("DEBUG: Advertising broker (v5/v6) at %s:%d\n", h.brokerHost, h.brokerPort)
	fmt.Printf("DEBUG: Metadata v5/v6 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
	return response, nil
}
// HandleMetadataV7 implements Metadata API v7, which adds the LeaderEpoch field.
// v7 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) +
// cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) +
// leader_epoch(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse requested topics (empty means all)
	requestedTopics := h.parseMetadataTopics(requestBody)
	fmt.Printf("DEBUG: 🔍 METADATA v7 REQUEST - Requested: %v (empty=all)\n", requestedTopics)

	// Determine topics to return using the SeaweedMQ handler
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = h.seaweedMQHandler.ListTopics()
	} else {
		for _, name := range requestedTopics {
			if h.seaweedMQHandler.TopicExists(name) {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}

	var buf bytes.Buffer

	// Correlation ID (4 bytes)
	binary.Write(&buf, binary.BigEndian, correlationID)

	// ThrottleTimeMs (4 bytes) - v3+ addition
	binary.Write(&buf, binary.BigEndian, int32(0)) // no throttling

	// Brokers array (4 bytes length + brokers)
	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker

	// Broker 0
	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID

	// Host (STRING: 2 bytes length + data)
	host := h.brokerHost
	binary.Write(&buf, binary.BigEndian, int16(len(host)))
	buf.WriteString(host)

	// Port (4 bytes)
	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))

	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
	binary.Write(&buf, binary.BigEndian, int16(0)) // empty string

	// ClusterID (NULLABLE_STRING) - v2+ addition; -1 length = null
	binary.Write(&buf, binary.BigEndian, int16(-1)) // null cluster ID

	// ControllerID (4 bytes) - v1+ addition
	binary.Write(&buf, binary.BigEndian, int32(1))

	// Topics array (4 bytes length + topics)
	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))

	for _, topicName := range topicsToReturn {
		// ErrorCode (2 bytes)
		binary.Write(&buf, binary.BigEndian, int16(0))

		// Name (STRING: 2 bytes length + data)
		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
		buf.WriteString(topicName)

		// IsInternal (1 byte) - v1+ addition
		buf.WriteByte(0) // false

		// Partitions array (4 bytes length + partitions)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition

		// Partition 0
		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID

		// LeaderEpoch (4 bytes) - v7+ addition
		binary.Write(&buf, binary.BigEndian, int32(0)) // leader epoch 0

		// ReplicaNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1

		// IsrNodes array (4 bytes length + nodes)
		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1

		// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
		binary.Write(&buf, binary.BigEndian, int32(0)) // no offline replicas
	}

	response := buf.Bytes()
	fmt.Printf("DEBUG: Advertising broker (v7) at %s:%d\n", h.brokerHost, h.brokerPort)
	fmt.Printf("DEBUG: Metadata v7 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
	return response, nil
}
// parseMetadataTopics parses the topics list from a Metadata request body.
// It supports both shapes seen in practice: v1+ bodies start directly with the
// topics array length (int32), while older assumptions may have included a
// client_id string first.
func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
	if len(requestBody) < 4 {
		return []string{}
	}

	// Path A: interpret the first 4 bytes as topics_count
	offset := 0
	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	if topicsCount == 0xFFFFFFFF { // -1 means all topics
		return []string{}
	}

	if topicsCount <= 1000000 { // sane bound
		offset += 4
		topics := make([]string, 0, topicsCount)
		for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ {
			nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
			offset += 2
			if offset+nameLen > len(requestBody) {
				break
			}
			topics = append(topics, string(requestBody[offset:offset+nameLen]))
			offset += nameLen
		}
		return topics
	}

	// Path B: assume a leading client_id string, then topics_count
	if len(requestBody) < 6 {
		return []string{}
	}
	clientIDLen := int(binary.BigEndian.Uint16(requestBody[0:2]))
	offset = 2 + clientIDLen
	if len(requestBody) < offset+4 {
		return []string{}
	}
	topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4
	if topicsCount == 0xFFFFFFFF {
		return []string{}
	}

	topics := make([]string, 0, topicsCount)
	for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ {
		nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
		offset += 2
		if offset+nameLen > len(requestBody) {
			break
		}
		topics = append(topics, string(requestBody[offset:offset+nameLen]))
		offset += nameLen
	}
	return topics
}
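// For reference (illustration only), the two request shapes the parser above
// accepts for a Metadata request naming the single topic "t1":
//
//	Path A (body starts at the topics array): 00 00 00 01 | 00 02 74 31
//	Path B (leading client_id "go"):          00 02 67 6F | 00 00 00 01 | 00 02 74 31
//
// A topics count of 0xFFFFFFFF (-1) means "all topics" in either shape.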
func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	fmt.Printf("DEBUG: ListOffsets v%d request hex dump (first 100 bytes): %x\n", apiVersion, requestBody[:min(100, len(requestBody))])

	// Parse a minimal request to understand what is being asked (header already stripped)
	offset := 0

	// v1+ has replica_id(4)
	if apiVersion >= 1 {
		if len(requestBody) < offset+4 {
			return nil, fmt.Errorf("ListOffsets v%d request missing replica_id", apiVersion)
		}
		replicaID := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
		offset += 4
		fmt.Printf("DEBUG: ListOffsets v%d - replica_id: %d\n", apiVersion, replicaID)
	}

	// v2+ adds isolation_level(1)
	if apiVersion >= 2 {
		if len(requestBody) < offset+1 {
			return nil, fmt.Errorf("ListOffsets v%d request missing isolation_level", apiVersion)
		}
		isolationLevel := requestBody[offset]
		offset += 1
		fmt.Printf("DEBUG: ListOffsets v%d - isolation_level: %d\n", apiVersion, isolationLevel)
	}

	if len(requestBody) < offset+4 {
		return nil, fmt.Errorf("ListOffsets request missing topics count")
	}

	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	response := make([]byte, 0, 256)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Throttle time (4 bytes, 0 = no throttling) - v2+ only
	if apiVersion >= 2 {
		response = append(response, 0, 0, 0, 0)
	}

	// Topics count (same as request)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
	response = append(response, topicsCountBytes...)

	// Process each requested topic
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		if len(requestBody) < offset+2 {
			break
		}

		// Parse topic name
		topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		if len(requestBody) < offset+int(topicNameSize)+4 {
			break
		}
		topicName := requestBody[offset : offset+int(topicNameSize)]
		offset += int(topicNameSize)

		// Parse partitions count for this topic
		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4

		// Response: topic_name_size(2) + topic_name + partitions_array
		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
		response = append(response, topicName...)

		partitionsCountBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
		response = append(response, partitionsCountBytes...)

		// Process each partition
		for j := uint32(0); j < partitionsCount && offset+12 <= len(requestBody); j++ {
			// Parse partition request: partition_id(4) + timestamp(8)
			partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			timestamp := int64(binary.BigEndian.Uint64(requestBody[offset+4 : offset+12]))
			offset += 12

			// Response: partition_id(4) + error_code(2) + timestamp(8) + offset(8)
			partitionIDBytes := make([]byte, 4)
			binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
			response = append(response, partitionIDBytes...)

			// Error code (0 = no error)
			response = append(response, 0, 0)

			// Get the ledger for this topic-partition
			ledger := h.GetOrCreateLedger(string(topicName), int32(partitionID))

			var responseTimestamp int64
			var responseOffset int64
			switch timestamp {
			case -2: // earliest offset
				responseOffset = ledger.GetEarliestOffset()
				if responseOffset == ledger.GetHighWaterMark() {
					// No messages yet; return the current time
					responseTimestamp = time.Now().UnixNano()
				} else {
					// Get the timestamp of the earliest message
					if ts, _, err := ledger.GetRecord(responseOffset); err == nil {
						responseTimestamp = ts
					} else {
						responseTimestamp = time.Now().UnixNano()
					}
				}
			case -1: // latest offset
				responseOffset = ledger.GetLatestOffset()
				if responseOffset == 0 && ledger.GetHighWaterMark() == 0 {
					// No messages yet
					responseTimestamp = time.Now().UnixNano()
					responseOffset = 0
				} else {
					// Get the timestamp of the latest message
					if ts, _, err := ledger.GetRecord(responseOffset); err == nil {
						responseTimestamp = ts
					} else {
						responseTimestamp = time.Now().UnixNano()
					}
				}
			default: // specific timestamp - find the offset by timestamp
				responseOffset = ledger.FindOffsetByTimestamp(timestamp)
				responseTimestamp = timestamp
			}

			timestampBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(timestampBytes, uint64(responseTimestamp))
			response = append(response, timestampBytes...)

			offsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(offsetBytes, uint64(responseOffset))
			response = append(response, offsetBytes...)
		}
	}

	return response, nil
}
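// For reference (illustration only), the special timestamp values handled in
// the partition loop above, as this handler resolves them against the ledger:
//
//	timestamp == -2 (earliest): ledger.GetEarliestOffset()
//	timestamp == -1 (latest):   ledger.GetLatestOffset()
//	timestamp >= 0:             ledger.FindOffsetByTimestamp(timestamp)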
func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	fmt.Printf("DEBUG: *** CREATETOPICS REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
	if len(requestBody) < 2 {
		return nil, fmt.Errorf("CreateTopics request too short")
	}

	// Dispatch based on the API version
	switch apiVersion {
	case 0, 1:
		return h.handleCreateTopicsV0V1(correlationID, requestBody)
	case 2, 3, 4, 5:
		return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody)
	default:
		return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion)
	}
}

func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	// CreateTopics v2+ format:
	// topics_array + timeout_ms(4) + validate_only(1) + [tagged_fields]
	offset := 0

	// Parse the topics array (compact array format in v2+)
	if len(requestBody) < offset+1 {
		return nil, fmt.Errorf("CreateTopics v2+ request missing topics array")
	}

	// Read topics count (compact array: stored as length + 1)
	topicsCountRaw := requestBody[offset]
	offset += 1

	var topicsCount uint32
	if topicsCountRaw == 0 {
		topicsCount = 0
	} else {
		topicsCount = uint32(topicsCountRaw) - 1
	}

	fmt.Printf("DEBUG: CreateTopics v%d - Topics count: %d, remaining bytes: %d\n", apiVersion, topicsCount, len(requestBody)-offset)

	// DEBUG: hex dump to understand the request format
	dumpLen := len(requestBody)
	if dumpLen > 50 {
		dumpLen = 50
	}
	fmt.Printf("DEBUG: CreateTopics v%d request hex dump (first %d bytes): %x\n", apiVersion, dumpLen, requestBody[:dumpLen])

	// Build the response
	response := make([]byte, 0, 256)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)
	// Throttle time (4 bytes, 0 = no throttling)
	response = append(response, 0, 0, 0, 0)

	// Topics array (compact format in v2+: count + 1)
	if topicsCount == 0 {
		response = append(response, 0) // empty array
	} else {
		response = append(response, byte(topicsCount+1)) // compact array format
	}

	// Process each topic (using the SeaweedMQ handler)
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		// Parse topic name (compact string in v2+)
		if len(requestBody) < offset+1 {
			break
		}
		topicNameLengthRaw := requestBody[offset]
		offset += 1

		var topicNameLength int
		if topicNameLengthRaw == 0 {
			topicNameLength = 0
		} else {
			topicNameLength = int(topicNameLengthRaw) - 1
		}

		if len(requestBody) < offset+topicNameLength {
			break
		}
		topicName := string(requestBody[offset : offset+topicNameLength])
		offset += topicNameLength

		// Parse num_partitions (4 bytes)
		if len(requestBody) < offset+4 {
			break
		}
		numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4

		// Parse replication_factor (2 bytes)
		if len(requestBody) < offset+2 {
			break
		}
		replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		// Parse configs (compact array in v2+)
		if len(requestBody) >= offset+1 {
			configsCountRaw := requestBody[offset]
			offset += 1

			var configsCount uint32
			if configsCountRaw == 0 {
				configsCount = 0
			} else {
				configsCount = uint32(configsCountRaw) - 1
			}

			// Skip configs for now (simplified)
			for j := uint32(0); j < configsCount && offset < len(requestBody); j++ {
				// Skip config name (compact string)
				if len(requestBody) >= offset+1 {
					configNameLengthRaw := requestBody[offset]
					offset += 1
					if configNameLengthRaw > 0 {
						configNameLength := int(configNameLengthRaw) - 1
						offset += configNameLength
					}
				}
				// Skip config value (compact string)
				if len(requestBody) >= offset+1 {
					configValueLengthRaw := requestBody[offset]
					offset += 1
					if configValueLengthRaw > 0 {
						configValueLength := int(configValueLengthRaw) - 1
						offset += configValueLength
					}
				}
			}
		}

		// Skip the tagged-fields count byte (expected to be 0 for basic requests);
		// tagged-field parsing is not implemented yet
		if len(requestBody) >= offset+1 {
			offset += 1
		}

		fmt.Printf("DEBUG: Parsed topic: %s, partitions: %d, replication: %d\n", topicName, numPartitions, replicationFactor)

		// Response: topic_name (compact string) + error_code(2) + error_message (compact string)
		if len(topicName) == 0 {
			response = append(response, 0) // empty string
		} else {
			response = append(response, byte(len(topicName)+1)) // compact string format
		}
		response = append(response, []byte(topicName)...)
		// Check whether the topic already exists, then create it
		var errorCode uint16 = 0
		var errorMessage string = ""

		// Use SeaweedMQ integration
		if h.seaweedMQHandler.TopicExists(topicName) {
			errorCode = 36 // TOPIC_ALREADY_EXISTS
			errorMessage = "Topic already exists"
		} else if numPartitions == 0 { // unsigned, so 0 is the only invalid value here
			errorCode = 37 // INVALID_PARTITIONS
			errorMessage = "Invalid number of partitions"
		} else if replicationFactor == 0 {
			errorCode = 38 // INVALID_REPLICATION_FACTOR
			errorMessage = "Invalid replication factor"
		} else {
			// Create the topic in SeaweedMQ
			if err := h.seaweedMQHandler.CreateTopic(topicName, int32(numPartitions)); err != nil {
				errorCode = 1 // UNKNOWN_SERVER_ERROR
				errorMessage = err.Error()
			}
		}

		// Error code
		response = append(response, byte(errorCode>>8), byte(errorCode))

		// Error message (compact nullable string in v2+)
		if errorMessage == "" {
			response = append(response, 0) // null string in compact format
		} else {
			response = append(response, byte(len(errorMessage)+1)) // compact string format
			response = append(response, []byte(errorMessage)...)
		}

		// Tagged fields (empty)
		response = append(response, 0)
	}

	// Parse timeout_ms and validate_only at the end (after all topics)
	if len(requestBody) >= offset+4 {
		timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4
		fmt.Printf("DEBUG: CreateTopics timeout_ms: %d\n", timeoutMs)
	}
	if len(requestBody) >= offset+1 {
		validateOnly := requestBody[offset] != 0
		offset += 1
		fmt.Printf("DEBUG: CreateTopics validate_only: %v\n", validateOnly)
	}

	// Tagged fields at the end of the response
	response = append(response, 0)

	return response, nil
}

// handleCreateTopicsV0V1 handles CreateTopics API versions 0 and 1.
// TODO: Implement real v0/v1 parsing if needed; for now this returns an empty
// success response (no per-topic results) rather than parsing the request.
func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byte) ([]byte, error) {
	response := make([]byte, 0, 32)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Throttle time
	response = append(response, 0, 0, 0, 0)

	// Empty topics array
	response = append(response, 0, 0, 0, 0)

	return response, nil
}
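// readCompactString is a hypothetical helper (illustration only, not used above)
// for the compact encoding handled inline in handleCreateTopicsV2Plus: a length
// byte of 0 means null/empty, otherwise the stored byte is length + 1. Real
// flexible-version requests encode this length as an unsigned varint; like the
// inline parser, this sketch assumes lengths that fit in a single byte.
func readCompactString(buf []byte, off int) (string, int) {
	if off >= len(buf) || buf[off] == 0 {
		return "", off + 1
	}
	n := int(buf[off]) - 1
	off++
	if off+n > len(buf) {
		return "", len(buf)
	}
	return string(buf[off : off+n]), off + n
}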
func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse a minimal DeleteTopics request
	// Request format: client_id + timeout(4) + topics_array
	if len(requestBody) < 6 { // client_id_size(2) + timeout(4)
		return nil, fmt.Errorf("DeleteTopics request too short")
	}

	// Skip client_id
	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
	offset := 2 + int(clientIDSize)

	if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
		return nil, fmt.Errorf("DeleteTopics request missing data")
	}

	// Skip timeout
	offset += 4

	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	response := make([]byte, 0, 256)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Throttle time (4 bytes, 0 = no throttling)
	response = append(response, 0, 0, 0, 0)

	// Topics count (same as request)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
	response = append(response, topicsCountBytes...)

	// Process each topic (using the SeaweedMQ handler)
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		if len(requestBody) < offset+2 {
			break
		}

		// Parse topic name
		topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		if len(requestBody) < offset+int(topicNameSize) {
			break
		}
		topicName := string(requestBody[offset : offset+int(topicNameSize)])
		offset += int(topicNameSize)

		// Response: topic_name + error_code(2) + error_message
		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
		response = append(response, []byte(topicName)...)

		// Check if the topic exists, and delete it
		var errorCode uint16 = 0
		var errorMessage string = ""

		// Use SeaweedMQ integration
		if !h.seaweedMQHandler.TopicExists(topicName) {
			errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
			errorMessage = "Unknown topic"
		} else {
			// Delete the topic from SeaweedMQ
			if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil {
				errorCode = 1 // UNKNOWN_SERVER_ERROR
				errorMessage = err.Error()
			}
		}

		// Error code
		response = append(response, byte(errorCode>>8), byte(errorCode))

		// Error message (nullable string)
		if errorMessage == "" {
			response = append(response, 0xFF, 0xFF) // null string
		} else {
			errorMsgLen := uint16(len(errorMessage))
			response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen))
			response = append(response, []byte(errorMessage)...)
		}
	}

	return response, nil
}

// validateAPIVersion checks whether we support the requested API version
func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
	supportedVersions := map[uint16][2]uint16{
		18: {0, 3}, // ApiVersions: v0-v3
		3:  {0, 7}, // Metadata: v0-v7
		0:  {0, 7}, // Produce: v0-v7
		1:  {0, 7}, // Fetch: v0-v7
		2:  {0, 2}, // ListOffsets: v0-v2
		19: {0, 4}, // CreateTopics: v0-v4
		20: {0, 4}, // DeleteTopics: v0-v4
		10: {0, 2}, // FindCoordinator: v0-v2
		11: {0, 7}, // JoinGroup: v0-v7
		14: {0, 5}, // SyncGroup: v0-v5
		8:  {0, 2}, // OffsetCommit: v0-v2
		9:  {0, 2}, // OffsetFetch: v0-v2
		12: {0, 4}, // Heartbeat: v0-v4
		13: {0, 4}, // LeaveGroup: v0-v4
	}

	if versionRange, exists := supportedVersions[apiKey]; exists {
		minVer, maxVer := versionRange[0], versionRange[1]
		if apiVersion < minVer || apiVersion > maxVer {
			return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)",
				apiVersion, apiKey, minVer, maxVer)
		}
		return nil
	}

	return fmt.Errorf("unsupported API key: %d", apiKey)
}

// buildUnsupportedVersionResponse creates a proper Kafka error response
func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, apiVersion uint16) ([]byte, error) {
	response := make([]byte, 0, 16)

	// Correlation ID
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	// Error code: UNSUPPORTED_VERSION (35)
	response = append(response, 0, 35)

	// Error message
	errorMsg := fmt.Sprintf("Unsupported version %d for API key %d", apiVersion, apiKey)
	errorMsgLen := uint16(len(errorMsg))
	response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen))
	response = append(response, []byte(errorMsg)...)

	return response, nil
}
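// Hypothetical usage (illustration only): this is the pair the connection loop
// invokes before dispatching. A Fetch v12 request, for example, fails
// validateAPIVersion, and the loop frames buildUnsupportedVersionResponse's
// bytes (error_code 35) back to the client instead of closing the connection.
//
//	if err := h.validateAPIVersion(1, 12); err != nil {
//		resp, _ := h.buildUnsupportedVersionResponse(correlationID, 1, 12)
//		// resp is length-prefixed and written by the loop, which then continues
//	}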
// handleMetadata routes to the appropriate version-specific handler
func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	switch apiVersion {
	case 0:
		return h.HandleMetadataV0(correlationID, requestBody)
	case 1:
		return h.HandleMetadataV1(correlationID, requestBody)
	case 2:
		return h.HandleMetadataV2(correlationID, requestBody)
	case 3, 4:
		return h.HandleMetadataV3V4(correlationID, requestBody)
	case 5, 6:
		return h.HandleMetadataV5V6(correlationID, requestBody)
	case 7:
		return h.HandleMetadataV7(correlationID, requestBody)
	default:
		return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
	}
}

// getAPIName returns a human-readable name for Kafka API keys (for debugging)
func getAPIName(apiKey uint16) string {
	switch apiKey {
	case 0:
		return "Produce"
	case 1:
		return "Fetch"
	case 2:
		return "ListOffsets"
	case 3:
		return "Metadata"
	case 8:
		return "OffsetCommit"
	case 9:
		return "OffsetFetch"
	case 10:
		return "FindCoordinator"
	case 11:
		return "JoinGroup"
	case 12:
		return "Heartbeat"
	case 13:
		return "LeaveGroup"
	case 14:
		return "SyncGroup"
	case 18:
		return "ApiVersions"
	case 19:
		return "CreateTopics"
	case 20:
		return "DeleteTopics"
	default:
		return "Unknown"
	}
}

// EnableSchemaManagement enables schema management with the given configuration
func (h *Handler) EnableSchemaManagement(config schema.ManagerConfig) error {
	manager, err := schema.NewManagerWithHealthCheck(config)
	if err != nil {
		return fmt.Errorf("failed to create schema manager: %w", err)
	}

	h.schemaManager = manager
	h.useSchema = true
	fmt.Printf("Schema management enabled with registry: %s\n", config.RegistryURL)
	return nil
}

// EnableBrokerIntegration enables mq.broker integration for schematized messages
func (h *Handler) EnableBrokerIntegration(brokers []string) error {
	if !h.IsSchemaEnabled() {
		return fmt.Errorf("schema management must be enabled before broker integration")
	}

	brokerClient := schema.NewBrokerClient(schema.BrokerClientConfig{
		Brokers:       brokers,
		SchemaManager: h.schemaManager,
	})
	h.brokerClient = brokerClient
	fmt.Printf("Broker integration enabled with brokers: %v\n", brokers)
	return nil
}

// DisableSchemaManagement disables schema management and broker integration
func (h *Handler) DisableSchemaManagement() {
	if h.brokerClient != nil {
		h.brokerClient.Close()
		h.brokerClient = nil
		fmt.Println("Broker integration disabled")
	}
	h.schemaManager = nil
	h.useSchema = false
	fmt.Println("Schema management disabled")
}

// IsSchemaEnabled returns whether schema management is enabled
func (h *Handler) IsSchemaEnabled() bool {
	return h.useSchema && h.schemaManager != nil
}

// IsBrokerIntegrationEnabled returns true if broker integration is enabled
func (h *Handler) IsBrokerIntegrationEnabled() bool {
	return h.IsSchemaEnabled() && h.brokerClient != nil
}

// commitOffsetToSMQ commits a consumer offset using SMQ storage
func (h *Handler) commitOffsetToSMQ(key offset.ConsumerOffsetKey, offsetValue int64, metadata string) error {
	if h.smqOffsetStorage == nil {
		return fmt.Errorf("SMQ offset storage not initialized")
	}

	// Save to SMQ storage - use the current timestamp and size 0 as placeholders,
	// since SMQ storage primarily tracks the committed offset
	return h.smqOffsetStorage.SaveConsumerOffset(key, offsetValue, time.Now().UnixNano(), 0)
}

// fetchOffsetFromSMQ fetches a consumer offset using SMQ storage
func (h *Handler) fetchOffsetFromSMQ(key offset.ConsumerOffsetKey) (int64, string, error) {
	if h.smqOffsetStorage == nil {
		return -1, "", fmt.Errorf("SMQ offset storage not initialized")
	}
initialized") } entries, err := h.smqOffsetStorage.LoadConsumerOffsets(key) if err != nil { return -1, "", err } if len(entries) == 0 { return -1, "", nil // No committed offset } // Return the committed offset (metadata is not stored in SMQ format) return entries[0].KafkaOffset, "", nil }