You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
160 lines
4.0 KiB
160 lines
4.0 KiB
package protocol
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
|
|
)
|
|
|
|
func TestOffsetCommitHandlerIntegration(t *testing.T) {
|
|
// Test that the offset commit handler properly uses SMQ storage
|
|
|
|
// Test ConsumerOffsetKey creation
|
|
key := offset.ConsumerOffsetKey{
|
|
Topic: "test-topic",
|
|
Partition: 0,
|
|
ConsumerGroup: "test-group",
|
|
ConsumerGroupInstance: "test-instance",
|
|
}
|
|
|
|
if key.Topic != "test-topic" {
|
|
t.Errorf("Expected topic 'test-topic', got %s", key.Topic)
|
|
}
|
|
|
|
if key.Partition != 0 {
|
|
t.Errorf("Expected partition 0, got %d", key.Partition)
|
|
}
|
|
|
|
if key.ConsumerGroup != "test-group" {
|
|
t.Errorf("Expected consumer group 'test-group', got %s", key.ConsumerGroup)
|
|
}
|
|
|
|
if key.ConsumerGroupInstance != "test-instance" {
|
|
t.Errorf("Expected instance 'test-instance', got %s", key.ConsumerGroupInstance)
|
|
}
|
|
}
|
|
|
|
func TestOffsetCommitToSMQ_WithoutStorage(t *testing.T) {
|
|
// Test error handling when SMQ storage is not initialized
|
|
handler := &Handler{
|
|
smqOffsetStorage: nil, // Not initialized
|
|
}
|
|
|
|
key := offset.ConsumerOffsetKey{
|
|
Topic: "test-topic",
|
|
Partition: 0,
|
|
ConsumerGroup: "test-group",
|
|
}
|
|
|
|
err := handler.commitOffsetToSMQ(key, 100, "test-metadata")
|
|
if err == nil {
|
|
t.Error("Expected error when SMQ storage not initialized, got nil")
|
|
}
|
|
|
|
expectedError := "SMQ offset storage not initialized"
|
|
if err.Error() != expectedError {
|
|
t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
|
|
}
|
|
}
|
|
|
|
func TestFetchOffsetFromSMQ_WithoutStorage(t *testing.T) {
|
|
// Test error handling when SMQ storage is not initialized
|
|
handler := &Handler{
|
|
smqOffsetStorage: nil, // Not initialized
|
|
}
|
|
|
|
key := offset.ConsumerOffsetKey{
|
|
Topic: "test-topic",
|
|
Partition: 0,
|
|
ConsumerGroup: "test-group",
|
|
}
|
|
|
|
offset, metadata, err := handler.fetchOffsetFromSMQ(key)
|
|
if err == nil {
|
|
t.Error("Expected error when SMQ storage not initialized, got nil")
|
|
}
|
|
|
|
if offset != -1 {
|
|
t.Errorf("Expected offset -1, got %d", offset)
|
|
}
|
|
|
|
if metadata != "" {
|
|
t.Errorf("Expected empty metadata, got '%s'", metadata)
|
|
}
|
|
}
|
|
|
|
func TestOffsetHandlers_StructureValidation(t *testing.T) {
|
|
// Validate that offset commit/fetch request/response structures are properly formed
|
|
|
|
// Test OffsetCommitRequest structure
|
|
request := OffsetCommitRequest{
|
|
GroupID: "test-group",
|
|
GenerationID: 1,
|
|
MemberID: "test-member",
|
|
GroupInstanceID: "test-instance",
|
|
RetentionTime: -1,
|
|
Topics: []OffsetCommitTopic{
|
|
{
|
|
Name: "test-topic",
|
|
Partitions: []OffsetCommitPartition{
|
|
{
|
|
Index: 0,
|
|
Offset: 100,
|
|
LeaderEpoch: -1,
|
|
Metadata: "test-metadata",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if request.GroupID != "test-group" {
|
|
t.Errorf("Expected group ID 'test-group', got %s", request.GroupID)
|
|
}
|
|
|
|
if len(request.Topics) != 1 {
|
|
t.Errorf("Expected 1 topic, got %d", len(request.Topics))
|
|
}
|
|
|
|
if request.Topics[0].Name != "test-topic" {
|
|
t.Errorf("Expected topic 'test-topic', got %s", request.Topics[0].Name)
|
|
}
|
|
|
|
if len(request.Topics[0].Partitions) != 1 {
|
|
t.Errorf("Expected 1 partition, got %d", len(request.Topics[0].Partitions))
|
|
}
|
|
|
|
partition := request.Topics[0].Partitions[0]
|
|
if partition.Index != 0 {
|
|
t.Errorf("Expected partition index 0, got %d", partition.Index)
|
|
}
|
|
|
|
if partition.Offset != 100 {
|
|
t.Errorf("Expected offset 100, got %d", partition.Offset)
|
|
}
|
|
|
|
// Test OffsetFetchRequest structure
|
|
fetchRequest := OffsetFetchRequest{
|
|
GroupID: "test-group",
|
|
GroupInstanceID: "test-instance",
|
|
Topics: []OffsetFetchTopic{
|
|
{
|
|
Name: "test-topic",
|
|
Partitions: []int32{0, 1, 2},
|
|
},
|
|
},
|
|
RequireStable: false,
|
|
}
|
|
|
|
if fetchRequest.GroupID != "test-group" {
|
|
t.Errorf("Expected group ID 'test-group', got %s", fetchRequest.GroupID)
|
|
}
|
|
|
|
if len(fetchRequest.Topics) != 1 {
|
|
t.Errorf("Expected 1 topic, got %d", len(fetchRequest.Topics))
|
|
}
|
|
|
|
if len(fetchRequest.Topics[0].Partitions) != 3 {
|
|
t.Errorf("Expected 3 partitions, got %d", len(fetchRequest.Topics[0].Partitions))
|
|
}
|
|
}
|