Browse Source

fixes

pull/7231/head
chrislu 2 months ago
parent
commit
ba73939ca2
  1. 22
      weed/mq/kafka/gateway/server.go
  2. 103
      weed/mq/kafka/protocol/produce.go
  3. 90
      weed/mq/offset/manager.go

22
weed/mq/kafka/gateway/server.go

@ -49,8 +49,9 @@ func resolveAdvertisedAddress() string {
// Options configures the Kafka gateway server created by NewServer.
//
// NewServer falls back to the in-memory handler when TestMode is set or when
// Masters is empty; otherwise it builds a SeaweedMQ broker handler from
// Masters and FilerGroup.
type Options struct {
	Listen     string
	Masters    string // SeaweedFS master servers (required for production mode)
	FilerGroup string // filer group name (optional)
	TestMode   bool   // Use in-memory handler for testing (optional)
}
type Server struct {
@ -65,12 +66,21 @@ type Server struct {
func NewServer(opts Options) *Server {
ctx, cancel := context.WithCancel(context.Background())
// Create broker-based SeaweedMQ handler
handler, err := protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
if err != nil {
glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err)
var handler *protocol.Handler
var err error
if opts.TestMode || opts.Masters == "" {
// Use in-memory handler for testing or when no masters are configured
handler = protocol.NewHandler()
glog.V(1).Infof("Created Kafka gateway with in-memory handler for testing")
} else {
// Create broker-based SeaweedMQ handler for production
handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
if err != nil {
glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err)
}
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
}
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
return &Server{
opts: opts,

103
weed/mq/kafka/protocol/produce.go

@ -330,60 +330,36 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
// - Request: transactional_id field (nullable string) at the beginning
// - Response: throttle_time_ms field at the end (v1+)
// Parse Produce v7 request format based on actual Sarama request
// Format: client_id(STRING) + transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY)
// Parse Produce v7 request format (client_id is already handled by HandleConn)
// Format: transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY)
offset := 0
// Parse client_id (STRING: 2 bytes length + data)
// Parse transactional_id (NULLABLE_STRING: 2 bytes length + data, -1 = null)
if len(requestBody) < 2 {
return nil, fmt.Errorf("Produce v%d request too short for client_id", apiVersion)
}
clientIDLen := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(clientIDLen) {
return nil, fmt.Errorf("Produce v%d request client_id too short", apiVersion)
return nil, fmt.Errorf("Produce v%d request too short for transactional_id", apiVersion)
}
clientID := string(requestBody[offset : offset+int(clientIDLen)])
offset += int(clientIDLen)
fmt.Printf("DEBUG: Produce v%d - client_id: %s\n", apiVersion, clientID)
// Parse transactional_id (NULLABLE_STRING: 2 bytes length + data, -1 = null)
var transactionalID string = "null"
baseTxOffset := offset
if len(requestBody) >= offset+2 {
possibleLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
consumedTx := false
if possibleLen == -1 {
// consume just the length
offset += 2
consumedTx = true
} else if possibleLen >= 0 && len(requestBody) >= offset+2+int(possibleLen)+6 {
// There is enough room for a string and acks/timeout after it
offset += 2
if int(possibleLen) > 0 {
if len(requestBody) < offset+int(possibleLen) {
return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
}
transactionalID = string(requestBody[offset : offset+int(possibleLen)])
offset += int(possibleLen)
}
consumedTx = true
txIDLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if txIDLen == -1 {
// null transactional_id
transactionalID = "null"
fmt.Printf("DEBUG: Produce v%d - transactional_id: null\n", apiVersion)
} else if txIDLen >= 0 {
if len(requestBody) < offset+int(txIDLen) {
return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
}
// Tentatively consumed transactional_id; we'll validate later and may revert
_ = consumedTx
transactionalID = string(requestBody[offset : offset+int(txIDLen)])
offset += int(txIDLen)
fmt.Printf("DEBUG: Produce v%d - transactional_id: %s\n", apiVersion, transactionalID)
}
fmt.Printf("DEBUG: Produce v%d - transactional_id: %s\n", apiVersion, transactionalID)
// Parse acks (INT16) and timeout_ms (INT32)
if len(requestBody) < offset+6 {
// If transactional_id was mis-parsed, revert and try without it
offset = baseTxOffset
transactionalID = "null"
if len(requestBody) < offset+6 {
return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
}
return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
}
acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
@ -393,37 +369,20 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
fmt.Printf("DEBUG: Produce v%d - acks: %d, timeout: %d\n", apiVersion, acks, timeout)
// Parse topics array
if len(requestBody) < offset+4 {
// Fallback: treat transactional_id as absent if this seems invalid
offset = baseTxOffset
transactionalID = "null"
if len(requestBody) < offset+6 {
return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
}
acks = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
timeout = binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// If acks=0, fire-and-forget - return empty response immediately
if acks == 0 {
fmt.Printf("DEBUG: Produce v%d - acks=0, returning empty response (fire-and-forget)\n", apiVersion)
return []byte{}, nil
}
fmt.Printf("DEBUG: Produce v%d - acks=%d, will process and return response\n", apiVersion, acks)
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// If topicsCount is implausible, revert transactional_id consumption and re-parse once
// If topicsCount is implausible, there might be a parsing issue
if topicsCount > 1000 {
// revert
offset = baseTxOffset
transactionalID = "null"
acks = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
timeout = binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("Produce v%d request missing topics count", apiVersion)
}
topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
return nil, fmt.Errorf("Produce v%d request has implausible topics count: %d", apiVersion, topicsCount)
}
fmt.Printf("DEBUG: Produce v%d - topics count: %d\n", apiVersion, topicsCount)
@ -573,17 +532,15 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
}
}
// If acks=0, fire-and-forget - return empty response per Kafka spec
if acks == 0 {
return []byte{}, nil
}
// Append throttle_time_ms at the END for v1+
if apiVersion >= 1 {
response = append(response, 0, 0, 0, 0)
}
fmt.Printf("DEBUG: Produce v%d response: %d bytes\n", apiVersion, len(response))
fmt.Printf("DEBUG: Produce v%d response: %d bytes (acks=%d)\n", apiVersion, len(response), acks)
if len(response) < 20 {
fmt.Printf("DEBUG: Produce v%d response hex: %x\n", apiVersion, response)
}
return response, nil
}

90
weed/mq/offset/manager.go

@ -13,21 +13,21 @@ type PartitionOffsetManager struct {
mu sync.RWMutex
partition *schema_pb.Partition
nextOffset int64
// Checkpointing for recovery
lastCheckpoint int64
checkpointInterval int64
storage OffsetStorage
storage OffsetStorage
}
// OffsetStorage is the persistence backend a PartitionOffsetManager uses to
// save and restore the offset state of a partition.
type OffsetStorage interface {
// SaveCheckpoint persists the current offset state for recovery.
// It returns an error when the offset could not be stored.
SaveCheckpoint(partition *schema_pb.Partition, offset int64) error
// LoadCheckpoint retrieves the last saved offset state.
// During recovery the result is only used when it is returned without an
// error and is non-negative.
LoadCheckpoint(partition *schema_pb.Partition) (int64, error)
// GetHighestOffset scans storage to find the highest assigned offset.
// During recovery the result is only used when it is returned without an
// error and is non-negative.
GetHighestOffset(partition *schema_pb.Partition) (int64, error)
}
@ -36,48 +36,64 @@ type OffsetStorage interface {
// NewPartitionOffsetManager creates an offset manager for partition backed by
// storage and restores its state through recover before returning it.
//
// It returns an error, wrapping the underlying cause, when the offset state
// cannot be recovered; the returned manager is nil in that case.
func NewPartitionOffsetManager(partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
	manager := &PartitionOffsetManager{
		partition:          partition,
		checkpointInterval: 1, // Checkpoint every offset for immediate persistence
		storage:            storage,
	}

	// Recover offset state
	if err := manager.recover(); err != nil {
		return nil, fmt.Errorf("failed to recover offset state: %w", err)
	}

	return manager, nil
}
// AssignOffset assigns the next sequential offset and returns it.
//
// The counter is advanced while holding m.mu. When the assigned offset is at
// least checkpointInterval past the last checkpoint, the offset is
// checkpointed synchronously before returning.
func (m *PartitionOffsetManager) AssignOffset() int64 {
	m.mu.Lock()
	offset := m.nextOffset
	m.nextOffset++
	// Decide under the lock, but checkpoint only after releasing it.
	shouldCheckpoint := offset-m.lastCheckpoint >= m.checkpointInterval
	// Unlock exactly once: pairing this with a deferred Unlock would unlock an
	// already-unlocked mutex when the function returns.
	m.mu.Unlock()

	// checkpoint takes m.mu itself to record lastCheckpoint, so it must run
	// outside the critical section to avoid a deadlock.
	if shouldCheckpoint {
		m.checkpoint(offset)
	}

	return offset
}
// AssignOffsets assigns a batch of count sequential offsets and returns the
// first (baseOffset) and last (lastOffset) offset of the batch.
//
// count is not validated here; callers are expected to pass a positive value
// (with count <= 0 the returned lastOffset is below baseOffset).
//
// The counter is advanced while holding m.mu. When lastOffset is at least
// checkpointInterval past the last checkpoint, lastOffset is checkpointed
// synchronously before returning.
func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
	m.mu.Lock()
	baseOffset = m.nextOffset
	lastOffset = m.nextOffset + count - 1
	m.nextOffset += count
	// Decide under the lock, but checkpoint only after releasing it.
	shouldCheckpoint := lastOffset-m.lastCheckpoint >= m.checkpointInterval
	// Unlock exactly once: pairing this with a deferred Unlock would unlock an
	// already-unlocked mutex when the function returns.
	m.mu.Unlock()

	// checkpoint takes m.mu itself to record lastCheckpoint, so it must run
	// outside the critical section to avoid a deadlock.
	if shouldCheckpoint {
		m.checkpoint(lastOffset)
	}

	return baseOffset, lastOffset
}
@ -97,17 +113,17 @@ func (m *PartitionOffsetManager) GetHighWaterMark() int64 {
func (m *PartitionOffsetManager) recover() error {
var checkpointOffset int64 = -1
var highestOffset int64 = -1
// Try to load checkpoint
if offset, err := m.storage.LoadCheckpoint(m.partition); err == nil && offset >= 0 {
checkpointOffset = offset
}
// Try to scan storage for highest offset
if offset, err := m.storage.GetHighestOffset(m.partition); err == nil && offset >= 0 {
highestOffset = offset
}
// Use the higher of checkpoint or storage scan
if checkpointOffset >= 0 && highestOffset >= 0 {
if highestOffset > checkpointOffset {
@ -128,7 +144,7 @@ func (m *PartitionOffsetManager) recover() error {
m.nextOffset = 0
m.lastCheckpoint = -1
}
return nil
}
@ -139,7 +155,7 @@ func (m *PartitionOffsetManager) checkpoint(offset int64) {
fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err)
return
}
m.mu.Lock()
m.lastCheckpoint = offset
m.mu.Unlock()
@ -163,29 +179,29 @@ func NewPartitionOffsetRegistry(storage OffsetStorage) *PartitionOffsetRegistry
// GetManager looks up the offset manager registered for partition and lazily
// creates and registers one when none exists yet. An error is returned only
// when a new manager has to be created and its construction fails.
func (r *PartitionOffsetRegistry) GetManager(partition *schema_pb.Partition) (*PartitionOffsetManager, error) {
	key := partitionKey(partition)

	// Fast path: an existing manager only needs the read lock.
	r.mu.RLock()
	existing, found := r.managers[key]
	r.mu.RUnlock()
	if found {
		return existing, nil
	}

	// Slow path: take the write lock for the rest of the call.
	r.mu.Lock()
	defer r.mu.Unlock()

	// Another caller may have registered a manager between the two locks.
	if existing, found = r.managers[key]; found {
		return existing, nil
	}

	created, err := NewPartitionOffsetManager(partition, r.storage)
	if err != nil {
		return nil, err
	}
	r.managers[key] = created
	return created, nil
}
@ -196,7 +212,7 @@ func (r *PartitionOffsetRegistry) AssignOffset(partition *schema_pb.Partition) (
if err != nil {
return 0, err
}
return manager.AssignOffset(), nil
}
@ -206,7 +222,7 @@ func (r *PartitionOffsetRegistry) AssignOffsets(partition *schema_pb.Partition,
if err != nil {
return 0, 0, err
}
baseOffset, lastOffset = manager.AssignOffsets(count)
return baseOffset, lastOffset, nil
}
@ -217,13 +233,13 @@ func (r *PartitionOffsetRegistry) GetHighWaterMark(partition *schema_pb.Partitio
if err != nil {
return 0, err
}
return manager.GetHighWaterMark(), nil
}
// partitionKey generates a unique key for a partition
func partitionKey(partition *schema_pb.Partition) string {
return fmt.Sprintf("ring:%d:range:%d-%d:time:%d",
return fmt.Sprintf("ring:%d:range:%d-%d:time:%d",
partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs)
}
@ -268,7 +284,7 @@ func (a *OffsetAssigner) AssignSingleOffset(partition *schema_pb.Partition) *Ass
if err != nil {
return &AssignmentResult{Error: err}
}
return &AssignmentResult{
Assignment: &OffsetAssignment{
Offset: offset,
@ -284,7 +300,7 @@ func (a *OffsetAssigner) AssignBatchOffsets(partition *schema_pb.Partition, coun
if err != nil {
return &AssignmentResult{Error: err}
}
return &AssignmentResult{
Batch: &BatchOffsetAssignment{
BaseOffset: baseOffset,

Loading…
Cancel
Save