You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

280 lines
7.6 KiB

package offset
import (
"testing"
"time"
)
func TestLedger_BasicOperations(t *testing.T) {
ledger := NewLedger()
// Initially empty
if earliest := ledger.GetEarliestOffset(); earliest != 0 {
t.Errorf("earliest offset: got %d, want 0", earliest)
}
if latest := ledger.GetLatestOffset(); latest != 0 {
t.Errorf("latest offset: got %d, want 0", latest)
}
if hwm := ledger.GetHighWaterMark(); hwm != 0 {
t.Errorf("high water mark: got %d, want 0", hwm)
}
// Assign some offsets
baseOffset1 := ledger.AssignOffsets(3)
if baseOffset1 != 0 {
t.Errorf("first base offset: got %d, want 0", baseOffset1)
}
baseOffset2 := ledger.AssignOffsets(2)
if baseOffset2 != 3 {
t.Errorf("second base offset: got %d, want 3", baseOffset2)
}
// High water mark should be updated
if hwm := ledger.GetHighWaterMark(); hwm != 5 {
t.Errorf("high water mark after assignment: got %d, want 5", hwm)
}
// But no records yet, so earliest/latest still 0
if latest := ledger.GetLatestOffset(); latest != 0 {
t.Errorf("latest offset with no records: got %d, want 0", latest)
}
}
func TestLedger_AppendAndRetrieve(t *testing.T) {
ledger := NewLedger()
// Assign and append some records
baseOffset := ledger.AssignOffsets(3)
if baseOffset != 0 {
t.Fatalf("unexpected base offset: %d", baseOffset)
}
// Append records with different timestamps
timestamp1 := time.Now().UnixNano()
timestamp2 := timestamp1 + 1000000 // +1ms
timestamp3 := timestamp2 + 2000000 // +2ms
if err := ledger.AppendRecord(0, timestamp1, 100); err != nil {
t.Fatalf("append record 0: %v", err)
}
if err := ledger.AppendRecord(1, timestamp2, 200); err != nil {
t.Fatalf("append record 1: %v", err)
}
if err := ledger.AppendRecord(2, timestamp3, 150); err != nil {
t.Fatalf("append record 2: %v", err)
}
// Check earliest/latest
if earliest := ledger.GetEarliestOffset(); earliest != 0 {
t.Errorf("earliest offset: got %d, want 0", earliest)
}
if latest := ledger.GetLatestOffset(); latest != 2 {
t.Errorf("latest offset: got %d, want 2", latest)
}
// Retrieve records
ts, size, err := ledger.GetRecord(0)
if err != nil {
t.Fatalf("get record 0: %v", err)
}
if ts != timestamp1 {
t.Errorf("record 0 timestamp: got %d, want %d", ts, timestamp1)
}
if size != 100 {
t.Errorf("record 0 size: got %d, want 100", size)
}
ts, size, err = ledger.GetRecord(2)
if err != nil {
t.Fatalf("get record 2: %v", err)
}
if ts != timestamp3 {
t.Errorf("record 2 timestamp: got %d, want %d", ts, timestamp3)
}
if size != 150 {
t.Errorf("record 2 size: got %d, want 150", size)
}
// Try to get non-existent record
_, _, err = ledger.GetRecord(5)
if err == nil {
t.Errorf("expected error for non-existent offset 5")
}
}
func TestLedger_FindOffsetByTimestamp(t *testing.T) {
ledger := NewLedger()
// Add some records with known timestamps
baseTime := time.Now().UnixNano()
ledger.AssignOffsets(5)
ledger.AppendRecord(0, baseTime, 100)
ledger.AppendRecord(1, baseTime+1000000, 200) // +1ms
ledger.AppendRecord(2, baseTime+3000000, 150) // +3ms
ledger.AppendRecord(3, baseTime+10000000, 300) // +10ms
ledger.AppendRecord(4, baseTime+20000000, 250) // +20ms
// Find offset by timestamp
testCases := []struct {
timestamp int64
expectedOffset int64
description string
}{
{baseTime - 1000000, 0, "before first record"}, // Before any record
{baseTime, 0, "exact first timestamp"}, // Exact match first
{baseTime + 500000, 1, "between first and second"}, // Between records
{baseTime + 1000000, 1, "exact second timestamp"}, // Exact match middle
{baseTime + 5000000, 3, "between records"}, // Between records
{baseTime + 20000000, 4, "exact last timestamp"}, // Exact match last
{baseTime + 30000000, 5, "after last record"}, // After all records (should return HWM)
}
for _, tc := range testCases {
offset := ledger.FindOffsetByTimestamp(tc.timestamp)
if offset != tc.expectedOffset {
t.Errorf("%s: got offset %d, want %d", tc.description, offset, tc.expectedOffset)
}
}
}
func TestLedger_ErrorConditions(t *testing.T) {
ledger := NewLedger()
// Try to append without assigning offsets first
err := ledger.AppendRecord(0, time.Now().UnixNano(), 100)
if err == nil {
t.Errorf("expected error when appending without assignment")
}
// Assign some offsets
ledger.AssignOffsets(3)
// Try to append out-of-range offset
err = ledger.AppendRecord(5, time.Now().UnixNano(), 100)
if err == nil {
t.Errorf("expected error for out-of-range offset")
}
// Try negative offset
err = ledger.AppendRecord(-1, time.Now().UnixNano(), 100)
if err == nil {
t.Errorf("expected error for negative offset")
}
// Append a record successfully
timestamp := time.Now().UnixNano()
if err := ledger.AppendRecord(0, timestamp, 100); err != nil {
t.Fatalf("failed to append valid record: %v", err)
}
// Try to append the same offset again
err = ledger.AppendRecord(0, timestamp+1000, 200)
if err == nil {
t.Errorf("expected error for duplicate offset")
}
}
func TestLedger_ConcurrentAccess(t *testing.T) {
ledger := NewLedger()
// Test concurrent offset assignment
done := make(chan bool, 2)
go func() {
for i := 0; i < 100; i++ {
ledger.AssignOffsets(1)
}
done <- true
}()
go func() {
for i := 0; i < 100; i++ {
ledger.AssignOffsets(1)
}
done <- true
}()
// Wait for both goroutines
<-done
<-done
// Should have assigned 200 total offsets
hwm := ledger.GetHighWaterMark()
if hwm != 200 {
t.Errorf("concurrent assignment: got HWM %d, want 200", hwm)
}
// Test concurrent reads
ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
ledger.AppendRecord(200, timestamp, 100)
// Multiple concurrent reads should work fine
readDone := make(chan bool, 5)
for i := 0; i < 5; i++ {
go func() {
ts, size, err := ledger.GetRecord(200)
if err != nil || ts != timestamp || size != 100 {
t.Errorf("concurrent read failed: ts=%d size=%d err=%v", ts, size, err)
}
readDone <- true
}()
}
// Wait for all reads
for i := 0; i < 5; i++ {
<-readDone
}
}
func TestLedger_GetStats(t *testing.T) {
ledger := NewLedger()
// Initially empty
count, earliest, latest, next := ledger.GetStats()
if count != 0 || earliest != 0 || latest != 0 || next != 0 {
t.Errorf("initial stats: got count=%d earliest=%d latest=%d next=%d", count, earliest, latest, next)
}
// Add some data
baseTime := time.Now().UnixNano()
ledger.AssignOffsets(3)
ledger.AppendRecord(0, baseTime, 100)
ledger.AppendRecord(1, baseTime+1000000, 200)
ledger.AppendRecord(2, baseTime+2000000, 150)
count, earliest, latest, next = ledger.GetStats()
if count != 3 {
t.Errorf("entry count: got %d, want 3", count)
}
if earliest != baseTime {
t.Errorf("earliest time: got %d, want %d", earliest, baseTime)
}
if latest != baseTime+2000000 {
t.Errorf("latest time: got %d, want %d", latest, baseTime+2000000)
}
if next != 3 {
t.Errorf("next offset: got %d, want 3", next)
}
}
func TestLedger_EmptyLedgerTimestampLookup(t *testing.T) {
ledger := NewLedger()
// Empty ledger should handle timestamp queries gracefully
offset := ledger.FindOffsetByTimestamp(time.Now().UnixNano())
if offset != 0 {
t.Errorf("empty ledger timestamp lookup: got %d, want 0", offset)
}
earliest, latest := ledger.GetTimestampRange()
if earliest <= 0 || latest <= 0 {
t.Errorf("empty ledger timestamp range: earliest=%d latest=%d", earliest, latest)
}
// For empty ledger, should return current time as both earliest and latest
if earliest != latest {
t.Errorf("empty ledger should have same earliest and latest time")
}
}