You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
192 lines
5.8 KiB
192 lines
5.8 KiB
package main
|
|
|
|
import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sort"
	"sync"
	"syscall"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
|
|
|
|
// Command-line flags, registered on the default flag set during package
// initialization. Each variable holds its default until flag.Parse runs.
var (
	// Agent endpoint and consumer identity.
	agentAddr               = flag.String("agent", "localhost:16777", "MQ agent address")
	consumerGroup           = flag.String("group", "test-consumer-group", "consumer group name")
	consumerGroupInstanceId = flag.String("instance", "test-consumer-1", "consumer group instance id")

	// Topic to read, starting position, and message filter.
	topicNamespace = flag.String("namespace", "test", "topic namespace")
	topicName      = flag.String("topic", "test-topic", "topic name")
	offsetType     = flag.String("offset", "latest", "offset type: earliest, latest, timestamp")
	offsetTsNs     = flag.Int64("offset-ts", 0, "offset timestamp in nanoseconds (for timestamp offset type)")
	filter         = flag.String("filter", "", "message filter")

	// Subscription sizing.
	maxPartitions     = flag.Int("max-partitions", 10, "maximum number of partitions to consume")
	slidingWindowSize = flag.Int("window-size", 100, "sliding window size for concurrent processing")

	// Console output switches.
	showMessages = flag.Bool("show-messages", true, "show consumed messages")
	logProgress  = flag.Bool("log-progress", true, "log progress every 10 messages")
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
fmt.Printf("Starting message consumer:\n")
|
|
fmt.Printf(" Agent: %s\n", *agentAddr)
|
|
fmt.Printf(" Topic: %s.%s\n", *topicNamespace, *topicName)
|
|
fmt.Printf(" Consumer Group: %s\n", *consumerGroup)
|
|
fmt.Printf(" Consumer Instance: %s\n", *consumerGroupInstanceId)
|
|
fmt.Printf(" Max Partitions: %d\n", *maxPartitions)
|
|
fmt.Printf(" Sliding Window Size: %d\n", *slidingWindowSize)
|
|
fmt.Printf(" Offset Type: %s\n", *offsetType)
|
|
fmt.Printf(" Filter: %s\n", *filter)
|
|
|
|
// Create topic
|
|
topicObj := topic.NewTopic(*topicNamespace, *topicName)
|
|
|
|
// Determine offset type
|
|
var pbOffsetType schema_pb.OffsetType
|
|
switch *offsetType {
|
|
case "earliest":
|
|
pbOffsetType = schema_pb.OffsetType_RESET_TO_EARLIEST
|
|
case "latest":
|
|
pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
|
|
case "timestamp":
|
|
pbOffsetType = schema_pb.OffsetType_EXACT_TS_NS
|
|
default:
|
|
pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
|
|
}
|
|
|
|
// Create subscribe option
|
|
option := &agent_client.SubscribeOption{
|
|
ConsumerGroup: *consumerGroup,
|
|
ConsumerGroupInstanceId: *consumerGroupInstanceId,
|
|
Topic: topicObj,
|
|
OffsetType: pbOffsetType,
|
|
OffsetTsNs: *offsetTsNs,
|
|
Filter: *filter,
|
|
MaxSubscribedPartitions: int32(*maxPartitions),
|
|
SlidingWindowSize: int32(*slidingWindowSize),
|
|
}
|
|
|
|
// Create subscribe session
|
|
session, err := agent_client.NewSubscribeSession(*agentAddr, option)
|
|
if err != nil {
|
|
log.Fatalf("Failed to create subscribe session: %v", err)
|
|
}
|
|
defer session.CloseSession()
|
|
|
|
// Statistics
|
|
var messageCount int64
|
|
var mu sync.Mutex
|
|
startTime := time.Now()
|
|
|
|
// Handle graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
// Channel to signal completion
|
|
done := make(chan error, 1)
|
|
|
|
// Start consuming messages
|
|
fmt.Printf("\nStarting to consume messages...\n")
|
|
go func() {
|
|
err := session.SubscribeMessageRecord(
|
|
// onEachMessageFn
|
|
func(key []byte, record *schema_pb.RecordValue) {
|
|
mu.Lock()
|
|
messageCount++
|
|
currentCount := messageCount
|
|
mu.Unlock()
|
|
|
|
if *showMessages {
|
|
fmt.Printf("Received message: key=%s\n", string(key))
|
|
printRecordValue(record)
|
|
}
|
|
|
|
if *logProgress && currentCount%10 == 0 {
|
|
elapsed := time.Since(startTime)
|
|
rate := float64(currentCount) / elapsed.Seconds()
|
|
fmt.Printf("Consumed %d messages (%.2f msg/sec)\n", currentCount, rate)
|
|
}
|
|
},
|
|
// onCompletionFn
|
|
func() {
|
|
fmt.Printf("Subscription completed\n")
|
|
done <- nil
|
|
},
|
|
)
|
|
if err != nil {
|
|
done <- err
|
|
}
|
|
}()
|
|
|
|
// Wait for signal or completion
|
|
select {
|
|
case <-sigChan:
|
|
fmt.Printf("\nReceived shutdown signal, stopping consumer...\n")
|
|
case err := <-done:
|
|
if err != nil {
|
|
log.Printf("Subscription error: %v", err)
|
|
}
|
|
}
|
|
|
|
// Print final statistics
|
|
mu.Lock()
|
|
finalCount := messageCount
|
|
mu.Unlock()
|
|
|
|
duration := time.Since(startTime)
|
|
fmt.Printf("Consumed %d messages in %v\n", finalCount, duration)
|
|
if duration.Seconds() > 0 {
|
|
fmt.Printf("Average throughput: %.2f messages/sec\n", float64(finalCount)/duration.Seconds())
|
|
}
|
|
}
|
|
|
|
func printRecordValue(record *schema_pb.RecordValue) {
|
|
if record == nil || record.Fields == nil {
|
|
fmt.Printf(" (empty record)\n")
|
|
return
|
|
}
|
|
|
|
for fieldName, value := range record.Fields {
|
|
fmt.Printf(" %s: %s\n", fieldName, formatValue(value))
|
|
}
|
|
}
|
|
|
|
func formatValue(value *schema_pb.Value) string {
|
|
if value == nil {
|
|
return "(nil)"
|
|
}
|
|
|
|
switch kind := value.Kind.(type) {
|
|
case *schema_pb.Value_BoolValue:
|
|
return fmt.Sprintf("%t", kind.BoolValue)
|
|
case *schema_pb.Value_Int32Value:
|
|
return fmt.Sprintf("%d", kind.Int32Value)
|
|
case *schema_pb.Value_Int64Value:
|
|
return fmt.Sprintf("%d", kind.Int64Value)
|
|
case *schema_pb.Value_FloatValue:
|
|
return fmt.Sprintf("%f", kind.FloatValue)
|
|
case *schema_pb.Value_DoubleValue:
|
|
return fmt.Sprintf("%f", kind.DoubleValue)
|
|
case *schema_pb.Value_BytesValue:
|
|
if len(kind.BytesValue) > 50 {
|
|
return fmt.Sprintf("bytes[%d] %x...", len(kind.BytesValue), kind.BytesValue[:50])
|
|
}
|
|
return fmt.Sprintf("bytes[%d] %x", len(kind.BytesValue), kind.BytesValue)
|
|
case *schema_pb.Value_StringValue:
|
|
if len(kind.StringValue) > 100 {
|
|
return fmt.Sprintf("\"%s...\"", kind.StringValue[:100])
|
|
}
|
|
return fmt.Sprintf("\"%s\"", kind.StringValue)
|
|
case *schema_pb.Value_ListValue:
|
|
return fmt.Sprintf("list[%d items]", len(kind.ListValue.Values))
|
|
case *schema_pb.Value_RecordValue:
|
|
return fmt.Sprintf("record[%d fields]", len(kind.RecordValue.Fields))
|
|
default:
|
|
return "(unknown)"
|
|
}
|
|
}
|