You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
589 lines
16 KiB
589 lines
16 KiB
package protocol
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
|
|
)
|
|
|
|
func TestHandler_handleOffsetCommit(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
// Create a consumer group with a stable member
|
|
group := h.groupCoordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = consumer.GroupStateStable
|
|
group.Generation = 1
|
|
group.Members["member1"] = &consumer.GroupMember{
|
|
ID: "member1",
|
|
State: consumer.MemberStateStable,
|
|
Assignment: []consumer.PartitionAssignment{
|
|
{Topic: "test-topic", Partition: 0},
|
|
},
|
|
}
|
|
group.Mu.Unlock()
|
|
|
|
// Create a basic offset commit request
|
|
requestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
|
|
|
|
correlationID := uint32(123)
|
|
response, err := h.handleOffsetCommit(correlationID, requestBody)
|
|
|
|
if err != nil {
|
|
t.Fatalf("handleOffsetCommit failed: %v", err)
|
|
}
|
|
|
|
if len(response) < 8 {
|
|
t.Fatalf("response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Check correlation ID in response
|
|
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
|
|
if respCorrelationID != correlationID {
|
|
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
|
|
}
|
|
|
|
// Verify offset was committed
|
|
group.Mu.RLock()
|
|
if group.OffsetCommits == nil || group.OffsetCommits["test-topic"] == nil {
|
|
t.Error("offset commit was not stored")
|
|
} else {
|
|
commit, exists := group.OffsetCommits["test-topic"][0]
|
|
if !exists {
|
|
t.Error("offset commit for partition 0 was not stored")
|
|
} else if commit.Offset != 0 {
|
|
t.Errorf("expected offset 0, got %d", commit.Offset)
|
|
}
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestHandler_handleOffsetCommit_InvalidGroup(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
// Request for non-existent group
|
|
requestBody := createOffsetCommitRequestBody("nonexistent-group", 1, "member1")
|
|
|
|
correlationID := uint32(124)
|
|
response, err := h.handleOffsetCommit(correlationID, requestBody)
|
|
|
|
if err != nil {
|
|
t.Fatalf("handleOffsetCommit failed: %v", err)
|
|
}
|
|
|
|
// Should get error response
|
|
if len(response) < 8 {
|
|
t.Fatalf("error response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Response should have correlation ID
|
|
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
|
|
if respCorrelationID != correlationID {
|
|
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
|
|
}
|
|
}
|
|
|
|
func TestHandler_handleOffsetCommit_WrongGeneration(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
// Create a consumer group with generation 2
|
|
group := h.groupCoordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = consumer.GroupStateStable
|
|
group.Generation = 2
|
|
group.Members["member1"] = &consumer.GroupMember{
|
|
ID: "member1",
|
|
State: consumer.MemberStateStable,
|
|
Assignment: []consumer.PartitionAssignment{
|
|
{Topic: "test-topic", Partition: 0},
|
|
},
|
|
}
|
|
group.Mu.Unlock()
|
|
|
|
// Request with wrong generation (1 instead of 2)
|
|
requestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
|
|
|
|
correlationID := uint32(125)
|
|
response, err := h.handleOffsetCommit(correlationID, requestBody)
|
|
|
|
if err != nil {
|
|
t.Fatalf("handleOffsetCommit failed: %v", err)
|
|
}
|
|
|
|
if len(response) < 8 {
|
|
t.Fatalf("response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Verify no offset was committed due to generation mismatch
|
|
group.Mu.RLock()
|
|
if group.OffsetCommits != nil && group.OffsetCommits["test-topic"] != nil {
|
|
if _, exists := group.OffsetCommits["test-topic"][0]; exists {
|
|
t.Error("offset should not have been committed with wrong generation")
|
|
}
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestHandler_handleOffsetFetch(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
// Create a consumer group with committed offsets
|
|
group := h.groupCoordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = consumer.GroupStateStable
|
|
group.Generation = 1
|
|
|
|
// Pre-populate with committed offset
|
|
group.OffsetCommits = map[string]map[int32]consumer.OffsetCommit{
|
|
"test-topic": {
|
|
0: {
|
|
Offset: 42,
|
|
Metadata: "test-metadata",
|
|
Timestamp: time.Now(),
|
|
},
|
|
},
|
|
}
|
|
group.Mu.Unlock()
|
|
|
|
// Create a basic offset fetch request
|
|
requestBody := createOffsetFetchRequestBody("test-group")
|
|
|
|
correlationID := uint32(126)
|
|
response, err := h.handleOffsetFetch(correlationID, 2, requestBody)
|
|
|
|
if err != nil {
|
|
t.Fatalf("handleOffsetFetch failed: %v", err)
|
|
}
|
|
|
|
if len(response) < 8 {
|
|
t.Fatalf("response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Check correlation ID in response
|
|
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
|
|
if respCorrelationID != correlationID {
|
|
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
|
|
}
|
|
}
|
|
|
|
func TestHandler_handleOffsetFetch_NoCommittedOffset(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
// Create a consumer group without committed offsets
|
|
group := h.groupCoordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = consumer.GroupStateStable
|
|
group.Generation = 1
|
|
// No offset commits
|
|
group.Mu.Unlock()
|
|
|
|
requestBody := createOffsetFetchRequestBody("test-group")
|
|
|
|
correlationID := uint32(127)
|
|
response, err := h.handleOffsetFetch(correlationID, 2, requestBody)
|
|
|
|
if err != nil {
|
|
t.Fatalf("handleOffsetFetch failed: %v", err)
|
|
}
|
|
|
|
if len(response) < 8 {
|
|
t.Fatalf("response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Should get valid response even with no committed offsets
|
|
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
|
|
if respCorrelationID != correlationID {
|
|
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
|
|
}
|
|
}
|
|
|
|
func TestHandler_commitOffset(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
group := &consumer.ConsumerGroup{
|
|
ID: "test-group",
|
|
OffsetCommits: nil,
|
|
}
|
|
|
|
// Test committing an offset
|
|
err := h.commitOffset(group, "test-topic", 0, 100, "test-metadata")
|
|
if err != nil {
|
|
t.Fatalf("commitOffset failed: %v", err)
|
|
}
|
|
|
|
// Verify offset was stored
|
|
if group.OffsetCommits == nil {
|
|
t.Fatal("OffsetCommits map was not initialized")
|
|
}
|
|
|
|
topicOffsets, exists := group.OffsetCommits["test-topic"]
|
|
if !exists {
|
|
t.Fatal("topic offsets not found")
|
|
}
|
|
|
|
commit, exists := topicOffsets[0]
|
|
if !exists {
|
|
t.Fatal("partition offset not found")
|
|
}
|
|
|
|
if commit.Offset != 100 {
|
|
t.Errorf("expected offset 100, got %d", commit.Offset)
|
|
}
|
|
|
|
if commit.Metadata != "test-metadata" {
|
|
t.Errorf("expected metadata 'test-metadata', got '%s'", commit.Metadata)
|
|
}
|
|
|
|
// Test updating existing offset
|
|
err = h.commitOffset(group, "test-topic", 0, 200, "updated-metadata")
|
|
if err != nil {
|
|
t.Fatalf("commitOffset update failed: %v", err)
|
|
}
|
|
|
|
updatedCommit := group.OffsetCommits["test-topic"][0]
|
|
if updatedCommit.Offset != 200 {
|
|
t.Errorf("expected updated offset 200, got %d", updatedCommit.Offset)
|
|
}
|
|
|
|
if updatedCommit.Metadata != "updated-metadata" {
|
|
t.Errorf("expected updated metadata 'updated-metadata', got '%s'", updatedCommit.Metadata)
|
|
}
|
|
}
|
|
|
|
func TestHandler_fetchOffset(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
// Test fetching from empty group
|
|
emptyGroup := &consumer.ConsumerGroup{
|
|
ID: "empty-group",
|
|
OffsetCommits: nil,
|
|
}
|
|
|
|
offset, metadata, err := h.fetchOffset(emptyGroup, "test-topic", 0)
|
|
if err != nil {
|
|
t.Errorf("fetchOffset should not error on empty group: %v", err)
|
|
}
|
|
|
|
if offset != -1 {
|
|
t.Errorf("expected offset -1 for empty group, got %d", offset)
|
|
}
|
|
|
|
if metadata != "" {
|
|
t.Errorf("expected empty metadata for empty group, got '%s'", metadata)
|
|
}
|
|
|
|
// Test fetching from group with committed offsets
|
|
group := &consumer.ConsumerGroup{
|
|
ID: "test-group",
|
|
OffsetCommits: map[string]map[int32]consumer.OffsetCommit{
|
|
"test-topic": {
|
|
0: {
|
|
Offset: 42,
|
|
Metadata: "test-metadata",
|
|
Timestamp: time.Now(),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
offset, metadata, err = h.fetchOffset(group, "test-topic", 0)
|
|
if err != nil {
|
|
t.Errorf("fetchOffset failed: %v", err)
|
|
}
|
|
|
|
if offset != 42 {
|
|
t.Errorf("expected offset 42, got %d", offset)
|
|
}
|
|
|
|
if metadata != "test-metadata" {
|
|
t.Errorf("expected metadata 'test-metadata', got '%s'", metadata)
|
|
}
|
|
|
|
// Test fetching non-existent partition
|
|
offset, metadata, err = h.fetchOffset(group, "test-topic", 1)
|
|
if err != nil {
|
|
t.Errorf("fetchOffset should not error on non-existent partition: %v", err)
|
|
}
|
|
|
|
if offset != -1 {
|
|
t.Errorf("expected offset -1 for non-existent partition, got %d", offset)
|
|
}
|
|
|
|
// Test fetching non-existent topic
|
|
offset, metadata, err = h.fetchOffset(group, "nonexistent-topic", 0)
|
|
if err != nil {
|
|
t.Errorf("fetchOffset should not error on non-existent topic: %v", err)
|
|
}
|
|
|
|
if offset != -1 {
|
|
t.Errorf("expected offset -1 for non-existent topic, got %d", offset)
|
|
}
|
|
}
|
|
|
|
func TestHandler_OffsetCommitFetch_EndToEnd(t *testing.T) {
|
|
// Create two handlers connected via pipe to simulate client-server
|
|
server := NewTestHandler()
|
|
defer server.Close()
|
|
|
|
client := NewTestHandler()
|
|
defer client.Close()
|
|
|
|
serverConn, clientConn := net.Pipe()
|
|
defer serverConn.Close()
|
|
defer clientConn.Close()
|
|
|
|
// Setup consumer group on server
|
|
group := server.groupCoordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = consumer.GroupStateStable
|
|
group.Generation = 1
|
|
group.Members["member1"] = &consumer.GroupMember{
|
|
ID: "member1",
|
|
State: consumer.MemberStateStable,
|
|
Assignment: []consumer.PartitionAssignment{
|
|
{Topic: "test-topic", Partition: 0},
|
|
},
|
|
}
|
|
group.Mu.Unlock()
|
|
|
|
// Test offset commit
|
|
commitRequestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
|
|
commitResponse, err := server.handleOffsetCommit(456, commitRequestBody)
|
|
if err != nil {
|
|
t.Fatalf("offset commit failed: %v", err)
|
|
}
|
|
|
|
if len(commitResponse) < 8 {
|
|
t.Fatalf("commit response too short: %d bytes", len(commitResponse))
|
|
}
|
|
|
|
// Test offset fetch
|
|
fetchRequestBody := createOffsetFetchRequestBody("test-group")
|
|
fetchResponse, err := server.handleOffsetFetch(457, 2, fetchRequestBody)
|
|
if err != nil {
|
|
t.Fatalf("offset fetch failed: %v", err)
|
|
}
|
|
|
|
if len(fetchResponse) < 8 {
|
|
t.Fatalf("fetch response too short: %d bytes", len(fetchResponse))
|
|
}
|
|
|
|
// Verify the committed offset is present
|
|
group.Mu.RLock()
|
|
if group.OffsetCommits == nil || group.OffsetCommits["test-topic"] == nil {
|
|
t.Error("offset commit was not stored")
|
|
} else {
|
|
commit, exists := group.OffsetCommits["test-topic"][0]
|
|
if !exists {
|
|
t.Error("offset commit for partition 0 was not found")
|
|
} else if commit.Offset != 0 {
|
|
t.Errorf("expected committed offset 0, got %d", commit.Offset)
|
|
}
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestHandler_parseOffsetCommitRequest(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
requestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
|
|
|
|
request, err := h.parseOffsetCommitRequest(requestBody)
|
|
if err != nil {
|
|
t.Fatalf("parseOffsetCommitRequest failed: %v", err)
|
|
}
|
|
|
|
if request.GroupID != "test-group" {
|
|
t.Errorf("expected group ID 'test-group', got '%s'", request.GroupID)
|
|
}
|
|
|
|
if request.GenerationID != 1 {
|
|
t.Errorf("expected generation ID 1, got %d", request.GenerationID)
|
|
}
|
|
|
|
if request.MemberID != "member1" {
|
|
t.Errorf("expected member ID 'member1', got '%s'", request.MemberID)
|
|
}
|
|
}
|
|
|
|
func TestHandler_parseOffsetFetchRequest(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
requestBody := createOffsetFetchRequestBody("test-group")
|
|
|
|
request, err := h.parseOffsetFetchRequest(requestBody)
|
|
if err != nil {
|
|
t.Fatalf("parseOffsetFetchRequest failed: %v", err)
|
|
}
|
|
|
|
if request.GroupID != "test-group" {
|
|
t.Errorf("expected group ID 'test-group', got '%s'", request.GroupID)
|
|
}
|
|
|
|
if len(request.Topics) == 0 {
|
|
t.Error("expected at least one topic in request")
|
|
} else {
|
|
if request.Topics[0].Name != "test-topic" {
|
|
t.Errorf("expected topic name 'test-topic', got '%s'", request.Topics[0].Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestHandler_buildOffsetCommitResponse(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
response := OffsetCommitResponse{
|
|
CorrelationID: 123,
|
|
Topics: []OffsetCommitTopicResponse{
|
|
{
|
|
Name: "test-topic",
|
|
Partitions: []OffsetCommitPartitionResponse{
|
|
{Index: 0, ErrorCode: ErrorCodeNone},
|
|
{Index: 1, ErrorCode: ErrorCodeOffsetMetadataTooLarge},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
responseBytes := h.buildOffsetCommitResponse(response)
|
|
|
|
if len(responseBytes) < 16 {
|
|
t.Fatalf("response too short: %d bytes", len(responseBytes))
|
|
}
|
|
|
|
// Check correlation ID
|
|
correlationID := binary.BigEndian.Uint32(responseBytes[0:4])
|
|
if correlationID != 123 {
|
|
t.Errorf("expected correlation ID 123, got %d", correlationID)
|
|
}
|
|
}
|
|
|
|
func TestHandler_buildOffsetFetchResponse(t *testing.T) {
|
|
h := NewTestHandler()
|
|
defer h.Close()
|
|
|
|
response := OffsetFetchResponse{
|
|
CorrelationID: 124,
|
|
Topics: []OffsetFetchTopicResponse{
|
|
{
|
|
Name: "test-topic",
|
|
Partitions: []OffsetFetchPartitionResponse{
|
|
{
|
|
Index: 0,
|
|
Offset: 42,
|
|
LeaderEpoch: -1,
|
|
Metadata: "test-metadata",
|
|
ErrorCode: ErrorCodeNone,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
ErrorCode: ErrorCodeNone,
|
|
}
|
|
|
|
responseBytes := h.buildOffsetFetchResponse(response, 5) // Use API version 5 (includes leader epoch)
|
|
|
|
if len(responseBytes) < 20 {
|
|
t.Fatalf("response too short: %d bytes", len(responseBytes))
|
|
}
|
|
|
|
// Check correlation ID
|
|
correlationID := binary.BigEndian.Uint32(responseBytes[0:4])
|
|
if correlationID != 124 {
|
|
t.Errorf("expected correlation ID 124, got %d", correlationID)
|
|
}
|
|
}
|
|
|
|
// Helper functions for creating test request bodies
|
|
|
|
func createOffsetCommitRequestBody(groupID string, generationID int32, memberID string) []byte {
|
|
body := make([]byte, 0, 128)
|
|
|
|
// Group ID (string)
|
|
groupIDBytes := []byte(groupID)
|
|
groupIDLength := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(groupIDLength, uint16(len(groupIDBytes)))
|
|
body = append(body, groupIDLength...)
|
|
body = append(body, groupIDBytes...)
|
|
|
|
// Generation ID (4 bytes)
|
|
generationIDBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(generationIDBytes, uint32(generationID))
|
|
body = append(body, generationIDBytes...)
|
|
|
|
// Member ID (string)
|
|
memberIDBytes := []byte(memberID)
|
|
memberIDLength := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(memberIDLength, uint16(len(memberIDBytes)))
|
|
body = append(body, memberIDLength...)
|
|
body = append(body, memberIDBytes...)
|
|
|
|
// RetentionTime (8 bytes)
|
|
body = append(body, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
|
|
|
|
// Topics count (1)
|
|
body = append(body, 0, 0, 0, 1)
|
|
|
|
// Topic name: "test-topic"
|
|
topic := "test-topic"
|
|
topicBytes := []byte(topic)
|
|
topicLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(topicLen, uint16(len(topicBytes)))
|
|
body = append(body, topicLen...)
|
|
body = append(body, topicBytes...)
|
|
|
|
// Partitions count (1)
|
|
body = append(body, 0, 0, 0, 1)
|
|
|
|
// Partition 0 fields: index(4) + offset(8) + leader_epoch(4) + metadata(NULLABLE STRING)
|
|
body = append(body, 0, 0, 0, 0) // partition index 0
|
|
body = append(body, 0, 0, 0, 0, 0, 0, 0, 0) // offset 0
|
|
body = append(body, 0xFF, 0xFF, 0xFF, 0xFF) // leader epoch -1
|
|
// metadata: null (-1)
|
|
body = append(body, 0xFF, 0xFF)
|
|
|
|
return body
|
|
}
|
|
|
|
func createOffsetFetchRequestBody(groupID string) []byte {
|
|
body := make([]byte, 0, 64)
|
|
|
|
// Group ID (string)
|
|
groupIDBytes := []byte(groupID)
|
|
groupIDLength := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(groupIDLength, uint16(len(groupIDBytes)))
|
|
body = append(body, groupIDLength...)
|
|
body = append(body, groupIDBytes...)
|
|
|
|
// Topics count (1)
|
|
body = append(body, 0, 0, 0, 1)
|
|
|
|
// Topic name: "test-topic"
|
|
topic := "test-topic"
|
|
topicBytes := []byte(topic)
|
|
topicLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(topicLen, uint16(len(topicBytes)))
|
|
body = append(body, topicLen...)
|
|
body = append(body, topicBytes...)
|
|
|
|
// Partitions count (1)
|
|
body = append(body, 0, 0, 0, 1)
|
|
|
|
// Partition 0 index
|
|
body = append(body, 0, 0, 0, 0)
|
|
|
|
return body
|
|
}
|