You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

192 lines
5.8 KiB

package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// Command-line flags. Each variable is the pointer returned by the flag
// package and only holds the user-supplied value after flag.Parse has run.

// Where to connect and which topic to read.
var (
	agentAddr      = flag.String("agent", "localhost:16777", "MQ agent address")
	topicNamespace = flag.String("namespace", "test", "topic namespace")
	topicName      = flag.String("topic", "test-topic", "topic name")
)

// Consumer group identity and subscription sizing.
var (
	consumerGroup           = flag.String("group", "test-consumer-group", "consumer group name")
	consumerGroupInstanceId = flag.String("instance", "test-consumer-1", "consumer group instance id")
	maxPartitions           = flag.Int("max-partitions", 10, "maximum number of partitions to consume")
	slidingWindowSize       = flag.Int("window-size", 100, "sliding window size for concurrent processing")
)

// Starting position within the topic.
var (
	offsetType = flag.String("offset", "latest", "offset type: earliest, latest, timestamp")
	offsetTsNs = flag.Int64("offset-ts", 0, "offset timestamp in nanoseconds (for timestamp offset type)")
)

// Console output switches and the message filter.
var (
	showMessages = flag.Bool("show-messages", true, "show consumed messages")
	logProgress  = flag.Bool("log-progress", true, "log progress every 10 messages")
	filter       = flag.String("filter", "", "message filter")
)
// main runs the command-line message consumer. It parses the flags, prints
// the effective configuration, opens a subscribe session against the MQ
// agent, prints every received record (when -show-messages is set) and
// finally reports how many messages were consumed and at what rate. It stops
// when the subscription reports completion or an error, or when the process
// receives SIGINT or SIGTERM.
func main() {
	flag.Parse()

	fmt.Printf("Starting message consumer:\n")
	fmt.Printf(" Agent: %s\n", *agentAddr)
	fmt.Printf(" Topic: %s.%s\n", *topicNamespace, *topicName)
	fmt.Printf(" Consumer Group: %s\n", *consumerGroup)
	fmt.Printf(" Consumer Instance: %s\n", *consumerGroupInstanceId)
	fmt.Printf(" Max Partitions: %d\n", *maxPartitions)
	fmt.Printf(" Sliding Window Size: %d\n", *slidingWindowSize)
	fmt.Printf(" Offset Type: %s\n", *offsetType)
	fmt.Printf(" Filter: %s\n", *filter)

	// Create topic
	topicObj := topic.NewTopic(*topicNamespace, *topicName)

	// Translate the -offset flag into the protobuf offset type. An
	// unrecognized value still falls back to "latest", but a warning is logged
	// so that a typo such as "earlist" does not go unnoticed.
	var pbOffsetType schema_pb.OffsetType
	switch *offsetType {
	case "earliest":
		pbOffsetType = schema_pb.OffsetType_RESET_TO_EARLIEST
	case "latest":
		pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
	case "timestamp":
		pbOffsetType = schema_pb.OffsetType_EXACT_TS_NS
	default:
		log.Printf("Unknown offset type %q, falling back to latest", *offsetType)
		pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
	}

	// Create subscribe option
	option := &agent_client.SubscribeOption{
		ConsumerGroup:           *consumerGroup,
		ConsumerGroupInstanceId: *consumerGroupInstanceId,
		Topic:                   topicObj,
		OffsetType:              pbOffsetType,
		OffsetTsNs:              *offsetTsNs,
		Filter:                  *filter,
		MaxSubscribedPartitions: int32(*maxPartitions),
		SlidingWindowSize:       int32(*slidingWindowSize),
	}

	// Create subscribe session
	session, err := agent_client.NewSubscribeSession(*agentAddr, option)
	if err != nil {
		log.Fatalf("Failed to create subscribe session: %v", err)
	}
	defer session.CloseSession()

	// Statistics. messageCount is updated from the message callback, which
	// runs in the subscription goroutine, and read here at the end, so every
	// access goes through mu.
	var messageCount int64
	var mu sync.Mutex
	startTime := time.Now()

	// Handle graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Channel to signal completion. It is buffered so the first send from the
	// subscription goroutine never blocks, even if main has already left the
	// select below because of a signal.
	done := make(chan error, 1)

	// Start consuming messages
	fmt.Printf("\nStarting to consume messages...\n")
	go func() {
		err := session.SubscribeMessageRecord(
			// onEachMessageFn
			func(key []byte, record *schema_pb.RecordValue) {
				mu.Lock()
				messageCount++
				currentCount := messageCount
				mu.Unlock()

				if *showMessages {
					fmt.Printf("Received message: key=%s\n", string(key))
					printRecordValue(record)
				}

				// Progress line every 10th message, with the average rate
				// since startTime.
				if *logProgress && currentCount%10 == 0 {
					elapsed := time.Since(startTime)
					rate := float64(currentCount) / elapsed.Seconds()
					fmt.Printf("Consumed %d messages (%.2f msg/sec)\n", currentCount, rate)
				}
			},
			// onCompletionFn
			func() {
				fmt.Printf("Subscription completed\n")
				done <- nil
			},
		)
		if err != nil {
			done <- err
		}
	}()

	// Wait for signal or completion
	select {
	case <-sigChan:
		fmt.Printf("\nReceived shutdown signal, stopping consumer...\n")
	case err := <-done:
		if err != nil {
			log.Printf("Subscription error: %v", err)
		}
	}

	// Print final statistics
	mu.Lock()
	finalCount := messageCount
	mu.Unlock()

	duration := time.Since(startTime)
	fmt.Printf("Consumed %d messages in %v\n", finalCount, duration)
	if duration.Seconds() > 0 {
		fmt.Printf("Average throughput: %.2f messages/sec\n", float64(finalCount)/duration.Seconds())
	}
}
// printRecordValue writes the fields of record to stdout, one
// "name: value" line per field, with each value rendered by formatValue.
// A nil record, or a record without any fields, is reported as
// "(empty record)".
//
// Fields are visited with a plain range loop; NOTE(review): if Fields is a
// map (as the key/value range suggests), the line order is not stable between
// calls.
func printRecordValue(record *schema_pb.RecordValue) {
	// len covers both a nil and an allocated-but-empty Fields collection, so
	// an empty record never produces silent, blank output.
	if record == nil || len(record.Fields) == 0 {
		fmt.Printf(" (empty record)\n")
		return
	}
	for fieldName, value := range record.Fields {
		fmt.Printf(" %s: %s\n", fieldName, formatValue(value))
	}
}
// formatValue renders a single schema value as a short string for console
// output.
//
//   - nil value: "(nil)"
//   - bool, integer and floating-point kinds: formatted with %t, %d and %f
//   - bytes: "bytes[N] <hex>", with the hex dump limited to the first 50 bytes
//     and followed by "..." when truncated
//   - string: double-quoted, limited to roughly the first 100 bytes and
//     followed by "..." when truncated
//   - list / record: only the element or field count is shown
//   - anything else: "(unknown)"
func formatValue(value *schema_pb.Value) string {
	if value == nil {
		return "(nil)"
	}
	switch kind := value.Kind.(type) {
	case *schema_pb.Value_BoolValue:
		return fmt.Sprintf("%t", kind.BoolValue)
	case *schema_pb.Value_Int32Value:
		return fmt.Sprintf("%d", kind.Int32Value)
	case *schema_pb.Value_Int64Value:
		return fmt.Sprintf("%d", kind.Int64Value)
	case *schema_pb.Value_FloatValue:
		return fmt.Sprintf("%f", kind.FloatValue)
	case *schema_pb.Value_DoubleValue:
		return fmt.Sprintf("%f", kind.DoubleValue)
	case *schema_pb.Value_BytesValue:
		if len(kind.BytesValue) > 50 {
			return fmt.Sprintf("bytes[%d] %x...", len(kind.BytesValue), kind.BytesValue[:50])
		}
		return fmt.Sprintf("bytes[%d] %x", len(kind.BytesValue), kind.BytesValue)
	case *schema_pb.Value_StringValue:
		if len(kind.StringValue) > 100 {
			// Cut on a rune boundary: slicing at a fixed byte offset could
			// split a multi-byte character and print invalid UTF-8.
			return fmt.Sprintf("\"%s...\"", truncateAtRuneBoundary(kind.StringValue, 100))
		}
		return fmt.Sprintf("\"%s\"", kind.StringValue)
	case *schema_pb.Value_ListValue:
		// Guard against a wrapper whose nested message was never set.
		if kind.ListValue == nil {
			return "list[0 items]"
		}
		return fmt.Sprintf("list[%d items]", len(kind.ListValue.Values))
	case *schema_pb.Value_RecordValue:
		// Guard against a wrapper whose nested message was never set.
		if kind.RecordValue == nil {
			return "record[0 fields]"
		}
		return fmt.Sprintf("record[%d fields]", len(kind.RecordValue.Fields))
	default:
		return "(unknown)"
	}
}

// truncateAtRuneBoundary returns at most the first limit bytes of s, shortened
// by up to three more bytes when the cut would otherwise fall inside a
// multi-byte UTF-8 sequence.
func truncateAtRuneBoundary(s string, limit int) string {
	if limit <= 0 {
		return ""
	}
	if limit >= len(s) {
		return s
	}
	cut := limit
	// s[cut] is the first byte that would be dropped. If it is a UTF-8
	// continuation byte (10xxxxxx) the cut is mid-character, so back up to the
	// lead byte. A sequence has at most three continuation bytes; the bound
	// keeps malformed input from shrinking the result arbitrarily.
	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return s[:cut]
}