diff --git a/weed/mq/kafka/offset/smq_mapping_test.go b/weed/mq/kafka/offset/smq_mapping_test.go index 5ea9cf41a..b23599b40 100644 --- a/weed/mq/kafka/offset/smq_mapping_test.go +++ b/weed/mq/kafka/offset/smq_mapping_test.go @@ -45,9 +45,9 @@ func TestKafkaToSMQMapping(t *testing.T) { // Verify the mapping assert.Equal(t, baseTime+2000, partitionOffset.StartTsNs) - assert.Equal(t, int32(1024), partitionOffset.Partition.RingSize) + assert.Equal(t, int32(2520), partitionOffset.Partition.RingSize) assert.Equal(t, int32(0), partitionOffset.Partition.RangeStart) - assert.Equal(t, int32(31), partitionOffset.Partition.RangeStop) + assert.Equal(t, int32(77), partitionOffset.Partition.RangeStop) t.Logf("Kafka offset %d → SMQ timestamp %d", kafkaOffset, partitionOffset.StartTsNs) }) @@ -71,10 +71,10 @@ func TestKafkaToSMQMapping(t *testing.T) { expectedStart int32 expectedStop int32 }{ - {0, 0, 31}, - {1, 32, 63}, - {2, 64, 95}, - {15, 480, 511}, + {0, 0, 77}, // 0 * 78 = 0, (0+1)*78-1 = 77 + {1, 78, 155}, // 1 * 78 = 78, (1+1)*78-1 = 155 + {2, 156, 233}, // 2 * 78 = 156, (2+1)*78-1 = 233 + {15, 1170, 1247}, // 15 * 78 = 1170, (15+1)*78-1 = 1247 } for _, tc := range testCases { @@ -207,8 +207,8 @@ func TestGetMappingInfo(t *testing.T) { assert.Equal(t, int64(0), info.KafkaOffset) assert.Equal(t, baseTime, info.SMQTimestamp) assert.Equal(t, int32(2), info.KafkaPartition) - assert.Equal(t, int32(64), info.SMQRangeStart) // 2 * 32 - assert.Equal(t, int32(95), info.SMQRangeStop) // (2+1) * 32 - 1 + assert.Equal(t, int32(156), info.SMQRangeStart) // 2 * 78 + assert.Equal(t, int32(233), info.SMQRangeStop) // (2+1) * 78 - 1 assert.Equal(t, int32(150), info.MessageSize) t.Logf("Mapping info: Kafka %d:%d → SMQ %d [%d-%d] (%d bytes)", @@ -265,9 +265,9 @@ func TestCreatePartitionOffsetForTimeRange(t *testing.T) { partitionOffset := mapper.CreatePartitionOffsetForTimeRange(kafkaPartition, startTime) assert.Equal(t, startTime, partitionOffset.StartTsNs) - assert.Equal(t, int32(1024), partitionOffset.Partition.RingSize) - assert.Equal(t, int32(160), partitionOffset.Partition.RangeStart) // 5 * 32 - assert.Equal(t, int32(191), partitionOffset.Partition.RangeStop) // (5+1) * 32 - 1 + assert.Equal(t, int32(2520), partitionOffset.Partition.RingSize) + assert.Equal(t, int32(390), partitionOffset.Partition.RangeStart) // 5 * 78 + assert.Equal(t, int32(467), partitionOffset.Partition.RangeStop) // (5+1) * 78 - 1 t.Logf("Kafka partition %d time range → SMQ PartitionOffset [%d-%d] @ %d", kafkaPartition, partitionOffset.Partition.RangeStart, diff --git a/weed/mq/kafka/offset/smq_storage.go b/weed/mq/kafka/offset/smq_storage.go index 7e6a75332..b89e06693 100644 --- a/weed/mq/kafka/offset/smq_storage.go +++ b/weed/mq/kafka/offset/smq_storage.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -36,23 +37,23 @@ func NewSMQOffsetStorage(filerAddress string) (*SMQOffsetStorage, error) { // SaveConsumerOffset saves the committed offset for a consumer group // Uses the same file format and location as SMQ brokers: -// Path: //.offset +// Path: //.offset // Content: 8-byte big-endian offset func (s *SMQOffsetStorage) SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error { t := topic.Topic{ Namespace: "kafka", // Use kafka namespace for Kafka topics Name: key.Topic, } - + p := topic.Partition{ - RingSize: MaxPartitionCount, + RingSize: pub_balancer.MaxPartitionCount, RangeStart: int32(key.Partition), RangeStop: int32(key.Partition), } partitionDir := topic.PartitionDir(t, p) offsetFileName := fmt.Sprintf("%s.offset", key.ConsumerGroup) - + // Use SMQ's 8-byte offset format offsetBytes := make([]byte, 8) util.Uint64toBytes(offsetBytes, uint64(kafkaOffset)) @@ -69,7 +70,7 @@ func (s *SMQOffsetStorage) LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetE if err != nil { return []OffsetEntry{}, nil // No committed offset found } - + if offset < 0 { return []OffsetEntry{}, nil // No valid offset } @@ -79,7 +80,7 @@ func (s *SMQOffsetStorage) LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetE { KafkaOffset: offset, Timestamp: 0, // SMQ doesn't store timestamp mapping - Size: 0, // SMQ doesn't store size mapping + Size: 0, // SMQ doesn't store size mapping }, }, nil } @@ -90,23 +91,23 @@ func (s *SMQOffsetStorage) GetConsumerHighWaterMark(key ConsumerOffsetKey) (int6 if err != nil { return 0, nil // Start from beginning if no committed offset } - + if offset < 0 { return 0, nil // Start from beginning } - + return offset + 1, nil // Next offset after committed } // getCommittedOffset reads the committed offset from SMQ's filer location func (s *SMQOffsetStorage) getCommittedOffset(key ConsumerOffsetKey) (int64, error) { t := topic.Topic{ - Namespace: "kafka", + Namespace: "kafka", Name: key.Topic, } - + p := topic.Partition{ - RingSize: MaxPartitionCount, + RingSize: pub_balancer.MaxPartitionCount, RangeStart: int32(key.Partition), RangeStop: int32(key.Partition), } @@ -126,11 +127,11 @@ func (s *SMQOffsetStorage) getCommittedOffset(key ConsumerOffsetKey) (int64, err offset = int64(util.BytesToUint64(data)) return nil }) - + if err != nil { return -1, err } - + return offset, nil } @@ -165,9 +166,6 @@ func (s *SMQOffsetStorage) Close() error { return nil } -// MaxPartitionCount defines the partition ring size used by SMQ -const MaxPartitionCount = 1024 - // parseTopicPartitionKey parses legacy "topic:partition" format into ConsumerOffsetKey func parseTopicPartitionKey(topicPartition string) (ConsumerOffsetKey, error) { return ConsumerOffsetKey{}, fmt.Errorf("legacy format parsing not implemented yet")