You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							192 lines
						
					
					
						
							5.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							192 lines
						
					
					
						
							5.8 KiB
						
					
					
				
								package main
							 | 
						|
								
							 | 
						|
								import (
									"flag"
									"fmt"
									"log"
									"os"
									"os/signal"
									"sort"
									"sync"
									"syscall"
									"time"

									"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
									"github.com/seaweedfs/seaweedfs/weed/mq/topic"
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
								)
							 | 
						|
								
							 | 
						|
								// Where to connect and which topic to read.
								var (
									agentAddr      = flag.String("agent", "localhost:16777", "MQ agent address")
									topicNamespace = flag.String("namespace", "test", "topic namespace")
									topicName      = flag.String("topic", "test-topic", "topic name")
								)

								// Consumer identity and subscription tuning.
								var (
									consumerGroup           = flag.String("group", "test-consumer-group", "consumer group name")
									consumerGroupInstanceId = flag.String("instance", "test-consumer-1", "consumer group instance id")
									maxPartitions           = flag.Int("max-partitions", 10, "maximum number of partitions to consume")
									slidingWindowSize       = flag.Int("window-size", 100, "sliding window size for concurrent processing")
									offsetType              = flag.String("offset", "latest", "offset type: earliest, latest, timestamp")
									offsetTsNs              = flag.Int64("offset-ts", 0, "offset timestamp in nanoseconds (for timestamp offset type)")
									filter                  = flag.String("filter", "", "message filter")
								)

								// Console output switches.
								var (
									showMessages = flag.Bool("show-messages", true, "show consumed messages")
									logProgress  = flag.Bool("log-progress", true, "log progress every 10 messages")
								)
							 | 
						|
								
							 | 
						|
								func main() {
							 | 
						|
									flag.Parse()
							 | 
						|
								
							 | 
						|
									fmt.Printf("Starting message consumer:\n")
							 | 
						|
									fmt.Printf("  Agent: %s\n", *agentAddr)
							 | 
						|
									fmt.Printf("  Topic: %s.%s\n", *topicNamespace, *topicName)
							 | 
						|
									fmt.Printf("  Consumer Group: %s\n", *consumerGroup)
							 | 
						|
									fmt.Printf("  Consumer Instance: %s\n", *consumerGroupInstanceId)
							 | 
						|
									fmt.Printf("  Max Partitions: %d\n", *maxPartitions)
							 | 
						|
									fmt.Printf("  Sliding Window Size: %d\n", *slidingWindowSize)
							 | 
						|
									fmt.Printf("  Offset Type: %s\n", *offsetType)
							 | 
						|
									fmt.Printf("  Filter: %s\n", *filter)
							 | 
						|
								
							 | 
						|
									// Create topic
							 | 
						|
									topicObj := topic.NewTopic(*topicNamespace, *topicName)
							 | 
						|
								
							 | 
						|
									// Determine offset type
							 | 
						|
									var pbOffsetType schema_pb.OffsetType
							 | 
						|
									switch *offsetType {
							 | 
						|
									case "earliest":
							 | 
						|
										pbOffsetType = schema_pb.OffsetType_RESET_TO_EARLIEST
							 | 
						|
									case "latest":
							 | 
						|
										pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
							 | 
						|
									case "timestamp":
							 | 
						|
										pbOffsetType = schema_pb.OffsetType_EXACT_TS_NS
							 | 
						|
									default:
							 | 
						|
										pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create subscribe option
							 | 
						|
									option := &agent_client.SubscribeOption{
							 | 
						|
										ConsumerGroup:           *consumerGroup,
							 | 
						|
										ConsumerGroupInstanceId: *consumerGroupInstanceId,
							 | 
						|
										Topic:                   topicObj,
							 | 
						|
										OffsetType:              pbOffsetType,
							 | 
						|
										OffsetTsNs:              *offsetTsNs,
							 | 
						|
										Filter:                  *filter,
							 | 
						|
										MaxSubscribedPartitions: int32(*maxPartitions),
							 | 
						|
										SlidingWindowSize:       int32(*slidingWindowSize),
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create subscribe session
							 | 
						|
									session, err := agent_client.NewSubscribeSession(*agentAddr, option)
							 | 
						|
									if err != nil {
							 | 
						|
										log.Fatalf("Failed to create subscribe session: %v", err)
							 | 
						|
									}
							 | 
						|
									defer session.CloseSession()
							 | 
						|
								
							 | 
						|
									// Statistics
							 | 
						|
									var messageCount int64
							 | 
						|
									var mu sync.Mutex
							 | 
						|
									startTime := time.Now()
							 | 
						|
								
							 | 
						|
									// Handle graceful shutdown
							 | 
						|
									sigChan := make(chan os.Signal, 1)
							 | 
						|
									signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
							 | 
						|
								
							 | 
						|
									// Channel to signal completion
							 | 
						|
									done := make(chan error, 1)
							 | 
						|
								
							 | 
						|
									// Start consuming messages
							 | 
						|
									fmt.Printf("\nStarting to consume messages...\n")
							 | 
						|
									go func() {
							 | 
						|
										err := session.SubscribeMessageRecord(
							 | 
						|
											// onEachMessageFn
							 | 
						|
											func(key []byte, record *schema_pb.RecordValue) {
							 | 
						|
												mu.Lock()
							 | 
						|
												messageCount++
							 | 
						|
												currentCount := messageCount
							 | 
						|
												mu.Unlock()
							 | 
						|
								
							 | 
						|
												if *showMessages {
							 | 
						|
													fmt.Printf("Received message: key=%s\n", string(key))
							 | 
						|
													printRecordValue(record)
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												if *logProgress && currentCount%10 == 0 {
							 | 
						|
													elapsed := time.Since(startTime)
							 | 
						|
													rate := float64(currentCount) / elapsed.Seconds()
							 | 
						|
													fmt.Printf("Consumed %d messages (%.2f msg/sec)\n", currentCount, rate)
							 | 
						|
												}
							 | 
						|
											},
							 | 
						|
											// onCompletionFn
							 | 
						|
											func() {
							 | 
						|
												fmt.Printf("Subscription completed\n")
							 | 
						|
												done <- nil
							 | 
						|
											},
							 | 
						|
										)
							 | 
						|
										if err != nil {
							 | 
						|
											done <- err
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for signal or completion
							 | 
						|
									select {
							 | 
						|
									case <-sigChan:
							 | 
						|
										fmt.Printf("\nReceived shutdown signal, stopping consumer...\n")
							 | 
						|
									case err := <-done:
							 | 
						|
										if err != nil {
							 | 
						|
											log.Printf("Subscription error: %v", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Print final statistics
							 | 
						|
									mu.Lock()
							 | 
						|
									finalCount := messageCount
							 | 
						|
									mu.Unlock()
							 | 
						|
								
							 | 
						|
									duration := time.Since(startTime)
							 | 
						|
									fmt.Printf("Consumed %d messages in %v\n", finalCount, duration)
							 | 
						|
									if duration.Seconds() > 0 {
							 | 
						|
										fmt.Printf("Average throughput: %.2f messages/sec\n", float64(finalCount)/duration.Seconds())
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func printRecordValue(record *schema_pb.RecordValue) {
							 | 
						|
									if record == nil || record.Fields == nil {
							 | 
						|
										fmt.Printf("  (empty record)\n")
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									for fieldName, value := range record.Fields {
							 | 
						|
										fmt.Printf("  %s: %s\n", fieldName, formatValue(value))
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func formatValue(value *schema_pb.Value) string {
							 | 
						|
									if value == nil {
							 | 
						|
										return "(nil)"
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									switch kind := value.Kind.(type) {
							 | 
						|
									case *schema_pb.Value_BoolValue:
							 | 
						|
										return fmt.Sprintf("%t", kind.BoolValue)
							 | 
						|
									case *schema_pb.Value_Int32Value:
							 | 
						|
										return fmt.Sprintf("%d", kind.Int32Value)
							 | 
						|
									case *schema_pb.Value_Int64Value:
							 | 
						|
										return fmt.Sprintf("%d", kind.Int64Value)
							 | 
						|
									case *schema_pb.Value_FloatValue:
							 | 
						|
										return fmt.Sprintf("%f", kind.FloatValue)
							 | 
						|
									case *schema_pb.Value_DoubleValue:
							 | 
						|
										return fmt.Sprintf("%f", kind.DoubleValue)
							 | 
						|
									case *schema_pb.Value_BytesValue:
							 | 
						|
										if len(kind.BytesValue) > 50 {
							 | 
						|
											return fmt.Sprintf("bytes[%d] %x...", len(kind.BytesValue), kind.BytesValue[:50])
							 | 
						|
										}
							 | 
						|
										return fmt.Sprintf("bytes[%d] %x", len(kind.BytesValue), kind.BytesValue)
							 | 
						|
									case *schema_pb.Value_StringValue:
							 | 
						|
										if len(kind.StringValue) > 100 {
							 | 
						|
											return fmt.Sprintf("\"%s...\"", kind.StringValue[:100])
							 | 
						|
										}
							 | 
						|
										return fmt.Sprintf("\"%s\"", kind.StringValue)
							 | 
						|
									case *schema_pb.Value_ListValue:
							 | 
						|
										return fmt.Sprintf("list[%d items]", len(kind.ListValue.Values))
							 | 
						|
									case *schema_pb.Value_RecordValue:
							 | 
						|
										return fmt.Sprintf("record[%d fields]", len(kind.RecordValue.Fields))
							 | 
						|
									default:
							 | 
						|
										return "(unknown)"
							 | 
						|
									}
							 | 
						|
								}
							 |