You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

454 lines
11 KiB

package offset
import (
"fmt"
"os"
"testing"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// BenchmarkOffsetAssignment benchmarks sequential offset assignment
func BenchmarkOffsetAssignment(b *testing.B) {
storage := NewInMemoryOffsetStorage()
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
if err != nil {
b.Fatalf("Failed to create partition manager: %v", err)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
manager.AssignOffset()
}
})
}
// BenchmarkBatchOffsetAssignment benchmarks batch offset assignment
func BenchmarkBatchOffsetAssignment(b *testing.B) {
storage := NewInMemoryOffsetStorage()
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
if err != nil {
b.Fatalf("Failed to create partition manager: %v", err)
}
batchSizes := []int64{1, 10, 100, 1000}
for _, batchSize := range batchSizes {
b.Run(fmt.Sprintf("BatchSize%d", batchSize), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.AssignOffsets(batchSize)
}
})
}
}
// BenchmarkSQLOffsetStorage benchmarks SQL storage operations
func BenchmarkSQLOffsetStorage(b *testing.B) {
// Create temporary database
tmpFile, err := os.CreateTemp("", "benchmark_*.db")
if err != nil {
b.Fatalf("Failed to create temp database: %v", err)
}
tmpFile.Close()
defer os.Remove(tmpFile.Name())
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
b.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
storage, err := NewSQLOffsetStorage(db)
if err != nil {
b.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
partitionKey := partitionKey(partition)
b.Run("SaveCheckpoint", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.SaveCheckpoint("test-namespace", "test-topic", partition, int64(i))
}
})
b.Run("LoadCheckpoint", func(b *testing.B) {
storage.SaveCheckpoint("test-namespace", "test-topic", partition, 1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.LoadCheckpoint("test-namespace", "test-topic", partition)
}
})
b.Run("SaveOffsetMapping", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), 100)
}
})
// Pre-populate for read benchmarks
for i := 0; i < 1000; i++ {
storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), 100)
}
b.Run("GetHighestOffset", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.GetHighestOffset("test-namespace", "test-topic", partition)
}
})
b.Run("LoadOffsetMappings", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.LoadOffsetMappings(partitionKey)
}
})
b.Run("GetOffsetMappingsByRange", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
start := int64(i % 900)
end := start + 100
storage.GetOffsetMappingsByRange(partitionKey, start, end)
}
})
b.Run("GetPartitionStats", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.GetPartitionStats(partitionKey)
}
})
}
// BenchmarkInMemoryVsSQL compares in-memory and SQL storage performance
func BenchmarkInMemoryVsSQL(b *testing.B) {
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
// In-memory storage benchmark
b.Run("InMemory", func(b *testing.B) {
storage := NewInMemoryOffsetStorage()
manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
if err != nil {
b.Fatalf("Failed to create partition manager: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.AssignOffset()
}
})
// SQL storage benchmark
b.Run("SQL", func(b *testing.B) {
tmpFile, err := os.CreateTemp("", "benchmark_sql_*.db")
if err != nil {
b.Fatalf("Failed to create temp database: %v", err)
}
tmpFile.Close()
defer os.Remove(tmpFile.Name())
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
b.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
storage, err := NewSQLOffsetStorage(db)
if err != nil {
b.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
if err != nil {
b.Fatalf("Failed to create partition manager: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.AssignOffset()
}
})
}
// BenchmarkOffsetSubscription benchmarks subscription operations
func BenchmarkOffsetSubscription(b *testing.B) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
// Pre-assign offsets
registry.AssignOffsets("test-namespace", "test-topic", partition, 10000)
b.Run("CreateSubscription", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
subscriptionID := fmt.Sprintf("bench-sub-%d", i)
sub, err := subscriber.CreateSubscription(
subscriptionID,
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
b.Fatalf("Failed to create subscription: %v", err)
}
subscriber.CloseSubscription(subscriptionID)
_ = sub
}
})
// Create subscription for other benchmarks
sub, err := subscriber.CreateSubscription(
"bench-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
b.Fatalf("Failed to create subscription: %v", err)
}
b.Run("GetOffsetRange", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
sub.GetOffsetRange(100)
}
})
b.Run("AdvanceOffset", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
sub.AdvanceOffset()
}
})
b.Run("GetLag", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
sub.GetLag()
}
})
b.Run("SeekToOffset", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
offset := int64(i % 9000) // Stay within bounds
sub.SeekToOffset(offset)
}
})
}
// BenchmarkSMQOffsetIntegration benchmarks the full integration layer
func BenchmarkSMQOffsetIntegration(b *testing.B) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
b.Run("PublishRecord", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key-%d", i)
integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
}
})
b.Run("PublishRecordBatch", func(b *testing.B) {
batchSizes := []int{1, 10, 100}
for _, batchSize := range batchSizes {
b.Run(fmt.Sprintf("BatchSize%d", batchSize), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
records := make([]PublishRecordRequest, batchSize)
for j := 0; j < batchSize; j++ {
records[j] = PublishRecordRequest{
Key: []byte(fmt.Sprintf("batch-%d-key-%d", i, j)),
Value: &schema_pb.RecordValue{},
}
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
}
})
}
})
// Pre-populate for subscription benchmarks
records := make([]PublishRecordRequest, 1000)
for i := 0; i < 1000; i++ {
records[i] = PublishRecordRequest{
Key: []byte(fmt.Sprintf("pre-key-%d", i)),
Value: &schema_pb.RecordValue{},
}
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
b.Run("CreateSubscription", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
subscriptionID := fmt.Sprintf("integration-sub-%d", i)
sub, err := integration.CreateSubscription(
subscriptionID,
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
b.Fatalf("Failed to create subscription: %v", err)
}
integration.CloseSubscription(subscriptionID)
_ = sub
}
})
b.Run("GetHighWaterMark", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
integration.GetHighWaterMark("test-namespace", "test-topic", partition)
}
})
b.Run("GetPartitionOffsetInfo", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
}
})
}
// BenchmarkConcurrentOperations benchmarks concurrent offset operations
func BenchmarkConcurrentOperations(b *testing.B) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
b.Run("ConcurrentPublish", func(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
key := fmt.Sprintf("concurrent-key-%d", i)
integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
i++
}
})
})
// Pre-populate for concurrent reads
for i := 0; i < 1000; i++ {
key := fmt.Sprintf("read-key-%d", i)
integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
}
b.Run("ConcurrentRead", func(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
integration.GetHighWaterMark("test-namespace", "test-topic", partition)
}
})
})
b.Run("ConcurrentMixed", func(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
if i%10 == 0 {
// 10% writes
key := fmt.Sprintf("mixed-key-%d", i)
integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
} else {
// 90% reads
integration.GetHighWaterMark("test-namespace", "test-topic", partition)
}
i++
}
})
})
}
// BenchmarkMemoryUsage benchmarks memory usage patterns
func BenchmarkMemoryUsage(b *testing.B) {
b.Run("InMemoryStorage", func(b *testing.B) {
storage := NewInMemoryOffsetStorage()
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
if err != nil {
b.Fatalf("Failed to create partition manager: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.AssignOffset()
if i%1000 == 0 {
// Periodic checkpoint to simulate real usage
manager.checkpoint(int64(i))
}
}
})
}