SeaweedFS Message Queue Test Suite
This directory contains test programs for SeaweedFS Message Queue (MQ) functionality, including message producers and consumers.
Prerequisites
- SeaweedFS with MQ Broker and Agent: You need a running SeaweedFS instance with the MQ broker and agent enabled
- Go: Go 1.19 or later is required to build the test programs
Quick Start
1. Start SeaweedFS with MQ Broker and Agent
# Start SeaweedFS server with MQ broker and agent
weed server -mq.broker -mq.agent -filer -volume
# Or start components separately
weed master
weed volume -mserver=localhost:9333
weed filer -master=localhost:9333
weed mq.broker -filer=localhost:8888
weed mq.agent -brokers=localhost:17777
2. Build Test Programs
# Build both producer and consumer
make build
# Or build individually
make build-producer
make build-consumer
3. Run Basic Test
# Run a basic producer/consumer test
make test
# Or run producer and consumer manually
make consumer & # Start consumer in background
make producer # Start producer
Test Programs
Producer (producer/main.go)
Generates structured messages and publishes them to a SeaweedMQ topic via the MQ agent.
Usage:
./bin/producer [options]
Options:
- -agent: MQ agent address (default: localhost:16777)
- -namespace: Topic namespace (default: test)
- -topic: Topic name (default: test-topic)
- -partitions: Number of partitions (default: 4)
- -messages: Number of messages to produce (default: 100)
- -publisher: Publisher name (default: test-producer)
- -size: Message size in bytes (default: 1024)
- -interval: Interval between messages (default: 100ms)
Example:
./bin/producer -agent=localhost:16777 -namespace=test -topic=my-topic -messages=1000 -interval=50ms
Consumer (consumer/main.go)
Consumes structured messages from a SeaweedMQ topic via the MQ agent.
Usage:
./bin/consumer [options]
Options:
- -agent: MQ agent address (default: localhost:16777)
- -namespace: Topic namespace (default: test)
- -topic: Topic name (default: test-topic)
- -group: Consumer group name (default: test-consumer-group)
- -instance: Consumer group instance ID (default: test-consumer-1)
- -max-partitions: Maximum number of partitions to consume (default: 10)
- -window-size: Sliding window size for concurrent processing (default: 100)
- -offset: Offset type: earliest, latest, timestamp (default: latest)
- -offset-ts: Offset timestamp in nanoseconds (for timestamp offset type)
- -filter: Message filter (default: empty)
- -show-messages: Show consumed messages (default: true)
- -log-progress: Log progress every 10 messages (default: true)
Example:
./bin/consumer -agent=localhost:16777 -namespace=test -topic=my-topic -group=my-group -offset=earliest
Makefile Commands
Building
- make build: Build both producer and consumer binaries
- make build-producer: Build producer only
- make build-consumer: Build consumer only
Running
- make producer: Build and run producer
- make consumer: Build and run consumer
- make run-producer: Run producer directly with go run
- make run-consumer: Run consumer directly with go run
Testing
- make test: Run basic producer/consumer test
- make test-performance: Run performance test (1000 messages, 8 partitions)
- make test-multiple-consumers: Run test with multiple consumers
Cleanup
- make clean: Remove build artifacts
Help
- make help: Show detailed help
Configuration
Configure tests using environment variables:
export AGENT_ADDR=localhost:16777
export TOPIC_NAMESPACE=test
export TOPIC_NAME=test-topic
export PARTITION_COUNT=4
export MESSAGE_COUNT=100
export CONSUMER_GROUP=test-consumer-group
export CONSUMER_INSTANCE=test-consumer-1
Example Usage Scenarios
1. Basic Producer/Consumer Test
# Terminal 1: Start consumer
make consumer
# Terminal 2: Run producer
make producer MESSAGE_COUNT=50
2. Performance Testing
# Test with high throughput
make test-performance
3. Multiple Consumer Groups
# Terminal 1: Consumer group 1
make consumer CONSUMER_GROUP=group1
# Terminal 2: Consumer group 2
make consumer CONSUMER_GROUP=group2
# Terminal 3: Producer
make producer MESSAGE_COUNT=200
4. Different Offset Types
# Consume from earliest
make consumer OFFSET=earliest
# Consume from latest
make consumer OFFSET=latest
# Consume from timestamp
make consumer OFFSET=timestamp OFFSET_TS=1699000000000000000
Troubleshooting
Common Issues
- Connection Refused: Make sure the SeaweedFS MQ agent is running at the specified address
- Agent Not Found: Ensure both MQ broker and agent are running (agent requires broker)
- Topic Not Found: The producer will create the topic automatically on first publish
- Consumer Not Receiving Messages: Check if consumer group offset is correct (try earliest)
- Build Failures: Ensure you're running from the SeaweedFS root directory
Debug Mode
Enable verbose logging:
# Run with debug logging
GLOG_v=4 make producer
GLOG_v=4 make consumer
Check Broker and Agent Status
# Check if broker is running
curl http://localhost:9333/cluster/brokers
# Check if agent is running (if running as server)
curl http://localhost:9333/cluster/agents
# Or use weed shell
weed shell -master=localhost:9333
> mq.broker.list
Architecture
The test setup demonstrates:
- Agent-Based Architecture: Uses the MQ agent as an intermediary between clients and brokers
- Structured Messages: Messages use schema-based RecordValue format instead of raw bytes
- Topic Management: Creating and configuring topics with multiple partitions
- Message Production: Publishing structured messages with keys for partitioning
- Message Consumption: Consuming structured messages with consumer groups and offset management
- Load Balancing: Multiple consumers in the same group share partition assignments
- Fault Tolerance: Graceful handling of agent and broker failures and reconnections
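Two of the points above, keyed partitioning and partition sharing within a consumer group, can be illustrated with a small sketch. SeaweedMQ's actual partitioning and assignment algorithms are not described in this README, so the code below uses stand-ins: an FNV-1a hash modulo the partition count, and round-robin assignment of partitions to instances. Both functions are hypothetical and exist only to show the idea.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of n partitions.
// Assumption: FNV-1a modulo n, chosen for illustration only.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// assign spreads partitions 0..n-1 across consumer instances round-robin.
// Assumption: the real group coordinator may balance differently.
func assign(n int, consumers []string) map[string][]int {
	out := make(map[string][]int, len(consumers))
	for p := 0; p < n; p++ {
		c := consumers[p%len(consumers)]
		out[c] = append(out[c], p)
	}
	return out
}

func main() {
	// The same key always lands on the same partition.
	for _, k := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("key %s -> partition %d\n", k, partitionFor(k, 4))
	}
	// Four partitions shared by two instances in one group.
	fmt.Println(assign(4, []string{"test-consumer-1", "test-consumer-2"}))
}
```

With four partitions and two instances, each instance ends up with two partitions. Adding a third instance to the same group would leave one instance with two partitions and the others with one each.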
Files
- producer/main.go: Message producer implementation
- consumer/main.go: Message consumer implementation
- Makefile: Build and test automation
- README.md: This documentation
- bin/: Built binaries (created during build)
Next Steps
- Modify the producer to send structured data using RecordType
- Implement message filtering in the consumer
- Add metrics collection and monitoring
- Test with multiple broker instances
- Implement schema evolution testing