You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

320 lines
8.8 KiB

package integration
import (
"context"
"testing"
"time"
)
// TestAgentClient_Creation tests agent client creation and health checks
func TestAgentClient_Creation(t *testing.T) {
// Skip if no real agent available (would need real SeaweedMQ setup)
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
client, err := NewAgentClient("localhost:17777") // default agent port
if err != nil {
t.Fatalf("Failed to create agent client: %v", err)
}
defer client.Close()
// Test health check
err = client.HealthCheck()
if err != nil {
t.Fatalf("Health check failed: %v", err)
}
t.Logf("Agent client created and health check passed")
}
// TestAgentClient_PublishRecord tests publishing records
func TestAgentClient_PublishRecord(t *testing.T) {
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
client, err := NewAgentClient("localhost:17777")
if err != nil {
t.Fatalf("Failed to create agent client: %v", err)
}
defer client.Close()
// Test publishing a record
key := []byte("test-key")
value := []byte("test-value")
timestamp := time.Now().UnixNano()
sequence, err := client.PublishRecord("test-topic", 0, key, value, timestamp)
if err != nil {
t.Fatalf("Failed to publish record: %v", err)
}
if sequence < 0 {
t.Errorf("Invalid sequence: %d", sequence)
}
t.Logf("Published record with sequence: %d", sequence)
}
// TestAgentClient_SessionManagement tests publisher session lifecycle
func TestAgentClient_SessionManagement(t *testing.T) {
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
client, err := NewAgentClient("localhost:17777")
if err != nil {
t.Fatalf("Failed to create agent client: %v", err)
}
defer client.Close()
// Create publisher session
session, err := client.GetOrCreatePublisher("session-test-topic", 0)
if err != nil {
t.Fatalf("Failed to create publisher: %v", err)
}
if session.SessionID == 0 {
t.Errorf("Invalid session ID: %d", session.SessionID)
}
if session.Topic != "session-test-topic" {
t.Errorf("Topic mismatch: got %s, want session-test-topic", session.Topic)
}
if session.Partition != 0 {
t.Errorf("Partition mismatch: got %d, want 0", session.Partition)
}
// Close the publisher
err = client.ClosePublisher("session-test-topic", 0)
if err != nil {
t.Errorf("Failed to close publisher: %v", err)
}
t.Logf("Publisher session managed successfully")
}
// TestAgentClient_ConcurrentPublish tests concurrent publishing
func TestAgentClient_ConcurrentPublish(t *testing.T) {
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
client, err := NewAgentClient("localhost:17777")
if err != nil {
t.Fatalf("Failed to create agent client: %v", err)
}
defer client.Close()
// Publish multiple records concurrently
numRecords := 10
errors := make(chan error, numRecords)
sequences := make(chan int64, numRecords)
for i := 0; i < numRecords; i++ {
go func(index int) {
key := []byte("concurrent-key")
value := []byte("concurrent-value-" + string(rune(index)))
timestamp := time.Now().UnixNano()
sequence, err := client.PublishRecord("concurrent-test-topic", 0, key, value, timestamp)
if err != nil {
errors <- err
return
}
sequences <- sequence
errors <- nil
}(i)
}
// Collect results
successCount := 0
var lastSequence int64 = -1
for i := 0; i < numRecords; i++ {
err := <-errors
if err != nil {
t.Logf("Publish error: %v", err)
} else {
sequence := <-sequences
if sequence > lastSequence {
lastSequence = sequence
}
successCount++
}
}
if successCount < numRecords {
t.Errorf("Only %d/%d publishes succeeded", successCount, numRecords)
}
t.Logf("Concurrent publish test: %d/%d successful, last sequence: %d",
successCount, numRecords, lastSequence)
}
// TestAgentClient_SubscriberSession tests subscriber session creation and management
func TestAgentClient_SubscriberSession(t *testing.T) {
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
client, err := NewAgentClient("localhost:17777")
if err != nil {
t.Fatalf("Failed to create agent client: %v", err)
}
defer client.Close()
topic := "subscriber-test-topic"
partition := int32(0)
startOffset := int64(0)
// Create subscriber session
session, err := client.GetOrCreateSubscriber(topic, partition, startOffset)
if err != nil {
t.Fatalf("Failed to create subscriber: %v", err)
}
if session.Topic != topic {
t.Errorf("Topic mismatch: got %s, want %s", session.Topic, topic)
}
if session.Partition != partition {
t.Errorf("Partition mismatch: got %d, want %d", session.Partition, partition)
}
if session.Stream == nil {
t.Errorf("Stream should not be nil")
}
if session.OffsetLedger == nil {
t.Errorf("OffsetLedger should not be nil")
}
// Test getting existing session
session2, err := client.GetOrCreateSubscriber(topic, partition, startOffset)
if err != nil {
t.Fatalf("Failed to get existing subscriber: %v", err)
}
// Should return the same session
if session != session2 {
t.Errorf("Should return the same subscriber session")
}
t.Logf("Subscriber session test completed successfully")
}
// TestAgentClient_ReadRecords tests reading records from subscriber stream
func TestAgentClient_ReadRecords(t *testing.T) {
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
client, err := NewAgentClient("localhost:17777")
if err != nil {
t.Fatalf("Failed to create agent client: %v", err)
}
defer client.Close()
topic := "read-records-test-topic"
partition := int32(0)
// First, publish some records to have data to read
testData := []struct {
key []byte
value []byte
}{
{[]byte("read-key-1"), []byte("read-value-1")},
{[]byte("read-key-2"), []byte("read-value-2")},
{[]byte("read-key-3"), []byte("read-value-3")},
}
// Publish records
for i, data := range testData {
timestamp := time.Now().UnixNano()
sequence, err := client.PublishRecord(topic, partition, data.key, data.value, timestamp)
if err != nil {
t.Fatalf("Failed to publish record %d: %v", i, err)
}
t.Logf("Published record %d with sequence %d", i, sequence)
}
// Wait for records to be available
time.Sleep(200 * time.Millisecond)
// Create subscriber session
subscriber, err := client.GetOrCreateSubscriber(topic, partition, 0)
if err != nil {
t.Fatalf("Failed to create subscriber: %v", err)
}
// Try to read records
maxRecords := len(testData)
records, err := client.ReadRecords(subscriber, maxRecords)
if err != nil {
t.Fatalf("Failed to read records: %v", err)
}
t.Logf("Read %d records from SeaweedMQ", len(records))
// Validate records
for i, record := range records {
if record == nil {
t.Errorf("Record %d should not be nil", i)
continue
}
if len(record.Value) == 0 {
t.Errorf("Record %d should have non-empty value", i)
}
if record.Timestamp == 0 {
t.Errorf("Record %d should have non-zero timestamp", i)
}
t.Logf("Record %d: key=%s, value=%s, timestamp=%d, sequence=%d",
i, string(record.Key), string(record.Value), record.Timestamp, record.Sequence)
}
// Test reading with smaller maxRecords
smallBatch, err := client.ReadRecords(subscriber, 1)
if err != nil {
t.Errorf("Failed to read small batch: %v", err)
}
t.Logf("Small batch read returned %d records", len(smallBatch))
// Test reading when no records available (should not block indefinitely)
emptyBatch, err := client.ReadRecords(subscriber, 10)
if err != nil {
t.Logf("Expected: reading when no records available returned error: %v", err)
} else {
t.Logf("Reading when no records available returned %d records", len(emptyBatch))
}
t.Logf("ReadRecords test completed successfully")
}
// TestAgentClient_ReadRecords_ErrorHandling tests error cases for reading records
func TestAgentClient_ReadRecords_ErrorHandling(t *testing.T) {
// This is a unit test that can run without SeaweedMQ agent
ctx := context.TODO()
client := &AgentClient{
subscribers: make(map[string]*SubscriberSession),
ctx: ctx,
}
// Test reading from nil session - this will fail safely
records, err := client.ReadRecords(nil, 10)
if err == nil {
t.Errorf("Reading from nil session should fail")
}
if records != nil {
t.Errorf("Records should be nil when session is nil")
}
// Test reading with maxRecords=0 - should return empty records quickly
session := &SubscriberSession{
Topic: "test-topic",
Partition: 0,
Stream: nil, // This will cause an error when trying to read
}
records, err = client.ReadRecords(session, 0)
if len(records) != 0 {
t.Errorf("Should return empty records for maxRecords=0, got %d", len(records))
}
// Error is expected due to nil stream, but it should return empty records before attempting to read
t.Logf("ReadRecords error handling test completed")
}