|
|
|
@ -8,7 +8,7 @@ import ( |
|
|
|
"net" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" |
|
|
|
) |
|
|
|
|
|
|
|
@ -29,7 +29,7 @@ type TopicPartitionKey struct { |
|
|
|
type Handler struct { |
|
|
|
topicsMu sync.RWMutex |
|
|
|
topics map[string]*TopicInfo // topic name -> topic info
|
|
|
|
|
|
|
|
|
|
|
|
ledgersMu sync.RWMutex |
|
|
|
ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger
|
|
|
|
} |
|
|
|
@ -44,25 +44,25 @@ func NewHandler() *Handler { |
|
|
|
// GetOrCreateLedger returns the offset ledger for a topic-partition, creating it if needed
|
|
|
|
func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { |
|
|
|
key := TopicPartitionKey{Topic: topic, Partition: partition} |
|
|
|
|
|
|
|
|
|
|
|
// First try to get existing ledger with read lock
|
|
|
|
h.ledgersMu.RLock() |
|
|
|
ledger, exists := h.ledgers[key] |
|
|
|
h.ledgersMu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
if exists { |
|
|
|
return ledger |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create new ledger with write lock
|
|
|
|
h.ledgersMu.Lock() |
|
|
|
defer h.ledgersMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Double-check after acquiring write lock
|
|
|
|
if ledger, exists := h.ledgers[key]; exists { |
|
|
|
return ledger |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create and store new ledger
|
|
|
|
ledger = offset.NewLedger() |
|
|
|
h.ledgers[key] = ledger |
|
|
|
@ -72,10 +72,10 @@ func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledge |
|
|
|
// GetLedger returns the offset ledger for a topic-partition, or nil if not found
|
|
|
|
func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { |
|
|
|
key := TopicPartitionKey{Topic: topic, Partition: partition} |
|
|
|
|
|
|
|
|
|
|
|
h.ledgersMu.RLock() |
|
|
|
defer h.ledgersMu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
return h.ledgers[key] |
|
|
|
} |
|
|
|
|
|
|
|
@ -132,6 +132,8 @@ func (h *Handler) HandleConn(conn net.Conn) error { |
|
|
|
response, err = h.handleCreateTopics(correlationID, messageBuf[8:]) // skip header
|
|
|
|
case 20: // DeleteTopics
|
|
|
|
response, err = h.handleDeleteTopics(correlationID, messageBuf[8:]) // skip header
|
|
|
|
case 0: // Produce
|
|
|
|
response, err = h.handleProduce(correlationID, messageBuf[8:]) // skip header
|
|
|
|
default: |
|
|
|
err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion) |
|
|
|
} |
|
|
|
@ -172,7 +174,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { |
|
|
|
response = append(response, 0, 0) |
|
|
|
|
|
|
|
// Number of API keys (compact array format in newer versions, but using basic format for simplicity)
|
|
|
|
response = append(response, 0, 0, 0, 5) // 5 API keys
|
|
|
|
response = append(response, 0, 0, 0, 6) // 6 API keys
|
|
|
|
|
|
|
|
// API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2)
|
|
|
|
response = append(response, 0, 18) // API key 18
|
|
|
|
@ -199,6 +201,11 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { |
|
|
|
response = append(response, 0, 0) // min version 0
|
|
|
|
response = append(response, 0, 4) // max version 4
|
|
|
|
|
|
|
|
// API Key 0 (Produce): api_key(2) + min_version(2) + max_version(2)
|
|
|
|
response = append(response, 0, 0) // API key 0
|
|
|
|
response = append(response, 0, 0) // min version 0
|
|
|
|
response = append(response, 0, 7) // max version 7
|
|
|
|
|
|
|
|
// Throttle time (4 bytes, 0 = no throttling)
|
|
|
|
response = append(response, 0, 0, 0, 0) |
|
|
|
|
|
|
|
@ -330,10 +337,10 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ |
|
|
|
|
|
|
|
// Get the ledger for this topic-partition
|
|
|
|
ledger := h.GetOrCreateLedger(string(topicName), int32(partitionID)) |
|
|
|
|
|
|
|
|
|
|
|
var responseTimestamp int64 |
|
|
|
var responseOffset int64 |
|
|
|
|
|
|
|
|
|
|
|
switch timestamp { |
|
|
|
case -2: // earliest offset
|
|
|
|
responseOffset = ledger.GetEarliestOffset() |
|
|
|
@ -348,7 +355,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ |
|
|
|
responseTimestamp = time.Now().UnixNano() |
|
|
|
} |
|
|
|
} |
|
|
|
case -1: // latest offset
|
|
|
|
case -1: // latest offset
|
|
|
|
responseOffset = ledger.GetLatestOffset() |
|
|
|
if responseOffset == 0 && ledger.GetHighWaterMark() == 0 { |
|
|
|
// No messages yet
|
|
|
|
@ -490,7 +497,7 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ( |
|
|
|
Partitions: int32(numPartitions), |
|
|
|
CreatedAt: time.Now().UnixNano(), |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Initialize ledgers for all partitions
|
|
|
|
for partitionID := int32(0); partitionID < int32(numPartitions); partitionID++ { |
|
|
|
h.GetOrCreateLedger(topicName, partitionID) |
|
|
|
@ -585,7 +592,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( |
|
|
|
} else { |
|
|
|
// Delete the topic
|
|
|
|
delete(h.topics, topicName) |
|
|
|
|
|
|
|
|
|
|
|
// Clean up associated ledgers
|
|
|
|
h.ledgersMu.Lock() |
|
|
|
for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { |
|
|
|
|