package consumer

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/linkedin/goavro/v2"
	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
	pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
	"google.golang.org/protobuf/proto"
)

// Consumer represents a Kafka consumer for load testing
type Consumer struct {
	id               int
	config           *config.Config
	metricsCollector *metrics.Collector
	saramaConsumer   sarama.ConsumerGroup
	useConfluent     bool // Always false, Sarama only
	topics           []string
	consumerGroup    string
	avroCodec        *goavro.Codec

	// Schema format tracking per topic
	schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF)

	// Processing tracking
	messagesProcessed int64
	lastOffset        map[string]map[int32]int64
	offsetMutex       sync.RWMutex

	// Record tracking
	tracker *tracker.Tracker
}

// New creates a new consumer instance
func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Consumer, error) {
	// All consumers share the same group for load balancing across partitions
	consumerGroup := cfg.Consumers.GroupPrefix

	c := &Consumer{
		id:               id,
		config:           cfg,
		metricsCollector: collector,
		topics:           cfg.GetTopicNames(),
		consumerGroup:    consumerGroup,
		useConfluent:     false, // Use Sarama by default
		lastOffset:       make(map[string]map[int32]int64),
		schemaFormats:    make(map[string]string),
		tracker:          recordTracker,
	}

	// Initialize schema formats for each topic (must match producer logic)
	// This mirrors the format distribution in cmd/loadtest/main.go registerSchemas()
	for i, topic := range c.topics {
		var schemaFormat string
		if cfg.Producers.SchemaFormat != "" {
			// Use explicit config if provided
			schemaFormat = cfg.Producers.SchemaFormat
		} else {
			// Distribute across formats (same as producer)
			switch i % 3 {
			case 0:
				schemaFormat = "AVRO"
			case 1:
				schemaFormat = "JSON"
			case 2:
				schemaFormat = "PROTOBUF"
			}
		}
		c.schemaFormats[topic] = schemaFormat
		log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
	}

	// Initialize consumer based on configuration
	if c.useConfluent {
		if err := c.initConfluentConsumer(); err != nil {
			return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err)
		}
	} else {
		if err := c.initSaramaConsumer(); err != nil {
			return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err)
		}
	}

	// Initialize Avro codec if schemas are enabled
	if cfg.Schemas.Enabled {
		if err := c.initAvroCodec(); err != nil {
			return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
		}
	}

	log.Printf("Consumer %d initialized for group %s", id, consumerGroup)
	return c, nil
}

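// Typical wiring from the load-test entry point looks roughly like this
// (illustrative sketch only; cfg, collector, and recordTracker are assumed
// to be constructed by the caller, e.g. in cmd/loadtest):
//
//	c, err := consumer.New(cfg, collector, 0, recordTracker)
//	if err != nil {
//		log.Fatalf("consumer init: %v", err)
//	}
//	defer c.Close()
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	c.Run(ctx) // blocks until ctx is cancelled
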
// initSaramaConsumer initializes the Sarama consumer group
func (c *Consumer) initSaramaConsumer() error {
	config := sarama.NewConfig()

	// Enable Sarama debug logging to diagnose connection issues
	sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[Sarama Consumer %d] ", c.id), log.LstdFlags)

	// Consumer configuration
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	if c.config.Consumers.AutoOffsetReset == "latest" {
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	}

	// Auto commit configuration
	config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit
	config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond

	// Session and heartbeat configuration
	config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond
	config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond

	// Fetch configuration
	config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes)
	config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default)
	config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes)
	config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond

	// Channel buffer size for concurrent partition consumption
	config.ChannelBufferSize = 256 // Same as the Sarama default; set explicitly for easy tuning

	// Enable concurrent partition fetching by raising the number of in-flight broker requests
	// This allows Sarama to fetch from multiple partitions in parallel
	config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests

	// Connection retry and timeout configuration
	config.Net.DialTimeout = 30 * time.Second  // Same as the Sarama default; set explicitly
	config.Net.ReadTimeout = 30 * time.Second  // Same as the Sarama default; set explicitly
	config.Net.WriteTimeout = 30 * time.Second // Same as the Sarama default; set explicitly
	config.Metadata.Retry.Max = 5              // Retry metadata fetch up to 5 times
	config.Metadata.Retry.Backoff = 500 * time.Millisecond
	config.Metadata.Timeout = 30 * time.Second // Increase metadata timeout

	// Version
	config.Version = sarama.V2_8_0_0

	// CRITICAL: Set a unique ClientID so each consumer gets a unique member ID.
	// Without this, all consumers in the same process share a member ID and only one joins the group.
	// Sarama uses ClientID as part of member ID generation.
	// The consumer ID alone suffices since IDs are already unique per process.
	config.ClientID = fmt.Sprintf("loadtest-consumer-%d", c.id)
	log.Printf("Consumer %d: Setting Sarama ClientID to: %s", c.id, config.ClientID)

	// Create consumer group
	consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
	if err != nil {
		return fmt.Errorf("failed to create Sarama consumer group: %w", err)
	}

	c.saramaConsumer = consumerGroup
	return nil
}

// initConfluentConsumer initializes the Confluent Kafka Go consumer
func (c *Consumer) initConfluentConsumer() error {
	// Confluent consumer disabled, using Sarama only
	return fmt.Errorf("confluent consumer not enabled")
}

// initAvroCodec initializes the Avro codec for schema-based messages
func (c *Consumer) initAvroCodec() error {
	// Use the LoadTestMessage schema (matches what the producer uses)
	loadTestSchema := `{
		"type": "record",
		"name": "LoadTestMessage",
		"namespace": "com.seaweedfs.loadtest",
		"fields": [
			{"name": "id", "type": "string"},
			{"name": "timestamp", "type": "long"},
			{"name": "producer_id", "type": "int"},
			{"name": "counter", "type": "long"},
			{"name": "user_id", "type": "string"},
			{"name": "event_type", "type": "string"},
			{"name": "properties", "type": {"type": "map", "values": "string"}}
		]
	}`

	codec, err := goavro.NewCodec(loadTestSchema)
	if err != nil {
		return fmt.Errorf("failed to create Avro codec: %w", err)
	}

	c.avroCodec = codec
	return nil
}

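// A native Go value matching this schema can be encoded with the same codec
// roughly as follows (illustrative sketch; the field values are made up and
// the exact native types follow goavro conventions):
//
//	record := map[string]interface{}{
//		"id":          "msg-1",
//		"timestamp":   time.Now().UnixMilli(),
//		"producer_id": int32(1),
//		"counter":     int64(42),
//		"user_id":     "user-7",
//		"event_type":  "click",
//		"properties":  map[string]interface{}{"k": "v"},
//	}
//	bin, err := c.avroCodec.BinaryFromNative(nil, record)
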
// Run starts the consumer and consumes messages until the context is cancelled
func (c *Consumer) Run(ctx context.Context) {
	log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup)
	defer log.Printf("Consumer %d stopped", c.id)

	if c.useConfluent {
		c.runConfluentConsumer(ctx)
	} else {
		c.runSaramaConsumer(ctx)
	}
}

// runSaramaConsumer runs the Sarama consumer group
func (c *Consumer) runSaramaConsumer(ctx context.Context) {
	handler := &ConsumerGroupHandler{
		consumer: c,
	}

	var wg sync.WaitGroup

	// Start error handler
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case err, ok := <-c.saramaConsumer.Errors():
				if !ok {
					return
				}
				log.Printf("Consumer %d error: %v", c.id, err)
				c.metricsCollector.RecordConsumerError()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Start consumer group session
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				// Consume returns when the session ends (e.g. on rebalance), so it must be called in a loop
				if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil {
					log.Printf("Consumer %d: Error consuming: %v", c.id, err)
					c.metricsCollector.RecordConsumerError()

					// Wait briefly before retrying (reduced from 5s to 1s for faster recovery)
					select {
					case <-time.After(1 * time.Second):
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	// Start lag monitoring
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.monitorConsumerLag(ctx)
	}()

	// Wait for completion
	<-ctx.Done()
	log.Printf("Consumer %d: Context cancelled, shutting down", c.id)
	wg.Wait()
}

// runConfluentConsumer runs the Confluent consumer
func (c *Consumer) runConfluentConsumer(ctx context.Context) {
	// Confluent consumer disabled, using Sarama only
	log.Printf("Consumer %d: Confluent consumer not enabled", c.id)
}

// processMessage processes a consumed message
func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error {
	topic := ""
	if topicPtr != nil {
		topic = *topicPtr
	}

	// Update offset tracking
	c.updateOffset(topic, partition, offset)

	// Decode message based on topic-specific schema format
	var decodedMessage interface{}
	var err error

	// Determine schema format for this topic (if schemas are enabled)
	var schemaFormat string
	if c.config.Schemas.Enabled {
		schemaFormat = c.schemaFormats[topic]
		if schemaFormat == "" {
			// Fallback to config if topic not in map
			schemaFormat = c.config.Producers.ValueType
		}
	} else {
		// No schemas, use global value type
		schemaFormat = c.config.Producers.ValueType
	}

	// Decode message based on format
	switch schemaFormat {
	case "avro", "AVRO":
		decodedMessage, err = c.decodeAvroMessage(value)
	case "json", "JSON", "JSON_SCHEMA":
		decodedMessage, err = c.decodeJSONSchemaMessage(value)
	case "protobuf", "PROTOBUF":
		decodedMessage, err = c.decodeProtobufMessage(value)
	case "binary":
		decodedMessage, err = c.decodeBinaryMessage(value)
	default:
		// Fallback to plain JSON
		decodedMessage, err = c.decodeJSONMessage(value)
	}

	if err != nil {
		return fmt.Errorf("failed to decode message: %w", err)
	}

	// Note: Removed artificial delay to allow maximum throughput
	// If you need to simulate processing time, add a configurable delay setting
	// time.Sleep(time.Millisecond) // Minimal processing delay

	// Record metrics
	c.metricsCollector.RecordConsumedMessage(len(value))
	c.messagesProcessed++

	// Log progress
	if c.id == 0 && c.messagesProcessed%1000 == 0 {
		log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)",
			c.id, c.messagesProcessed, topic, partition, offset)
	}

	// Optional: Validate message content (for testing purposes)
	if c.config.Chaos.Enabled {
		if err := c.validateMessage(decodedMessage); err != nil {
			log.Printf("Consumer %d: Message validation failed: %v", c.id, err)
		}
	}

	return nil
}

// decodeJSONMessage decodes a JSON message
func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) {
	var message map[string]interface{}
	if err := json.Unmarshal(value, &message); err != nil {
		// DEBUG: Log the raw bytes when JSON parsing fails
		log.Printf("Consumer %d: JSON decode failed. Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v",
			c.id, len(value), value, string(value), err)
		return nil, err
	}
	return message, nil
}

// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format)
func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) {
	if c.avroCodec == nil {
		return nil, fmt.Errorf("Avro codec not initialized")
	}

	// Handle Confluent Wire Format when schemas are enabled
	var avroData []byte
	if c.config.Schemas.Enabled {
		if len(value) < 5 {
			return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
		}

		// Check magic byte (should be 0)
		if value[0] != 0 {
			return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
		}

		// Extract schema ID (bytes 1-4, big-endian)
		schemaID := binary.BigEndian.Uint32(value[1:5])
		_ = schemaID // TODO: Could validate schema ID matches expected schema

		// Extract Avro data (bytes 5+)
		avroData = value[5:]
	} else {
		// No wire format, use raw data
		avroData = value
	}

	native, _, err := c.avroCodec.NativeFromBinary(avroData)
	if err != nil {
		return nil, fmt.Errorf("failed to decode Avro data: %w", err)
	}

	return native, nil
}

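// For reference, the producer side frames a payload in Confluent Wire Format
// as [magic:1][schemaID:4][payload:...]; a minimal sketch (schemaID and
// avroBytes are assumed inputs, not names from this codebase):
//
//	framed := make([]byte, 0, 5+len(avroBytes))
//	framed = append(framed, 0)                               // magic byte 0
//	framed = binary.BigEndian.AppendUint32(framed, schemaID) // schema ID, big-endian
//	framed = append(framed, avroBytes...)                    // Avro binary payload
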
// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format)
func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) {
	// Handle Confluent Wire Format when schemas are enabled
	var jsonData []byte
	if c.config.Schemas.Enabled {
		if len(value) < 5 {
			return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
		}

		// Check magic byte (should be 0)
		if value[0] != 0 {
			return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
		}

		// Extract schema ID (bytes 1-4, big-endian)
		schemaID := binary.BigEndian.Uint32(value[1:5])
		_ = schemaID // TODO: Could validate schema ID matches expected schema

		// Extract JSON data (bytes 5+)
		jsonData = value[5:]
	} else {
		// No wire format, use raw data
		jsonData = value
	}

	// Decode JSON
	var message map[string]interface{}
	if err := json.Unmarshal(jsonData, &message); err != nil {
		return nil, fmt.Errorf("failed to decode JSON data: %w", err)
	}

	return message, nil
}

// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format)
func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) {
	// Handle Confluent Wire Format when schemas are enabled
	var protoData []byte
	if c.config.Schemas.Enabled {
		if len(value) < 5 {
			return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
		}

		// Check magic byte (should be 0)
		if value[0] != 0 {
			return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
		}

		// Extract schema ID (bytes 1-4, big-endian)
		schemaID := binary.BigEndian.Uint32(value[1:5])
		_ = schemaID // TODO: Could validate schema ID matches expected schema

		// Extract Protobuf data (bytes 5+)
		protoData = value[5:]
	} else {
		// No wire format, use raw data
		protoData = value
	}

	// Unmarshal protobuf message
	var protoMsg pb.LoadTestMessage
	if err := proto.Unmarshal(protoData, &protoMsg); err != nil {
		return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err)
	}

	// Convert to map for consistency with other decoders
	return map[string]interface{}{
		"id":          protoMsg.Id,
		"timestamp":   protoMsg.Timestamp,
		"producer_id": protoMsg.ProducerId,
		"counter":     protoMsg.Counter,
		"user_id":     protoMsg.UserId,
		"event_type":  protoMsg.EventType,
		"properties":  protoMsg.Properties,
	}, nil
}

// decodeBinaryMessage decodes a binary message
func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) {
	if len(value) < 20 {
		return nil, fmt.Errorf("binary message too short")
	}

	// Extract fields from the binary format:
	// [producer_id:4][counter:8][timestamp:8][random_data:...]

	producerID := int(value[0])<<24 | int(value[1])<<16 | int(value[2])<<8 | int(value[3])

	var counter int64
	for i := 0; i < 8; i++ {
		counter |= int64(value[4+i]) << (56 - i*8)
	}

	var timestamp int64
	for i := 0; i < 8; i++ {
		timestamp |= int64(value[12+i]) << (56 - i*8)
	}

	return map[string]interface{}{
		"producer_id": producerID,
		"counter":     counter,
		"timestamp":   timestamp,
		"data_size":   len(value),
	}, nil
}

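// The manual shifts above implement big-endian decoding; they are equivalent
// to using encoding/binary directly (sketch):
//
//	producerID := int(binary.BigEndian.Uint32(value[0:4]))
//	counter := int64(binary.BigEndian.Uint64(value[4:12]))
//	timestamp := int64(binary.BigEndian.Uint64(value[12:20]))
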
// validateMessage performs basic message validation
func (c *Consumer) validateMessage(message interface{}) error {
	// This is a placeholder for message validation logic
	// In a real load test, you might validate:
	// - Message structure
	// - Required fields
	// - Data consistency
	// - Schema compliance

	if message == nil {
		return fmt.Errorf("message is nil")
	}

	return nil
}

// updateOffset updates the last seen offset for lag calculation
func (c *Consumer) updateOffset(topic string, partition int32, offset int64) {
	c.offsetMutex.Lock()
	defer c.offsetMutex.Unlock()

	if c.lastOffset[topic] == nil {
		c.lastOffset[topic] = make(map[int32]int64)
	}
	c.lastOffset[topic][partition] = offset
}

// monitorConsumerLag monitors and reports consumer lag
func (c *Consumer) monitorConsumerLag(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.reportConsumerLag()
		}
	}
}

// reportConsumerLag calculates and reports consumer lag
func (c *Consumer) reportConsumerLag() {
	// This is a simplified lag calculation
	// In a real implementation, you would query the broker for high water marks

	c.offsetMutex.RLock()
	defer c.offsetMutex.RUnlock()

	for topic, partitions := range c.lastOffset {
		for partition := range partitions {
			// For simplicity, assume lag is always 0 when we're consuming actively
			// In a real test, you would compare against the high water mark
			lag := int64(0)

			c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag)
		}
	}
}

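// A real lag measurement would compare the last consumed offset against the
// broker's newest offset, roughly like this (sketch; assumes access to a
// sarama.Client, which this consumer does not currently keep):
//
//	hwm, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
//	if err == nil {
//		lag := hwm - 1 - lastConsumedOffset
//		c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag)
//	}
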
// Close closes the consumer and cleans up resources
func (c *Consumer) Close() error {
	log.Printf("Consumer %d: Closing", c.id)

	if c.saramaConsumer != nil {
		return c.saramaConsumer.Close()
	}

	return nil
}

// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
type ConsumerGroupHandler struct {
	consumer *Consumer
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)

	// Log the generation ID and member ID for this session
	log.Printf("Consumer %d: Generation=%d, MemberID=%s",
		h.consumer.id, session.GenerationID(), session.MemberID())

	// Log all assigned partitions
	assignments := session.Claims()
	totalPartitions := 0
	for topic, partitions := range assignments {
		for _, partition := range partitions {
			totalPartitions++
			log.Printf("Consumer %d: ASSIGNED %s[%d]",
				h.consumer.id, topic, partition)
		}
	}
	log.Printf("Consumer %d: Total partitions assigned: %d", h.consumer.id, totalPartitions)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// CRITICAL: Commit all marked offsets before partition reassignment to minimize duplicates
func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	log.Printf("Consumer %d: Consumer group session cleanup - committing final offsets before rebalance", h.consumer.id)

	// Commit all marked offsets before releasing partitions
	// This ensures that when partitions are reassigned to other consumers,
	// they start from the last processed offset, minimizing duplicate reads
	session.Commit()

	log.Printf("Consumer %d: Cleanup complete - offsets committed", h.consumer.id)
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	msgCount := 0
	topic := claim.Topic()
	partition := claim.Partition()
	initialOffset := claim.InitialOffset()
	lastTrackedOffset := int64(-1)
	gapCount := 0
	var gaps []string // Track gap ranges for detailed analysis

	// Log the starting offset for this partition
	log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)",
		h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset())

	startTime := time.Now()
	lastLogTime := time.Now()

	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				elapsed := time.Since(startTime)
				// Log detailed gap analysis
				gapSummary := "none"
				if len(gaps) > 0 {
					gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
				}

				// Check if we consumed just a few messages before stopping
				if msgCount <= 10 {
					log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)",
						h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
				} else {
					log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)",
						h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
						float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
				}
				return nil
			}
			msgCount++

			// Track gaps in offset sequence (indicates missed messages)
			if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 {
				gap := message.Offset - lastTrackedOffset - 1
				gapCount++
				gapDesc := fmt.Sprintf("%d-%d", lastTrackedOffset+1, message.Offset-1)
				gaps = append(gaps, gapDesc)
				elapsed := time.Since(startTime)
				log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)",
					h.consumer.id, topic, partition, elapsed.Seconds(), lastTrackedOffset, message.Offset, gap, gapDesc)
			}
			lastTrackedOffset = message.Offset

			// Log progress every 500 messages OR every 5 seconds
			now := time.Now()
			if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second {
				elapsed := time.Since(startTime)
				throughput := float64(msgCount) / elapsed.Seconds()
				log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d",
					h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount)
				lastLogTime = now
			}

			// Process the message (a nil key is passed through as-is)
			key := message.Key

			if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil {
				log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v",
					h.consumer.id, message.Topic, message.Partition, message.Offset, err)
				h.consumer.metricsCollector.RecordConsumerError()
			} else {
				// Track consumed message
				if h.consumer.tracker != nil {
					h.consumer.tracker.TrackConsumed(tracker.Record{
						Key:        string(key),
						Topic:      message.Topic,
						Partition:  message.Partition,
						Offset:     message.Offset,
						Timestamp:  message.Timestamp.UnixNano(),
						ConsumerID: h.consumer.id,
					})
				}

				// Mark message as processed
				session.MarkMessage(message, "")

				// Commit offsets frequently to minimize both message loss and duplicates.
				// Every 20 messages balances:
				//   - ~600 commits per 12k messages (reasonable overhead)
				//   - ~20 message loss window if the consumer fails
				//   - fewer duplicate reads after rebalancing
				if msgCount%20 == 0 {
					session.Commit()
				}
			}

		case <-session.Context().Done():
			elapsed := time.Since(startTime)
			lastOffset := claim.HighWaterMarkOffset() - 1
			gapSummary := "none"
			if len(gaps) > 0 {
				gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
			}

			// Determine if we reached HWM
			reachedHWM := lastTrackedOffset >= lastOffset
			hwmStatus := "INCOMPLETE"
			if reachedHWM {
				hwmStatus = "COMPLETE"
			}

			// Calculate consumption rate for this partition
			consumptionRate := float64(0)
			if elapsed.Seconds() > 0 {
				consumptionRate = float64(msgCount) / elapsed.Seconds()
			}

			// Log both normal and abnormal completions
			if msgCount == 0 {
				// Partition never got ANY messages - critical issue
				log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)",
					h.consumer.id, topic, partition, claim.HighWaterMarkOffset()-1, hwmStatus)
			} else if msgCount < 10 {
				// Very few messages then stopped - likely hung fetch
				log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)",
					h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, consumptionRate, gapCount, gapSummary)
			} else {
				// Normal completion
				log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)",
					h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
					consumptionRate, lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary)
			}
			return nil
		}
	}
}

// Helper functions

// joinStrings concatenates strs with sep; it is equivalent to strings.Join
// from the standard library (already imported above).
func joinStrings(strs []string, sep string) string {
	if len(strs) == 0 {
		return ""
	}

	result := strs[0]
	for i := 1; i < len(strs); i++ {
		result += sep + strs[i]
	}
	return result
}