You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							172 lines
						
					
					
						
							4.6 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							172 lines
						
					
					
						
							4.6 KiB
						
					
					
				
								package main
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"flag"
							 | 
						|
									"fmt"
							 | 
						|
									"log"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/schema"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								var (
							 | 
						|
									agentAddr      = flag.String("agent", "localhost:16777", "MQ agent address")
							 | 
						|
									topicNamespace = flag.String("namespace", "test", "topic namespace")
							 | 
						|
									topicName      = flag.String("topic", "test-topic", "topic name")
							 | 
						|
									partitionCount = flag.Int("partitions", 4, "number of partitions")
							 | 
						|
									messageCount   = flag.Int("messages", 100, "number of messages to produce")
							 | 
						|
									publisherName  = flag.String("publisher", "test-producer", "publisher name")
							 | 
						|
									messageSize    = flag.Int("size", 1024, "message size in bytes")
							 | 
						|
									interval       = flag.Duration("interval", 100*time.Millisecond, "interval between messages")
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// TestMessage represents the structure of messages we'll be sending
							 | 
						|
								type TestMessage struct {
							 | 
						|
									ID        int64  `json:"id"`
							 | 
						|
									Message   string `json:"message"`
							 | 
						|
									Payload   []byte `json:"payload"`
							 | 
						|
									Timestamp int64  `json:"timestamp"`
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func main() {
							 | 
						|
									flag.Parse()
							 | 
						|
								
							 | 
						|
									fmt.Printf("Starting message producer:\n")
							 | 
						|
									fmt.Printf("  Agent: %s\n", *agentAddr)
							 | 
						|
									fmt.Printf("  Topic: %s.%s\n", *topicNamespace, *topicName)
							 | 
						|
									fmt.Printf("  Partitions: %d\n", *partitionCount)
							 | 
						|
									fmt.Printf("  Messages: %d\n", *messageCount)
							 | 
						|
									fmt.Printf("  Publisher: %s\n", *publisherName)
							 | 
						|
									fmt.Printf("  Message Size: %d bytes\n", *messageSize)
							 | 
						|
									fmt.Printf("  Interval: %v\n", *interval)
							 | 
						|
								
							 | 
						|
									// Create an instance of the message struct to generate schema from
							 | 
						|
									messageInstance := TestMessage{}
							 | 
						|
								
							 | 
						|
									// Automatically generate RecordType from the struct
							 | 
						|
									recordType := schema.StructToSchema(messageInstance)
							 | 
						|
									if recordType == nil {
							 | 
						|
										log.Fatalf("Failed to generate schema from struct")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									fmt.Printf("\nGenerated schema with %d fields:\n", len(recordType.Fields))
							 | 
						|
									for _, field := range recordType.Fields {
							 | 
						|
										fmt.Printf("  - %s: %s\n", field.Name, getTypeString(field.Type))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									topicSchema := schema.NewSchema(*topicNamespace, *topicName, recordType)
							 | 
						|
								
							 | 
						|
									// Create publish session
							 | 
						|
									session, err := agent_client.NewPublishSession(*agentAddr, topicSchema, *partitionCount, *publisherName)
							 | 
						|
									if err != nil {
							 | 
						|
										log.Fatalf("Failed to create publish session: %v", err)
							 | 
						|
									}
							 | 
						|
									defer session.CloseSession()
							 | 
						|
								
							 | 
						|
									// Create message payload
							 | 
						|
									payload := make([]byte, *messageSize)
							 | 
						|
									for i := range payload {
							 | 
						|
										payload[i] = byte(i % 256)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Start producing messages
							 | 
						|
									fmt.Printf("\nStarting to produce messages...\n")
							 | 
						|
									startTime := time.Now()
							 | 
						|
								
							 | 
						|
									for i := 0; i < *messageCount; i++ {
							 | 
						|
										key := fmt.Sprintf("key-%d", i)
							 | 
						|
								
							 | 
						|
										// Create a message struct
							 | 
						|
										message := TestMessage{
							 | 
						|
											ID:        int64(i),
							 | 
						|
											Message:   fmt.Sprintf("This is message number %d", i),
							 | 
						|
											Payload:   payload[:min(100, len(payload))], // First 100 bytes
							 | 
						|
											Timestamp: time.Now().UnixNano(),
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Convert struct to RecordValue
							 | 
						|
										record := structToRecordValue(message)
							 | 
						|
								
							 | 
						|
										err := session.PublishMessageRecord([]byte(key), record)
							 | 
						|
										if err != nil {
							 | 
						|
											log.Printf("Failed to publish message %d: %v", i, err)
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										if (i+1)%10 == 0 {
							 | 
						|
											fmt.Printf("Published %d messages\n", i+1)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										if *interval > 0 {
							 | 
						|
											time.Sleep(*interval)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									duration := time.Since(startTime)
							 | 
						|
									fmt.Printf("\nCompleted producing %d messages in %v\n", *messageCount, duration)
							 | 
						|
									fmt.Printf("Throughput: %.2f messages/sec\n", float64(*messageCount)/duration.Seconds())
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Helper function to convert struct to RecordValue
							 | 
						|
								func structToRecordValue(msg TestMessage) *schema_pb.RecordValue {
							 | 
						|
									return &schema_pb.RecordValue{
							 | 
						|
										Fields: map[string]*schema_pb.Value{
							 | 
						|
											"ID": {
							 | 
						|
												Kind: &schema_pb.Value_Int64Value{
							 | 
						|
													Int64Value: msg.ID,
							 | 
						|
												},
							 | 
						|
											},
							 | 
						|
											"Message": {
							 | 
						|
												Kind: &schema_pb.Value_StringValue{
							 | 
						|
													StringValue: msg.Message,
							 | 
						|
												},
							 | 
						|
											},
							 | 
						|
											"Payload": {
							 | 
						|
												Kind: &schema_pb.Value_BytesValue{
							 | 
						|
													BytesValue: msg.Payload,
							 | 
						|
												},
							 | 
						|
											},
							 | 
						|
											"Timestamp": {
							 | 
						|
												Kind: &schema_pb.Value_Int64Value{
							 | 
						|
													Int64Value: msg.Timestamp,
							 | 
						|
												},
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func getTypeString(t *schema_pb.Type) string {
							 | 
						|
									switch kind := t.Kind.(type) {
							 | 
						|
									case *schema_pb.Type_ScalarType:
							 | 
						|
										switch kind.ScalarType {
							 | 
						|
										case schema_pb.ScalarType_BOOL:
							 | 
						|
											return "bool"
							 | 
						|
										case schema_pb.ScalarType_INT32:
							 | 
						|
											return "int32"
							 | 
						|
										case schema_pb.ScalarType_INT64:
							 | 
						|
											return "int64"
							 | 
						|
										case schema_pb.ScalarType_FLOAT:
							 | 
						|
											return "float"
							 | 
						|
										case schema_pb.ScalarType_DOUBLE:
							 | 
						|
											return "double"
							 | 
						|
										case schema_pb.ScalarType_BYTES:
							 | 
						|
											return "bytes"
							 | 
						|
										case schema_pb.ScalarType_STRING:
							 | 
						|
											return "string"
							 | 
						|
										}
							 | 
						|
									case *schema_pb.Type_ListType:
							 | 
						|
										return fmt.Sprintf("list<%s>", getTypeString(kind.ListType.ElementType))
							 | 
						|
									case *schema_pb.Type_RecordType:
							 | 
						|
										return "record"
							 | 
						|
									}
							 | 
						|
									return "unknown"
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func min(a, b int) int {
							 | 
						|
									if a < b {
							 | 
						|
										return a
							 | 
						|
									}
							 | 
						|
									return b
							 | 
						|
								}
							 |