diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 860b25dc7..4e400d914 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -8,7 +8,7 @@ import ( "net" "sync" "time" - + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" ) @@ -29,7 +29,7 @@ type TopicPartitionKey struct { type Handler struct { topicsMu sync.RWMutex topics map[string]*TopicInfo // topic name -> topic info - + ledgersMu sync.RWMutex ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger } @@ -44,25 +44,25 @@ func NewHandler() *Handler { // GetOrCreateLedger returns the offset ledger for a topic-partition, creating it if needed func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { key := TopicPartitionKey{Topic: topic, Partition: partition} - + // First try to get existing ledger with read lock h.ledgersMu.RLock() ledger, exists := h.ledgers[key] h.ledgersMu.RUnlock() - + if exists { return ledger } - + // Create new ledger with write lock h.ledgersMu.Lock() defer h.ledgersMu.Unlock() - + // Double-check after acquiring write lock if ledger, exists := h.ledgers[key]; exists { return ledger } - + // Create and store new ledger ledger = offset.NewLedger() h.ledgers[key] = ledger @@ -72,10 +72,10 @@ func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledge // GetLedger returns the offset ledger for a topic-partition, or nil if not found func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { key := TopicPartitionKey{Topic: topic, Partition: partition} - + h.ledgersMu.RLock() defer h.ledgersMu.RUnlock() - + return h.ledgers[key] } @@ -132,6 +132,8 @@ func (h *Handler) HandleConn(conn net.Conn) error { response, err = h.handleCreateTopics(correlationID, messageBuf[8:]) // skip header case 20: // DeleteTopics response, err = h.handleDeleteTopics(correlationID, messageBuf[8:]) // skip header + case 0: // Produce + response, err = h.handleProduce(correlationID, messageBuf[8:]) // skip header default: err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion) } @@ -172,7 +174,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // Number of API keys (compact array format in newer versions, but using basic format for simplicity) - response = append(response, 0, 0, 0, 5) // 5 API keys + response = append(response, 0, 0, 0, 6) // 6 API keys // API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 18) // API key 18 @@ -199,6 +201,11 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 4) // max version 4 + // API Key 0 (Produce): api_key(2) + min_version(2) + max_version(2) + response = append(response, 0, 0) // API key 0 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 7) // max version 7 + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) @@ -330,10 +337,10 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ // Get the ledger for this topic-partition ledger := h.GetOrCreateLedger(string(topicName), int32(partitionID)) - + var responseTimestamp int64 var responseOffset int64 - + switch timestamp { case -2: // earliest offset responseOffset = ledger.GetEarliestOffset() @@ -348,7 +355,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ responseTimestamp = time.Now().UnixNano() } } - case -1: // latest offset + case -1: // latest offset responseOffset = ledger.GetLatestOffset() if responseOffset == 0 && ledger.GetHighWaterMark() == 0 { // No messages yet @@ -490,7 +497,7 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ( Partitions: int32(numPartitions), CreatedAt: time.Now().UnixNano(), } - + // Initialize ledgers for all partitions for partitionID := int32(0); partitionID < int32(numPartitions); partitionID++ { h.GetOrCreateLedger(topicName, partitionID) @@ -585,7 +592,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( } else { // Delete the topic delete(h.topics, topicName) - + // Clean up associated ledgers h.ledgersMu.Lock() for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ {