You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
351 lines
9.5 KiB
351 lines
9.5 KiB
package broker
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
func createTestTopic() topic.Topic {
|
|
return topic.Topic{
|
|
Namespace: "test",
|
|
Name: "offset-test",
|
|
}
|
|
}
|
|
|
|
func createTestPartition() topic.Partition {
|
|
return topic.Partition{
|
|
RingSize: 1024,
|
|
RangeStart: 0,
|
|
RangeStop: 31,
|
|
UnixTimeNs: time.Now().UnixNano(),
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_AssignOffset(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Test sequential offset assignment
|
|
for i := int64(0); i < 10; i++ {
|
|
assignedOffset, err := manager.AssignOffset(testTopic, testPartition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign offset %d: %v", i, err)
|
|
}
|
|
|
|
if assignedOffset != i {
|
|
t.Errorf("Expected offset %d, got %d", i, assignedOffset)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_AssignBatchOffsets(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Assign batch of offsets
|
|
baseOffset, lastOffset, err := manager.AssignBatchOffsets(testTopic, testPartition, 5)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign batch offsets: %v", err)
|
|
}
|
|
|
|
if baseOffset != 0 {
|
|
t.Errorf("Expected base offset 0, got %d", baseOffset)
|
|
}
|
|
|
|
if lastOffset != 4 {
|
|
t.Errorf("Expected last offset 4, got %d", lastOffset)
|
|
}
|
|
|
|
// Assign another batch
|
|
baseOffset2, lastOffset2, err := manager.AssignBatchOffsets(testTopic, testPartition, 3)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign second batch offsets: %v", err)
|
|
}
|
|
|
|
if baseOffset2 != 5 {
|
|
t.Errorf("Expected base offset 5, got %d", baseOffset2)
|
|
}
|
|
|
|
if lastOffset2 != 7 {
|
|
t.Errorf("Expected last offset 7, got %d", lastOffset2)
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_GetHighWaterMark(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Initially should be 0
|
|
hwm, err := manager.GetHighWaterMark(testTopic, testPartition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get initial high water mark: %v", err)
|
|
}
|
|
|
|
if hwm != 0 {
|
|
t.Errorf("Expected initial high water mark 0, got %d", hwm)
|
|
}
|
|
|
|
// Assign some offsets
|
|
manager.AssignBatchOffsets(testTopic, testPartition, 10)
|
|
|
|
// High water mark should be updated
|
|
hwm, err = manager.GetHighWaterMark(testTopic, testPartition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get high water mark after assignment: %v", err)
|
|
}
|
|
|
|
if hwm != 10 {
|
|
t.Errorf("Expected high water mark 10, got %d", hwm)
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_CreateSubscription(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Assign some offsets first
|
|
manager.AssignBatchOffsets(testTopic, testPartition, 5)
|
|
|
|
// Create subscription
|
|
sub, err := manager.CreateSubscription(
|
|
"test-sub",
|
|
testTopic,
|
|
testPartition,
|
|
schema_pb.OffsetType_RESET_TO_EARLIEST,
|
|
0,
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
if sub.ID != "test-sub" {
|
|
t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
|
|
}
|
|
|
|
if sub.StartOffset != 0 {
|
|
t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_GetPartitionOffsetInfo(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Test empty partition
|
|
info, err := manager.GetPartitionOffsetInfo(testTopic, testPartition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get partition offset info: %v", err)
|
|
}
|
|
|
|
if info.EarliestOffset != 0 {
|
|
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
|
|
}
|
|
|
|
if info.LatestOffset != -1 {
|
|
t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
|
|
}
|
|
|
|
// Assign offsets and test again
|
|
manager.AssignBatchOffsets(testTopic, testPartition, 5)
|
|
|
|
info, err = manager.GetPartitionOffsetInfo(testTopic, testPartition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get partition offset info after assignment: %v", err)
|
|
}
|
|
|
|
if info.LatestOffset != 4 {
|
|
t.Errorf("Expected latest offset 4, got %d", info.LatestOffset)
|
|
}
|
|
|
|
if info.HighWaterMark != 5 {
|
|
t.Errorf("Expected high water mark 5, got %d", info.HighWaterMark)
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_MultiplePartitions(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
|
|
// Create different partitions
|
|
partition1 := topic.Partition{
|
|
RingSize: 1024,
|
|
RangeStart: 0,
|
|
RangeStop: 31,
|
|
UnixTimeNs: time.Now().UnixNano(),
|
|
}
|
|
|
|
partition2 := topic.Partition{
|
|
RingSize: 1024,
|
|
RangeStart: 32,
|
|
RangeStop: 63,
|
|
UnixTimeNs: time.Now().UnixNano(),
|
|
}
|
|
|
|
// Assign offsets to different partitions
|
|
assignedOffset1, err := manager.AssignOffset(testTopic, partition1)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign offset to partition1: %v", err)
|
|
}
|
|
|
|
assignedOffset2, err := manager.AssignOffset(testTopic, partition2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign offset to partition2: %v", err)
|
|
}
|
|
|
|
// Both should start at 0
|
|
if assignedOffset1 != 0 {
|
|
t.Errorf("Expected offset 0 for partition1, got %d", assignedOffset1)
|
|
}
|
|
|
|
if assignedOffset2 != 0 {
|
|
t.Errorf("Expected offset 0 for partition2, got %d", assignedOffset2)
|
|
}
|
|
|
|
// Assign more offsets to partition1
|
|
assignedOffset1_2, err := manager.AssignOffset(testTopic, partition1)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign second offset to partition1: %v", err)
|
|
}
|
|
|
|
if assignedOffset1_2 != 1 {
|
|
t.Errorf("Expected offset 1 for partition1, got %d", assignedOffset1_2)
|
|
}
|
|
|
|
// Partition2 should still be at 0 for next assignment
|
|
assignedOffset2_2, err := manager.AssignOffset(testTopic, partition2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to assign second offset to partition2: %v", err)
|
|
}
|
|
|
|
if assignedOffset2_2 != 1 {
|
|
t.Errorf("Expected offset 1 for partition2, got %d", assignedOffset2_2)
|
|
}
|
|
}
|
|
|
|
func TestOffsetAwarePublisher(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Create a mock local partition (simplified for testing)
|
|
localPartition := &topic.LocalPartition{}
|
|
|
|
// Create offset assignment function
|
|
assignOffsetFn := func() (int64, error) {
|
|
return manager.AssignOffset(testTopic, testPartition)
|
|
}
|
|
|
|
// Create offset-aware publisher
|
|
publisher := topic.NewOffsetAwarePublisher(localPartition, assignOffsetFn)
|
|
|
|
if publisher.GetPartition() != localPartition {
|
|
t.Error("Publisher should return the correct partition")
|
|
}
|
|
|
|
// Test would require more setup to actually publish messages
|
|
// This tests the basic structure
|
|
}
|
|
|
|
func TestBrokerOffsetManager_GetOffsetMetrics(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Initial metrics
|
|
metrics := manager.GetOffsetMetrics()
|
|
if metrics.TotalOffsets != 0 {
|
|
t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
|
|
}
|
|
|
|
// Assign some offsets
|
|
manager.AssignBatchOffsets(testTopic, testPartition, 5)
|
|
|
|
// Create subscription
|
|
manager.CreateSubscription("test-sub", testTopic, testPartition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
|
|
|
|
// Check updated metrics
|
|
metrics = manager.GetOffsetMetrics()
|
|
if metrics.PartitionCount != 1 {
|
|
t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_AssignOffsetsWithResult(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Assign offsets with result
|
|
result := manager.AssignOffsetsWithResult(testTopic, testPartition, 3)
|
|
|
|
if result.Error != nil {
|
|
t.Fatalf("Expected no error, got: %v", result.Error)
|
|
}
|
|
|
|
if result.BaseOffset != 0 {
|
|
t.Errorf("Expected base offset 0, got %d", result.BaseOffset)
|
|
}
|
|
|
|
if result.LastOffset != 2 {
|
|
t.Errorf("Expected last offset 2, got %d", result.LastOffset)
|
|
}
|
|
|
|
if result.Count != 3 {
|
|
t.Errorf("Expected count 3, got %d", result.Count)
|
|
}
|
|
|
|
if result.Topic != testTopic {
|
|
t.Error("Topic mismatch in result")
|
|
}
|
|
|
|
if result.Partition != testPartition {
|
|
t.Error("Partition mismatch in result")
|
|
}
|
|
|
|
if result.Timestamp <= 0 {
|
|
t.Error("Timestamp should be set")
|
|
}
|
|
}
|
|
|
|
func TestBrokerOffsetManager_Shutdown(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorageForTesting()
|
|
manager := NewBrokerOffsetManagerWithStorage(storage)
|
|
testTopic := createTestTopic()
|
|
testPartition := createTestPartition()
|
|
|
|
// Assign some offsets and create subscriptions
|
|
manager.AssignBatchOffsets(testTopic, testPartition, 5)
|
|
manager.CreateSubscription("test-sub", testTopic, testPartition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
|
|
|
|
// Shutdown should not panic
|
|
manager.Shutdown()
|
|
|
|
// After shutdown, operations should still work (using new managers)
|
|
offset, err := manager.AssignOffset(testTopic, testPartition)
|
|
if err != nil {
|
|
t.Fatalf("Operations should still work after shutdown: %v", err)
|
|
}
|
|
|
|
// Should start from 0 again (new manager)
|
|
if offset != 0 {
|
|
t.Errorf("Expected offset 0 after shutdown, got %d", offset)
|
|
}
|
|
}
|