BREAKING CHANGE: Remove fixed 'key', 'value', 'timestamp' structure
The previous implementation used a fixed RecordValue structure with hardcoded
fields ('key', 'value', 'timestamp'). This was wrong because:
1. RecordValue should reflect the actual schema from the schema registry
2. Different messages have different schemas with different field names
3. The structure should be determined by the registered schema, not hardcoded
Correct Implementation:
- Produce Path: only Confluent-framed messages are processed through the schema registry (see the sketch after this list)
  - Decode schematized messages to get the actual RecordValue from the registered schema
  - Store the schema-based RecordValue (not a fixed structure) in SeaweedMQ
- Fetch Path: re-encode the RecordValue back to Confluent format using the schema
  - Fall back to a JSON representation when schema encoding fails
- Raw messages (non-schematized) bypass RecordValue processing entirely
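A minimal sketch of that produce-path gate, assuming the standard Confluent wire format (magic byte 0x00 followed by a 4-byte big-endian schema ID); the helper name and signature are illustrative, not the gateway's actual API:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// isSchematized reports whether a value uses the Confluent wire format:
// magic byte 0x00 followed by a 4-byte big-endian schema registry ID.
func isSchematized(value []byte) (schemaID uint32, ok bool) {
	if len(value) < 5 || value[0] != 0x00 {
		return 0, false
	}
	return binary.BigEndian.Uint32(value[1:5]), true
}

func main() {
	framed := append([]byte{0x00, 0x00, 0x00, 0x00, 0x2A}, []byte(`{"f":1}`)...)
	if id, ok := isSchematized(framed); ok {
		fmt.Printf("schematized: decode via schema registry, schema id %d\n", id)
	}
	if _, ok := isSchematized([]byte("plain text")); !ok {
		fmt.Println("raw: bypass RecordValue processing entirely")
	}
}
```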
Key Changes:
- produceSchemaBasedRecord: Only processes schematized messages, falls back to raw for others
- Remove createRecordValueFromKafkaMessage: No more fixed structure creation
- Update SMQ validation: Check for non-empty fields, not specific field names
- Update tests: Remove assumptions about fixed field structure
- Add TODO for proper RecordValue to Confluent format encoding
This ensures the Kafka gateway correctly uses schemas from the schema registry
instead of imposing an artificial fixed structure.
- Add schema_message_test.go with tests for RecordValue creation and decoding
  - Test round-trip message integrity (Kafka -> RecordValue -> Kafka); a test sketch follows this list
  - Test schematized message handling with nested RecordValue structures
  - Test backward compatibility with raw bytes (non-RecordValue messages)
- Add broker_recordvalue_test.go for SMQ broker RecordValue validation
  - Test required field validation for Kafka topics (key, value, timestamp)
  - Test different topic namespaces and validation rules
- Add schema_recordvalue_test.go for integration testing
  - Test the complete message flow with a real SeaweedMQ backend
  - Test different message types (JSON, text, binary, Unicode)
- Add ProduceRecordValue method to all test handlers
- Add public methods to Handler for testing access
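A hypothetical round-trip test in the spirit of the list above; the local recordValue stand-in and helper names are invented here and do not match the gateway's real types, but the invariant is the same — the original Kafka bytes must survive the encode/decode cycle:

```go
package kafka_test

import (
	"bytes"
	"testing"
)

// recordValue stands in for the schema-based record used by the gateway.
type recordValue struct {
	fields map[string][]byte
}

// encode and decode are illustrative stand-ins for the produce/fetch paths.
func encode(key, value []byte) *recordValue {
	return &recordValue{fields: map[string][]byte{"key": key, "value": value}}
}

func decode(rv *recordValue) (key, value []byte) {
	return rv.fields["key"], rv.fields["value"]
}

func TestRoundTripIntegrity(t *testing.T) {
	cases := [][2][]byte{
		{[]byte("user-1"), []byte(`{"name":"alice"}`)}, // JSON payload
		{nil, []byte("plain text")},                    // raw text, nil key
		{[]byte("k"), []byte{0x00, 0x01, 0x02}},        // binary payload
	}
	for _, c := range cases {
		key, value := decode(encode(c[0], c[1]))
		if !bytes.Equal(key, c[0]) || !bytes.Equal(value, c[1]) {
			t.Errorf("round-trip mismatch: got (%q, %q), want (%q, %q)", key, value, c[0], c[1])
		}
	}
}
```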
- Add decodeRecordValueToKafkaMessage method to convert RecordValue back to Kafka messages (see the sketch after this list)
  - Extract the original Kafka message bytes from the RecordValue structure
  - Support both raw bytes and schematized messages in RecordValue
  - Re-encode schematized messages back to Confluent format when the schema registry is available
  - Add fallback JSON conversion for RecordValue when schema encoding fails
  - Maintain backward compatibility with non-RecordValue messages
  - Handle different value types (bytes, strings, nested RecordValue) in RecordValue fields
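A hedged sketch of that fetch-path conversion; the recordValue type, field names, and function signature are stand-ins for the gateway's real schema_pb-based code, and the re-encode step is reduced to re-framing a JSON payload:

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// recordValue is a stand-in for the schema-based record stored in SeaweedMQ.
type recordValue struct {
	fields map[string]interface{}
}

// toConfluentFrame prefixes a payload with the Confluent wire-format header.
func toConfluentFrame(schemaID uint32, payload []byte) []byte {
	framed := make([]byte, 5, 5+len(payload))
	framed[0] = 0x00
	binary.BigEndian.PutUint32(framed[1:5], schemaID)
	return append(framed, payload...)
}

// decodeToKafkaMessage converts a stored record back into Kafka message bytes.
func decodeToKafkaMessage(rv *recordValue, schemaID uint32, haveRegistry bool) ([]byte, error) {
	switch v := rv.fields["value"].(type) {
	case []byte:
		return v, nil // raw bytes were stored as-is
	case string:
		return []byte(v), nil
	default:
		if haveRegistry {
			// The real code re-encodes using the registered schema; here we
			// only illustrate the Confluent re-framing step.
			if payload, err := json.Marshal(v); err == nil {
				return toConfluentFrame(schemaID, payload), nil
			}
		}
		// Fallback: JSON representation of the record fields.
		return json.Marshal(rv.fields)
	}
}

func main() {
	rv := &recordValue{fields: map[string]interface{}{"value": map[string]interface{}{"name": "alice"}}}
	msg, _ := decodeToKafkaMessage(rv, 42, true)
	fmt.Printf("% x\n", msg[:5]) // Confluent header: 00 00 00 00 2a
}
```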
- Add produceSchemaBasedRecord method to encode Kafka messages as RecordValue
  - Create RecordValue structure with key, value, timestamp fields
  - Support schema registry integration for Confluent-framed messages
- Add ProduceRecordValue method to SeaweedMQHandler interface and implementations
  - Update both BrokerClient and AgentClient to handle RecordValue publishing
- Properly decode schematized messages and embed them in the RecordValue structure
- Add schema metadata (schema_id, schema_format) for schematized messages
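A sketch of how a decoded schematized payload might be wrapped with its schema metadata before publishing; the producedRecord type and helper are hypothetical, and only the schema_id/schema_format field names come from the change above:

```go
package main

import "fmt"

// producedRecord is an illustrative stand-in for the RecordValue handed to
// SeaweedMQ; the real gateway uses schema_pb types.
type producedRecord struct {
	fields map[string]interface{}
}

// wrapDecodedMessage embeds the decoded payload plus schema metadata
// (schema_id, schema_format) so the fetch path can re-encode it later.
func wrapDecodedMessage(decoded map[string]interface{}, schemaID uint32, format string) *producedRecord {
	fields := make(map[string]interface{}, len(decoded)+2)
	for k, v := range decoded {
		fields[k] = v // field names come from the registered schema
	}
	fields["schema_id"] = schemaID
	fields["schema_format"] = format
	return &producedRecord{fields: fields}
}

func main() {
	rec := wrapDecodedMessage(map[string]interface{}{"name": "alice", "age": 30}, 42, "AVRO")
	fmt.Println(rec.fields["schema_id"], rec.fields["schema_format"])
}
```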
- Replace all fmt.Printf DEBUG statements with protocol.Debug calls
- Debug logging is now environment-gated via the KAFKA_DEBUG env var (see the sketch after this list)
- Cleaner, structured logging approach for Kafka protocol operations
- All protocol packages build successfully
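A minimal sketch of an environment-gated debug logger of the kind described; the real protocol.Debug implementation may differ in format and output destination:

```go
package protocol

import (
	"fmt"
	"os"
)

// debugEnabled is read once at startup; set KAFKA_DEBUG=1 to enable output.
var debugEnabled = os.Getenv("KAFKA_DEBUG") != ""

// Debug replaces scattered fmt.Printf DEBUG statements with a single gated
// entry point for structured protocol logging.
func Debug(format string, args ...interface{}) {
	if !debugEnabled {
		return
	}
	fmt.Fprintf(os.Stderr, "[kafka] "+format+"\n", args...)
}
```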
- Replace hardcoded partition ranges with the centralized kafka.CreateSMQPartition helper (illustrated after this list)
- Use proper SMQ ranges instead of single partition numbers in storage paths
- All offset mapping tests now pass with consistent 32-slot ranges
- Remove unused pub_balancer import
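An illustrative mapping from a Kafka partition index to a 32-slot SMQ range; the constant, signature, and slot arithmetic here are assumptions, with kafka.CreateSMQPartition being the real helper in the gateway code:

```go
package main

import "fmt"

const slotsPerKafkaPartition = 32 // consistent 32-slot ranges, per the change above

// smqRange returns the [start, stop) slot range for a Kafka partition index,
// instead of a single partition number.
func smqRange(kafkaPartition int32) (rangeStart, rangeStop int32) {
	rangeStart = kafkaPartition * slotsPerKafkaPartition
	return rangeStart, rangeStart + slotsPerKafkaPartition
}

func main() {
	for _, p := range []int32{0, 1, 2} {
		start, stop := smqRange(p)
		fmt.Printf("kafka partition %d -> smq slots [%d, %d)\n", p, start, stop)
	}
}
```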
- Added missing ConfigSource field (int8) for versions 1-3
- Added ConfigSynonyms array (empty) for versions 1-3
- Added version-specific field handling:
* IsDefault only for version 0
* ConfigSource and ConfigSynonyms for versions 1-3
* ConfigType and ConfigDocumentation for version 3
- Fixed Sarama admin client compatibility issue with 'insufficient data to decode packet'
- AdminClientCompatibility test now passes
This resolves the final compatibility issue where the Sarama admin client couldn't decode
DescribeConfigs responses due to missing protocol fields for version 2; the per-version
config entry layout is sketched below.
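A simplified sketch of the version-dependent config entry layout listed above (non-flexible versions 0-3, no tagged fields); the encoding helpers and example values are illustrative, not the handler's actual code:

```go
package main

import (
	"bytes"
	"encoding/binary"
)

func writeString(buf *bytes.Buffer, s string) {
	binary.Write(buf, binary.BigEndian, int16(len(s)))
	buf.WriteString(s)
}

func writeNullString(buf *bytes.Buffer) {
	binary.Write(buf, binary.BigEndian, int16(-1)) // null string
}

// encodeConfigEntry writes one DescribeConfigs config entry for versions 0-3.
func encodeConfigEntry(version int16, name, value string) []byte {
	buf := &bytes.Buffer{}
	writeString(buf, name)
	writeString(buf, value)
	buf.WriteByte(0) // read_only = false
	if version == 0 {
		buf.WriteByte(1) // is_default, version 0 only
	} else {
		buf.WriteByte(5) // config_source = DEFAULT_CONFIG, versions 1+
	}
	buf.WriteByte(0) // is_sensitive = false
	if version >= 1 {
		binary.Write(buf, binary.BigEndian, int32(0)) // empty config_synonyms array
	}
	if version >= 3 {
		buf.WriteByte(0)     // config_type = UNKNOWN
		writeNullString(buf) // config_documentation = null
	}
	return buf.Bytes()
}

func main() {
	_ = encodeConfigEntry(2, "retention.ms", "604800000")
}
```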
- Fixed parsing of DescribeConfigs requests where the config names array length is -1 (a null array); sketched below
- Added proper validation to handle the Kafka protocol null array convention
- The DescribeConfigs handler now processes requests correctly and generates responses
- An issue remains where the client receives 'insufficient data to decode packet', likely a response format problem
The DescribeConfigs API is now functional but there may be a response format compatibility issue
with the Sarama client that needs further investigation.
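A sketch of the null-array handling: in the non-compact Kafka protocol encoding, an array length of -1 means null, which for config_names is treated as "return all configs" rather than an error. The parsing helper below is illustrative, not the handler's actual code:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// readConfigNames parses the config_names array from a DescribeConfigs
// request body, returning a nil slice (meaning "all configs") for a null array.
func readConfigNames(data []byte) (names []string, bytesRead int, err error) {
	if len(data) < 4 {
		return nil, 0, fmt.Errorf("truncated array length")
	}
	n := int32(binary.BigEndian.Uint32(data[:4]))
	offset := 4
	if n == -1 {
		return nil, offset, nil // null array: caller should return every config
	}
	names = make([]string, 0, n)
	for i := int32(0); i < n; i++ {
		if len(data[offset:]) < 2 {
			return nil, 0, fmt.Errorf("truncated string length")
		}
		l := int(int16(binary.BigEndian.Uint16(data[offset : offset+2])))
		offset += 2
		if l < 0 || len(data[offset:]) < l {
			return nil, 0, fmt.Errorf("truncated string")
		}
		names = append(names, string(data[offset:offset+l]))
		offset += l
	}
	return names, offset, nil
}

func main() {
	null := []byte{0xff, 0xff, 0xff, 0xff}
	names, _, _ := readConfigNames(null)
	fmt.Println(names == nil) // true: null array means "all configs"
}
```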