- Add produceSchemaBasedRecord method to encode Kafka messages as RecordValue
- Create RecordValue structure with key, value, timestamp fields
- Support schema registry integration for Confluent-framed messages
- Add ProduceRecordValue method to SeaweedMQHandler interface and implementations
- Update both BrokerClient and AgentClient to handle RecordValue publishing
- Properly decode schematized messages and embed them in RecordValue structure
- Add schema metadata (schema_id, schema_format) for schematized messages
- Created centralized PartitionMapper utility with consistent range size of 32
- Fixed inconsistent partition mapping between agent_client.go (78 range) and seaweedmq_handler.go (32 range)
- Updated smq_mapping.go to use centralized utility instead of dynamic calculation
- Standardized all partition mapping to use kafka.CreateSMQPartition() and related functions
- Added comprehensive tests for partition mapping consistency and round-trip conversion
- Achieves 99.05% ring utilization supporting 78 Kafka partitions
This fixes the high-priority issue where inconsistent partition mapping could cause
incorrect message routing between different components of the Kafka Gateway.
- Enhanced AgentClient with comprehensive Kafka record schema
- Added kafka_key, kafka_value, kafka_timestamp, kafka_headers fields
- Added kafka_offset and kafka_partition for full Kafka compatibility
- Implemented createKafkaRecordSchema() for structured message storage
- Enhanced SeaweedMQHandler with schema-aware topic management
- Added CreateTopicWithSchema() method for proper schema registration
- Integrated getDefaultKafkaSchema() for consistent schema across topics
- Enhanced KafkaTopicInfo to store schema metadata
- Enhanced Produce API with SeaweedMQ integration
- Updated produceToSeaweedMQ() to use enhanced schema
- Added comprehensive debug logging for SeaweedMQ operations
- Maintained backward compatibility with in-memory mode
- Added comprehensive integration tests
- TestSeaweedMQIntegration for end-to-end SeaweedMQ backend testing
- TestSchemaCompatibility for various message format validation
- Tests verify enhanced schema works with different key-value types
This implements the mq.agent architecture pattern for Kafka Gateway,
providing structured message storage in SeaweedFS with full schema support.
- Implement comprehensive consumer group coordinator with state management
- Add JoinGroup API (key 11) for consumer group membership
- Add SyncGroup API (key 14) for partition assignment coordination
- Create Range and RoundRobin assignment strategies
- Support consumer group lifecycle: Empty -> PreparingRebalance -> CompletingRebalance -> Stable
- Add automatic member cleanup and expired session handling
- Comprehensive test coverage for consumer groups, assignment strategies
- Update ApiVersions to advertise 9 APIs total (was 7)
- All existing integration tests pass with new consumer group support
This provides the foundation for distributed Kafka consumers with automatic
partition rebalancing and group coordination, compatible with standard Kafka clients.
- Add AgentClient for gRPC communication with SeaweedMQ Agent
- Implement SeaweedMQHandler with real message storage backend
- Update protocol handlers to support both in-memory and SeaweedMQ modes
- Add CLI flags for SeaweedMQ agent address (-agent, -seaweedmq)
- Gateway gracefully falls back to in-memory mode if agent unavailable
- Comprehensive integration tests for SeaweedMQ mode
- Maintains full backward compatibility with Phase 1 implementation
- Ready for production use with real SeaweedMQ deployment