CRITICAL FIX: Remove incorrect RecordValue validation for all Kafka messages
The previous implementation incorrectly tried to validate ALL Kafka messages
as RecordValue protobuf messages, producing spurious wire-format error logs for
every non-RecordValue message.
Key Changes:
- Remove error logging for non-RecordValue messages in broker_grpc_pub.go
- Only validate RecordValue structure for messages that are actually RecordValue
- Allow regular Kafka messages and offset management messages to be stored as raw bytes
- Maintain validation for actual schema-based messages sent via ProduceRecordValue
Impact:
✅ Comprehensive E2E tests now pass completely (all 4 scenarios)
✅ No more 'proto: cannot parse invalid wire-format data' error spam
✅ Regular Kafka messages work correctly with both kafka-go and Sarama clients
✅ Schema-based messages still get proper RecordValue validation when needed
This fix ensures that:
1. Regular Kafka messages are stored as raw bytes (correct behavior)
2. Schema-based messages are stored as RecordValue (when schema management enabled)
3. Offset management messages work without validation interference
4. No false error logging for normal operation
The Kafka gateway now works correctly for both regular and schema-based messages
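A minimal sketch of the intended broker-side check (the helper name and the schema_pb import path are assumptions; the actual change lives in broker_grpc_pub.go):

    package example

    import (
        "google.golang.org/protobuf/proto"

        "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    )

    // tryDecodeRecordValue is illustrative: a payload that does not unmarshal
    // as a RecordValue is treated as raw Kafka bytes (regular messages, offset
    // management records) and stored as-is with no error logged; only a payload
    // that really is a RecordValue goes on to field validation.
    func tryDecodeRecordValue(payload []byte) (*schema_pb.RecordValue, bool) {
        record := &schema_pb.RecordValue{}
        if err := proto.Unmarshal(payload, record); err != nil {
            return nil, false // not RecordValue: keep raw bytes, no error log
        }
        return record, true
    }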
MAJOR IMPROVEMENT: Store schema ID in topic config, not on every message
The previous approach of storing schema metadata on every RecordValue message
was inefficient and not aligned with how schema registries work. This commit
implements the correct approach:
Key Changes:
- Remove per-message schema metadata (schema_id, schema_format fields)
- Add TopicSchemaConfig struct to store schema info per topic
- Add topicSchemaConfigs cache in Handler with thread-safe access
- Store schema config when first schematized message is processed
- Retrieve schema config from topic when re-encoding messages
- Update method signatures to pass topic name through the call chain
Benefits:
1. Much more efficient - no redundant schema metadata on every message
2. Aligns with Kafka Schema Registry patterns - topics have schemas
3. Reduces message size and storage overhead
4. Cleaner separation of concerns - schema config vs message data
5. Better performance for high-throughput topics
Architecture:
- Produce: topic + schematized message → decode → store RecordValue + cache schema config
- Fetch: topic + RecordValue → get schema config → re-encode to Confluent format
- Schema config cached in memory with plans for persistent storage
This is the correct way to handle schemas in a Kafka-compatible system.
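A sketch of the topic-level cache this describes; the struct fields and method names are assumptions, not the exact Handler code:

    package example

    import "sync"

    // TopicSchemaConfig: per-topic schema info (illustrative field names).
    type TopicSchemaConfig struct {
        SchemaID     uint32
        SchemaFormat string // e.g. "AVRO", "PROTOBUF", "JSON"
    }

    // Handler fragment: schema configs cached per topic behind a RWMutex so
    // the hot fetch path only takes a read lock.
    type Handler struct {
        schemaConfigMu     sync.RWMutex
        topicSchemaConfigs map[string]*TopicSchemaConfig
    }

    func (h *Handler) getTopicSchemaConfig(topic string) (*TopicSchemaConfig, bool) {
        h.schemaConfigMu.RLock()
        defer h.schemaConfigMu.RUnlock()
        cfg, ok := h.topicSchemaConfigs[topic]
        return cfg, ok
    }

    func (h *Handler) setTopicSchemaConfig(topic string, cfg *TopicSchemaConfig) {
        h.schemaConfigMu.Lock()
        defer h.schemaConfigMu.Unlock()
        if h.topicSchemaConfigs == nil {
            h.topicSchemaConfigs = make(map[string]*TopicSchemaConfig)
        }
        h.topicSchemaConfigs[topic] = cfg
    }

Only the first schematized message for a topic takes the write lock; subsequent produce and fetch calls read from the cache.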
BREAKING CHANGE: Remove fixed 'key', 'value', 'timestamp' structure
The previous implementation incorrectly used a fixed RecordValue structure with
hardcoded fields ('key', 'value', 'timestamp'). This was wrong because:
1. RecordValue should reflect the actual schema from the schema registry
2. Different messages have different schemas with different field names
3. The structure should be determined by the registered schema, not hardcoded
Correct Implementation:
- Produce path: only process Confluent-framed messages against the schema registry
- Decode schematized messages to get the actual RecordValue from the schema
- Store the schema-based RecordValue (not a fixed structure) in SeaweedMQ
- Fetch path: re-encode the RecordValue back to Confluent format using the schema
- Fall back to a JSON representation when schema encoding fails
- Raw (non-schematized) messages bypass RecordValue processing entirely
Key Changes:
- produceSchemaBasedRecord: Only processes schematized messages, falls back to raw for others
- Remove createRecordValueFromKafkaMessage: No more fixed structure creation
- Update SMQ validation: Check for non-empty fields, not specific field names
- Update tests: Remove assumptions about fixed field structure
- Add TODO for proper RecordValue to Confluent format encoding
This ensures the Kafka gateway correctly uses schemas from the schema registry
instead of imposing an artificial fixed structure.
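A sketch of that produce-path decision, with placeholder functions standing in for the real gateway plumbing and an assumed schema_pb import path:

    package example

    import (
        "encoding/binary"

        "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    )

    // Placeholders so the branching reads in isolation; none of these names
    // are the actual gateway methods.
    var (
        produceRaw         func(topic string, value []byte) error
        produceRecordValue func(topic string, record *schema_pb.RecordValue) error
        decodeWithRegistry func(schemaID uint32, payload []byte) (*schema_pb.RecordValue, error)
    )

    // produceValue: only Confluent-framed values (magic byte 0x00 followed by
    // a 4-byte big-endian schema ID) are decoded against the registry; anything
    // else, or any decode failure, falls back to storing the original bytes.
    func produceValue(topic string, value []byte) error {
        if len(value) < 5 || value[0] != 0x00 {
            return produceRaw(topic, value) // non-schematized: raw bytes
        }
        schemaID := binary.BigEndian.Uint32(value[1:5])
        record, err := decodeWithRegistry(schemaID, value[5:])
        if err != nil {
            return produceRaw(topic, value) // fall back to raw storage
        }
        return produceRecordValue(topic, record)
    }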
- Add schema_message_test.go with tests for RecordValue creation and decoding
- Test round-trip message integrity (Kafka -> RecordValue -> Kafka); a sketch of this check follows the list
- Test schematized message handling with nested RecordValue structures
- Test backward compatibility with raw bytes (non-RecordValue messages)
- Add broker_recordvalue_test.go for SMQ broker RecordValue validation
- Test required field validation for Kafka topics (key, value, timestamp)
- Test different topic namespaces and validation rules
- Add schema_recordvalue_test.go for integration testing
- Test complete message flow with real SeaweedMQ backend
- Test different message types (JSON, text, binary, Unicode)
- Add ProduceRecordValue method to all test handlers
- Add public methods to Handler for testing access
- Add decodeRecordValueToKafkaMessage method to convert RecordValue back to Kafka messages (sketched after this list)
- Extract original Kafka message bytes from RecordValue structure
- Support both raw bytes and schematized messages in RecordValue
- Re-encode schematized messages back to Confluent format when schema registry is available
- Add fallback JSON conversion for RecordValue when schema encoding fails
- Maintain backward compatibility with non-RecordValue messages
- Handle different value types (bytes, strings, nested RecordValue) in RecordValue fields
- Add produceSchemaBasedRecord method to encode Kafka messages as RecordValue
- Create RecordValue structure with key, value, timestamp fields
- Support schema registry integration for Confluent-framed messages
- Add ProduceRecordValue method to SeaweedMQHandler interface and implementations
- Update both BrokerClient and AgentClient to handle RecordValue publishing
- Properly decode schematized messages and embed them in RecordValue structure
- Add schema metadata (schema_id, schema_format) for schematized messages
- Update validation logic to expect RecordValue format for Kafka topics
- Add validateRecordValue method to check required fields (key, value, timestamp)
- Improve error messages to distinguish between validation failures and format issues
- This prepares the SMQ broker to receive properly encoded messages from the Kafka gateway
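A sketch of the round-trip integrity check the new tests describe; it assumes schema_pb exposes RecordValue/Value with a bytes_value kind, and the real tests go through the gateway's encode/decode path rather than building the RecordValue by hand:

    package example

    import (
        "bytes"
        "testing"

        "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    )

    // TestRawBytesRoundTrip: a raw Kafka value wrapped into a RecordValue
    // should come back byte-for-byte when extracted again.
    func TestRawBytesRoundTrip(t *testing.T) {
        original := []byte(`{"user":"alice","amount":42}`)
        record := &schema_pb.RecordValue{
            Fields: map[string]*schema_pb.Value{
                "value": {Kind: &schema_pb.Value_BytesValue{BytesValue: original}},
            },
        }
        got := record.Fields["value"].GetBytesValue()
        if !bytes.Equal(got, original) {
            t.Fatalf("round-trip mismatch: got %q, want %q", got, original)
        }
    }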
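A sketch of the fetch-path conversion described above, again with placeholder functions standing in for the registry plumbing:

    package example

    import (
        "encoding/json"

        "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    )

    // Placeholders for the registry plumbing; names are illustrative.
    var (
        lookupTopicSchemaID  func(topic string) (uint32, bool)
        encodeConfluentFrame func(schemaID uint32, record *schema_pb.RecordValue) ([]byte, error)
        recordValueToMap     func(record *schema_pb.RecordValue) map[string]interface{}
    )

    // recordValueToKafkaBytes: when the topic has a cached schema config,
    // re-encode the RecordValue into the Confluent wire format; otherwise,
    // or when encoding fails, fall back to a JSON rendering of the record.
    func recordValueToKafkaBytes(topic string, record *schema_pb.RecordValue) []byte {
        if schemaID, ok := lookupTopicSchemaID(topic); ok {
            if framed, err := encodeConfluentFrame(schemaID, record); err == nil {
                return framed // 0x00 magic byte + schema ID + encoded payload
            }
        }
        if jsonBytes, err := json.Marshal(recordValueToMap(record)); err == nil {
            return jsonBytes // JSON fallback when schema encoding fails
        }
        return nil
    }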
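And a sketch of the broker-side validateRecordValue check; the required field names come from this commit, while the error wording and signature are illustrative:

    package example

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    )

    // validateRecordValue rejects records that are empty or missing any of
    // the required Kafka fields (key, value, timestamp).
    func validateRecordValue(record *schema_pb.RecordValue) error {
        if record == nil || len(record.Fields) == 0 {
            return fmt.Errorf("RecordValue has no fields")
        }
        for _, name := range []string{"key", "value", "timestamp"} {
            if _, ok := record.Fields[name]; !ok {
                return fmt.Errorf("RecordValue missing required field %q", name)
            }
        }
        return nil
    }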