You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
280 lines
7.6 KiB
280 lines
7.6 KiB
package offset
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestLedger_BasicOperations(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Initially empty
|
|
if earliest := ledger.GetEarliestOffset(); earliest != 0 {
|
|
t.Errorf("earliest offset: got %d, want 0", earliest)
|
|
}
|
|
if latest := ledger.GetLatestOffset(); latest != 0 {
|
|
t.Errorf("latest offset: got %d, want 0", latest)
|
|
}
|
|
if hwm := ledger.GetHighWaterMark(); hwm != 0 {
|
|
t.Errorf("high water mark: got %d, want 0", hwm)
|
|
}
|
|
|
|
// Assign some offsets
|
|
baseOffset1 := ledger.AssignOffsets(3)
|
|
if baseOffset1 != 0 {
|
|
t.Errorf("first base offset: got %d, want 0", baseOffset1)
|
|
}
|
|
|
|
baseOffset2 := ledger.AssignOffsets(2)
|
|
if baseOffset2 != 3 {
|
|
t.Errorf("second base offset: got %d, want 3", baseOffset2)
|
|
}
|
|
|
|
// High water mark should be updated
|
|
if hwm := ledger.GetHighWaterMark(); hwm != 5 {
|
|
t.Errorf("high water mark after assignment: got %d, want 5", hwm)
|
|
}
|
|
|
|
// But no records yet, so earliest/latest still 0
|
|
if latest := ledger.GetLatestOffset(); latest != 0 {
|
|
t.Errorf("latest offset with no records: got %d, want 0", latest)
|
|
}
|
|
}
|
|
|
|
func TestLedger_AppendAndRetrieve(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Assign and append some records
|
|
baseOffset := ledger.AssignOffsets(3)
|
|
if baseOffset != 0 {
|
|
t.Fatalf("unexpected base offset: %d", baseOffset)
|
|
}
|
|
|
|
// Append records with different timestamps
|
|
timestamp1 := time.Now().UnixNano()
|
|
timestamp2 := timestamp1 + 1000000 // +1ms
|
|
timestamp3 := timestamp2 + 2000000 // +2ms
|
|
|
|
if err := ledger.AppendRecord(0, timestamp1, 100); err != nil {
|
|
t.Fatalf("append record 0: %v", err)
|
|
}
|
|
if err := ledger.AppendRecord(1, timestamp2, 200); err != nil {
|
|
t.Fatalf("append record 1: %v", err)
|
|
}
|
|
if err := ledger.AppendRecord(2, timestamp3, 150); err != nil {
|
|
t.Fatalf("append record 2: %v", err)
|
|
}
|
|
|
|
// Check earliest/latest
|
|
if earliest := ledger.GetEarliestOffset(); earliest != 0 {
|
|
t.Errorf("earliest offset: got %d, want 0", earliest)
|
|
}
|
|
if latest := ledger.GetLatestOffset(); latest != 2 {
|
|
t.Errorf("latest offset: got %d, want 2", latest)
|
|
}
|
|
|
|
// Retrieve records
|
|
ts, size, err := ledger.GetRecord(0)
|
|
if err != nil {
|
|
t.Fatalf("get record 0: %v", err)
|
|
}
|
|
if ts != timestamp1 {
|
|
t.Errorf("record 0 timestamp: got %d, want %d", ts, timestamp1)
|
|
}
|
|
if size != 100 {
|
|
t.Errorf("record 0 size: got %d, want 100", size)
|
|
}
|
|
|
|
ts, size, err = ledger.GetRecord(2)
|
|
if err != nil {
|
|
t.Fatalf("get record 2: %v", err)
|
|
}
|
|
if ts != timestamp3 {
|
|
t.Errorf("record 2 timestamp: got %d, want %d", ts, timestamp3)
|
|
}
|
|
if size != 150 {
|
|
t.Errorf("record 2 size: got %d, want 150", size)
|
|
}
|
|
|
|
// Try to get non-existent record
|
|
_, _, err = ledger.GetRecord(5)
|
|
if err == nil {
|
|
t.Errorf("expected error for non-existent offset 5")
|
|
}
|
|
}
|
|
|
|
func TestLedger_FindOffsetByTimestamp(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Add some records with known timestamps
|
|
baseTime := time.Now().UnixNano()
|
|
ledger.AssignOffsets(5)
|
|
|
|
ledger.AppendRecord(0, baseTime, 100)
|
|
ledger.AppendRecord(1, baseTime+1000000, 200) // +1ms
|
|
ledger.AppendRecord(2, baseTime+3000000, 150) // +3ms
|
|
ledger.AppendRecord(3, baseTime+10000000, 300) // +10ms
|
|
ledger.AppendRecord(4, baseTime+20000000, 250) // +20ms
|
|
|
|
// Find offset by timestamp
|
|
testCases := []struct {
|
|
timestamp int64
|
|
expectedOffset int64
|
|
description string
|
|
}{
|
|
{baseTime - 1000000, 0, "before first record"}, // Before any record
|
|
{baseTime, 0, "exact first timestamp"}, // Exact match first
|
|
{baseTime + 500000, 1, "between first and second"}, // Between records
|
|
{baseTime + 1000000, 1, "exact second timestamp"}, // Exact match middle
|
|
{baseTime + 5000000, 3, "between records"}, // Between records
|
|
{baseTime + 20000000, 4, "exact last timestamp"}, // Exact match last
|
|
{baseTime + 30000000, 5, "after last record"}, // After all records (should return HWM)
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
offset := ledger.FindOffsetByTimestamp(tc.timestamp)
|
|
if offset != tc.expectedOffset {
|
|
t.Errorf("%s: got offset %d, want %d", tc.description, offset, tc.expectedOffset)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestLedger_ErrorConditions(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Try to append without assigning offsets first
|
|
err := ledger.AppendRecord(0, time.Now().UnixNano(), 100)
|
|
if err == nil {
|
|
t.Errorf("expected error when appending without assignment")
|
|
}
|
|
|
|
// Assign some offsets
|
|
ledger.AssignOffsets(3)
|
|
|
|
// Try to append out-of-range offset
|
|
err = ledger.AppendRecord(5, time.Now().UnixNano(), 100)
|
|
if err == nil {
|
|
t.Errorf("expected error for out-of-range offset")
|
|
}
|
|
|
|
// Try negative offset
|
|
err = ledger.AppendRecord(-1, time.Now().UnixNano(), 100)
|
|
if err == nil {
|
|
t.Errorf("expected error for negative offset")
|
|
}
|
|
|
|
// Append a record successfully
|
|
timestamp := time.Now().UnixNano()
|
|
if err := ledger.AppendRecord(0, timestamp, 100); err != nil {
|
|
t.Fatalf("failed to append valid record: %v", err)
|
|
}
|
|
|
|
// Try to append the same offset again
|
|
err = ledger.AppendRecord(0, timestamp+1000, 200)
|
|
if err == nil {
|
|
t.Errorf("expected error for duplicate offset")
|
|
}
|
|
}
|
|
|
|
func TestLedger_ConcurrentAccess(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Test concurrent offset assignment
|
|
done := make(chan bool, 2)
|
|
|
|
go func() {
|
|
for i := 0; i < 100; i++ {
|
|
ledger.AssignOffsets(1)
|
|
}
|
|
done <- true
|
|
}()
|
|
|
|
go func() {
|
|
for i := 0; i < 100; i++ {
|
|
ledger.AssignOffsets(1)
|
|
}
|
|
done <- true
|
|
}()
|
|
|
|
// Wait for both goroutines
|
|
<-done
|
|
<-done
|
|
|
|
// Should have assigned 200 total offsets
|
|
hwm := ledger.GetHighWaterMark()
|
|
if hwm != 200 {
|
|
t.Errorf("concurrent assignment: got HWM %d, want 200", hwm)
|
|
}
|
|
|
|
// Test concurrent reads
|
|
ledger.AssignOffsets(1)
|
|
timestamp := time.Now().UnixNano()
|
|
ledger.AppendRecord(200, timestamp, 100)
|
|
|
|
// Multiple concurrent reads should work fine
|
|
readDone := make(chan bool, 5)
|
|
for i := 0; i < 5; i++ {
|
|
go func() {
|
|
ts, size, err := ledger.GetRecord(200)
|
|
if err != nil || ts != timestamp || size != 100 {
|
|
t.Errorf("concurrent read failed: ts=%d size=%d err=%v", ts, size, err)
|
|
}
|
|
readDone <- true
|
|
}()
|
|
}
|
|
|
|
// Wait for all reads
|
|
for i := 0; i < 5; i++ {
|
|
<-readDone
|
|
}
|
|
}
|
|
|
|
func TestLedger_GetStats(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Initially empty
|
|
count, earliest, latest, next := ledger.GetStats()
|
|
if count != 0 || earliest != 0 || latest != 0 || next != 0 {
|
|
t.Errorf("initial stats: got count=%d earliest=%d latest=%d next=%d", count, earliest, latest, next)
|
|
}
|
|
|
|
// Add some data
|
|
baseTime := time.Now().UnixNano()
|
|
ledger.AssignOffsets(3)
|
|
ledger.AppendRecord(0, baseTime, 100)
|
|
ledger.AppendRecord(1, baseTime+1000000, 200)
|
|
ledger.AppendRecord(2, baseTime+2000000, 150)
|
|
|
|
count, earliest, latest, next = ledger.GetStats()
|
|
if count != 3 {
|
|
t.Errorf("entry count: got %d, want 3", count)
|
|
}
|
|
if earliest != baseTime {
|
|
t.Errorf("earliest time: got %d, want %d", earliest, baseTime)
|
|
}
|
|
if latest != baseTime+2000000 {
|
|
t.Errorf("latest time: got %d, want %d", latest, baseTime+2000000)
|
|
}
|
|
if next != 3 {
|
|
t.Errorf("next offset: got %d, want 3", next)
|
|
}
|
|
}
|
|
|
|
func TestLedger_EmptyLedgerTimestampLookup(t *testing.T) {
|
|
ledger := NewLedger()
|
|
|
|
// Empty ledger should handle timestamp queries gracefully
|
|
offset := ledger.FindOffsetByTimestamp(time.Now().UnixNano())
|
|
if offset != 0 {
|
|
t.Errorf("empty ledger timestamp lookup: got %d, want 0", offset)
|
|
}
|
|
|
|
earliest, latest := ledger.GetTimestampRange()
|
|
if earliest <= 0 || latest <= 0 {
|
|
t.Errorf("empty ledger timestamp range: earliest=%d latest=%d", earliest, latest)
|
|
}
|
|
// For empty ledger, should return current time as both earliest and latest
|
|
if earliest != latest {
|
|
t.Errorf("empty ledger should have same earliest and latest time")
|
|
}
|
|
}
|