You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
320 lines
8.8 KiB
320 lines
8.8 KiB
package integration
|
|
|
|
import (
	"context"
	"strconv"
	"testing"
	"time"
)
|
|
|
|
// TestAgentClient_Creation tests agent client creation and health checks
|
|
func TestAgentClient_Creation(t *testing.T) {
|
|
// Skip if no real agent available (would need real SeaweedMQ setup)
|
|
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
|
|
|
|
client, err := NewAgentClient("localhost:17777") // default agent port
|
|
if err != nil {
|
|
t.Fatalf("Failed to create agent client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Test health check
|
|
err = client.HealthCheck()
|
|
if err != nil {
|
|
t.Fatalf("Health check failed: %v", err)
|
|
}
|
|
|
|
t.Logf("Agent client created and health check passed")
|
|
}
|
|
|
|
// TestAgentClient_PublishRecord tests publishing records
|
|
func TestAgentClient_PublishRecord(t *testing.T) {
|
|
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
|
|
|
|
client, err := NewAgentClient("localhost:17777")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create agent client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Test publishing a record
|
|
key := []byte("test-key")
|
|
value := []byte("test-value")
|
|
timestamp := time.Now().UnixNano()
|
|
|
|
sequence, err := client.PublishRecord("test-topic", 0, key, value, timestamp)
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish record: %v", err)
|
|
}
|
|
|
|
if sequence < 0 {
|
|
t.Errorf("Invalid sequence: %d", sequence)
|
|
}
|
|
|
|
t.Logf("Published record with sequence: %d", sequence)
|
|
}
|
|
|
|
// TestAgentClient_SessionManagement tests publisher session lifecycle
|
|
func TestAgentClient_SessionManagement(t *testing.T) {
|
|
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
|
|
|
|
client, err := NewAgentClient("localhost:17777")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create agent client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Create publisher session
|
|
session, err := client.GetOrCreatePublisher("session-test-topic", 0)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create publisher: %v", err)
|
|
}
|
|
|
|
if session.SessionID == 0 {
|
|
t.Errorf("Invalid session ID: %d", session.SessionID)
|
|
}
|
|
|
|
if session.Topic != "session-test-topic" {
|
|
t.Errorf("Topic mismatch: got %s, want session-test-topic", session.Topic)
|
|
}
|
|
|
|
if session.Partition != 0 {
|
|
t.Errorf("Partition mismatch: got %d, want 0", session.Partition)
|
|
}
|
|
|
|
// Close the publisher
|
|
err = client.ClosePublisher("session-test-topic", 0)
|
|
if err != nil {
|
|
t.Errorf("Failed to close publisher: %v", err)
|
|
}
|
|
|
|
t.Logf("Publisher session managed successfully")
|
|
}
|
|
|
|
// TestAgentClient_ConcurrentPublish tests concurrent publishing
|
|
func TestAgentClient_ConcurrentPublish(t *testing.T) {
|
|
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
|
|
|
|
client, err := NewAgentClient("localhost:17777")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create agent client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Publish multiple records concurrently
|
|
numRecords := 10
|
|
errors := make(chan error, numRecords)
|
|
sequences := make(chan int64, numRecords)
|
|
|
|
for i := 0; i < numRecords; i++ {
|
|
go func(index int) {
|
|
key := []byte("concurrent-key")
|
|
value := []byte("concurrent-value-" + string(rune(index)))
|
|
timestamp := time.Now().UnixNano()
|
|
|
|
sequence, err := client.PublishRecord("concurrent-test-topic", 0, key, value, timestamp)
|
|
if err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
sequences <- sequence
|
|
errors <- nil
|
|
}(i)
|
|
}
|
|
|
|
// Collect results
|
|
successCount := 0
|
|
var lastSequence int64 = -1
|
|
|
|
for i := 0; i < numRecords; i++ {
|
|
err := <-errors
|
|
if err != nil {
|
|
t.Logf("Publish error: %v", err)
|
|
} else {
|
|
sequence := <-sequences
|
|
if sequence > lastSequence {
|
|
lastSequence = sequence
|
|
}
|
|
successCount++
|
|
}
|
|
}
|
|
|
|
if successCount < numRecords {
|
|
t.Errorf("Only %d/%d publishes succeeded", successCount, numRecords)
|
|
}
|
|
|
|
t.Logf("Concurrent publish test: %d/%d successful, last sequence: %d",
|
|
successCount, numRecords, lastSequence)
|
|
}
|
|
|
|
// TestAgentClient_SubscriberSession tests subscriber session creation and management
|
|
func TestAgentClient_SubscriberSession(t *testing.T) {
|
|
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
|
|
|
|
client, err := NewAgentClient("localhost:17777")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create agent client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
topic := "subscriber-test-topic"
|
|
partition := int32(0)
|
|
startOffset := int64(0)
|
|
|
|
// Create subscriber session
|
|
session, err := client.GetOrCreateSubscriber(topic, partition, startOffset)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscriber: %v", err)
|
|
}
|
|
|
|
if session.Topic != topic {
|
|
t.Errorf("Topic mismatch: got %s, want %s", session.Topic, topic)
|
|
}
|
|
|
|
if session.Partition != partition {
|
|
t.Errorf("Partition mismatch: got %d, want %d", session.Partition, partition)
|
|
}
|
|
|
|
if session.Stream == nil {
|
|
t.Errorf("Stream should not be nil")
|
|
}
|
|
|
|
if session.OffsetLedger == nil {
|
|
t.Errorf("OffsetLedger should not be nil")
|
|
}
|
|
|
|
// Test getting existing session
|
|
session2, err := client.GetOrCreateSubscriber(topic, partition, startOffset)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get existing subscriber: %v", err)
|
|
}
|
|
|
|
// Should return the same session
|
|
if session != session2 {
|
|
t.Errorf("Should return the same subscriber session")
|
|
}
|
|
|
|
t.Logf("Subscriber session test completed successfully")
|
|
}
|
|
|
|
// TestAgentClient_ReadRecords tests reading records from subscriber stream
|
|
func TestAgentClient_ReadRecords(t *testing.T) {
|
|
t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available")
|
|
|
|
client, err := NewAgentClient("localhost:17777")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create agent client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
topic := "read-records-test-topic"
|
|
partition := int32(0)
|
|
|
|
// First, publish some records to have data to read
|
|
testData := []struct {
|
|
key []byte
|
|
value []byte
|
|
}{
|
|
{[]byte("read-key-1"), []byte("read-value-1")},
|
|
{[]byte("read-key-2"), []byte("read-value-2")},
|
|
{[]byte("read-key-3"), []byte("read-value-3")},
|
|
}
|
|
|
|
// Publish records
|
|
for i, data := range testData {
|
|
timestamp := time.Now().UnixNano()
|
|
sequence, err := client.PublishRecord(topic, partition, data.key, data.value, timestamp)
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish record %d: %v", i, err)
|
|
}
|
|
t.Logf("Published record %d with sequence %d", i, sequence)
|
|
}
|
|
|
|
// Wait for records to be available
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
// Create subscriber session
|
|
subscriber, err := client.GetOrCreateSubscriber(topic, partition, 0)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscriber: %v", err)
|
|
}
|
|
|
|
// Try to read records
|
|
maxRecords := len(testData)
|
|
records, err := client.ReadRecords(subscriber, maxRecords)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read records: %v", err)
|
|
}
|
|
|
|
t.Logf("Read %d records from SeaweedMQ", len(records))
|
|
|
|
// Validate records
|
|
for i, record := range records {
|
|
if record == nil {
|
|
t.Errorf("Record %d should not be nil", i)
|
|
continue
|
|
}
|
|
|
|
if len(record.Value) == 0 {
|
|
t.Errorf("Record %d should have non-empty value", i)
|
|
}
|
|
|
|
if record.Timestamp == 0 {
|
|
t.Errorf("Record %d should have non-zero timestamp", i)
|
|
}
|
|
|
|
t.Logf("Record %d: key=%s, value=%s, timestamp=%d, sequence=%d",
|
|
i, string(record.Key), string(record.Value), record.Timestamp, record.Sequence)
|
|
}
|
|
|
|
// Test reading with smaller maxRecords
|
|
smallBatch, err := client.ReadRecords(subscriber, 1)
|
|
if err != nil {
|
|
t.Errorf("Failed to read small batch: %v", err)
|
|
}
|
|
t.Logf("Small batch read returned %d records", len(smallBatch))
|
|
|
|
// Test reading when no records available (should not block indefinitely)
|
|
emptyBatch, err := client.ReadRecords(subscriber, 10)
|
|
if err != nil {
|
|
t.Logf("Expected: reading when no records available returned error: %v", err)
|
|
} else {
|
|
t.Logf("Reading when no records available returned %d records", len(emptyBatch))
|
|
}
|
|
|
|
t.Logf("ReadRecords test completed successfully")
|
|
}
|
|
|
|
// TestAgentClient_ReadRecords_ErrorHandling tests error cases for reading records
|
|
func TestAgentClient_ReadRecords_ErrorHandling(t *testing.T) {
|
|
// This is a unit test that can run without SeaweedMQ agent
|
|
ctx := context.TODO()
|
|
client := &AgentClient{
|
|
subscribers: make(map[string]*SubscriberSession),
|
|
ctx: ctx,
|
|
}
|
|
|
|
// Test reading from nil session - this will fail safely
|
|
records, err := client.ReadRecords(nil, 10)
|
|
if err == nil {
|
|
t.Errorf("Reading from nil session should fail")
|
|
}
|
|
if records != nil {
|
|
t.Errorf("Records should be nil when session is nil")
|
|
}
|
|
|
|
// Test reading with maxRecords=0 - should return empty records quickly
|
|
session := &SubscriberSession{
|
|
Topic: "test-topic",
|
|
Partition: 0,
|
|
Stream: nil, // This will cause an error when trying to read
|
|
}
|
|
|
|
records, err = client.ReadRecords(session, 0)
|
|
if len(records) != 0 {
|
|
t.Errorf("Should return empty records for maxRecords=0, got %d", len(records))
|
|
}
|
|
// Error is expected due to nil stream, but it should return empty records before attempting to read
|
|
|
|
t.Logf("ReadRecords error handling test completed")
|
|
}
|