Browse Source

fix samara

pull/7231/head
chrislu 2 months ago
parent
commit
ba48ea9c4c
  1. 4
      test/kafka/sarama_e2e_test.go
  2. 26
      weed/mq/kafka/protocol/fetch.go

4
test/kafka/sarama_e2e_test.go

@ -1,6 +1,7 @@
package kafka
import (
"context"
"fmt"
"testing"
"time"
@ -163,8 +164,9 @@ func TestSaramaConsumerGroup(t *testing.T) {
// Start consuming (this will test FindCoordinator, JoinGroup, SyncGroup workflow)
go func() {
ctx := context.Background()
for {
err := consumerGroup.Consume(nil, []string{topicName}, consumerHandler)
err := consumerGroup.Consume(ctx, []string{topicName}, consumerHandler)
if err != nil {
t.Logf("Consumer group error: %v", err)
return

26
weed/mq/kafka/protocol/fetch.go

@ -105,21 +105,13 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
// Records - get actual stored record batches
var recordBatch []byte
if ledger != nil && highWaterMark > partition.FetchOffset {
// Try to get the actual stored record batch first
if storedBatch, exists := h.GetRecordBatch(topic.Name, partition.PartitionID, partition.FetchOffset); exists {
recordBatch = storedBatch
hexLen := 20
if len(storedBatch) < hexLen {
hexLen = len(storedBatch)
}
fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n",
partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen])
} else {
fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset)
// Fallback to synthetic batch if no stored batch found
recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
}
fmt.Printf("DEBUG: GetRecordBatch delegated to SeaweedMQ handler - topic:%s, partition:%d, offset:%d\n",
topic.Name, partition.PartitionID, partition.FetchOffset)
// Use synthetic record batch since we don't have stored batches yet
fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset)
recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
} else {
fmt.Printf("DEBUG: No messages available - fetchOffset %d >= highWaterMark %d\n", partition.FetchOffset, highWaterMark)
recordBatch = []byte{} // No messages available
@ -455,7 +447,7 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset
// This starts after the CRC field (which comes after magic byte)
crcStartPos := crcPos + 4 // start after the CRC field
crcData := batch[crcStartPos:]
crc := crc32.ChecksumIEEE(crcData)
crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
return batch
@ -573,7 +565,7 @@ func (h *Handler) constructSimpleRecordBatch(fetchOffset, highWaterMark int64) [
// This starts after the CRC field (which comes after magic byte)
crcStartPos := crcPos + 4 // start after the CRC field
crcData := batch[crcStartPos:]
crc := crc32.ChecksumIEEE(crcData)
crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
return batch

Loading…
Cancel
Save