Branch:
feature/mq-kafka-gateway-m1
add-ec-vacuum
add-foundation-db
add_fasthttp_client
add_remote_storage
adding-message-queue-integration-tests
avoid_releasing_temp_file_on_write
changing-to-zap
collect-public-metrics
create-table-snapshot-api-design
data_query_pushdown
dependabot/maven/other/java/client/com.google.protobuf-protobuf-java-3.25.5
dependabot/maven/other/java/examples/org.apache.hadoop-hadoop-common-3.4.0
detect-and-plan-ec-tasks
do-not-retry-if-error-is-NotFound
fasthttp
feature/mq-kafka-gateway-m1
filer1_maintenance_branch
fix-GetObjectLockConfigurationHandler
fix-versioning-listing-only
ftp
gh-pages
improve-fuse-mount
improve-fuse-mount2
logrus
master
message_send
mount2
mq-subscribe
mq2
original_weed_mount
random_access_file
refactor-needle-read-operations
refactor-volume-write
remote_overlay
revert-5134-patch-1
revert-5819-patch-1
revert-6434-bugfix-missing-s3-audit
s3-select
sub
tcp_read
test-reverting-lock-table
test_udp
testing
testing-sdx-generation
tikv
track-mount-e2e
volume_buffered_writes
worker-execute-ec-tasks
0.72
0.72.release
0.73
0.74
0.75
0.76
0.77
0.90
0.91
0.92
0.93
0.94
0.95
0.96
0.97
0.98
0.99
1.00
1.01
1.02
1.03
1.04
1.05
1.06
1.07
1.08
1.09
1.10
1.11
1.12
1.14
1.15
1.16
1.17
1.18
1.19
1.20
1.21
1.22
1.23
1.24
1.25
1.26
1.27
1.28
1.29
1.30
1.31
1.32
1.33
1.34
1.35
1.36
1.37
1.38
1.40
1.41
1.42
1.43
1.44
1.45
1.46
1.47
1.48
1.49
1.50
1.51
1.52
1.53
1.54
1.55
1.56
1.57
1.58
1.59
1.60
1.61
1.61RC
1.62
1.63
1.64
1.65
1.66
1.67
1.68
1.69
1.70
1.71
1.72
1.73
1.74
1.75
1.76
1.77
1.78
1.79
1.80
1.81
1.82
1.83
1.84
1.85
1.86
1.87
1.88
1.90
1.91
1.92
1.93
1.94
1.95
1.96
1.97
1.98
1.99
1;70
2.00
2.01
2.02
2.03
2.04
2.05
2.06
2.07
2.08
2.09
2.10
2.11
2.12
2.13
2.14
2.15
2.16
2.17
2.18
2.19
2.20
2.21
2.22
2.23
2.24
2.25
2.26
2.27
2.28
2.29
2.30
2.31
2.32
2.33
2.34
2.35
2.36
2.37
2.38
2.39
2.40
2.41
2.42
2.43
2.47
2.48
2.49
2.50
2.51
2.52
2.53
2.54
2.55
2.56
2.57
2.58
2.59
2.60
2.61
2.62
2.63
2.64
2.65
2.66
2.67
2.68
2.69
2.70
2.71
2.72
2.73
2.74
2.75
2.76
2.77
2.78
2.79
2.80
2.81
2.82
2.83
2.84
2.85
2.86
2.87
2.88
2.89
2.90
2.91
2.92
2.93
2.94
2.95
2.96
2.97
2.98
2.99
3.00
3.01
3.02
3.03
3.04
3.05
3.06
3.07
3.08
3.09
3.10
3.11
3.12
3.13
3.14
3.15
3.16
3.18
3.19
3.20
3.21
3.22
3.23
3.24
3.25
3.26
3.27
3.28
3.29
3.30
3.31
3.32
3.33
3.34
3.35
3.36
3.37
3.38
3.39
3.40
3.41
3.42
3.43
3.44
3.45
3.46
3.47
3.48
3.50
3.51
3.52
3.53
3.54
3.55
3.56
3.57
3.58
3.59
3.60
3.61
3.62
3.63
3.64
3.65
3.66
3.67
3.68
3.69
3.71
3.72
3.73
3.74
3.75
3.76
3.77
3.78
3.79
3.80
3.81
3.82
3.83
3.84
3.85
3.86
3.87
3.88
3.89
3.90
3.91
3.92
3.93
3.94
3.95
3.96
3.97
dev
helm-3.65.1
v0.69
v0.70beta
v3.33
${ item.name }
${ noResults }
5 Commits (feature/mq-kafka-gateway-m1)
Author | SHA1 | Message | Date |
---|---|---|---|
|
c4c0ae20b9 |
Implement topic-based schema configuration instead of per-message metadata
MAJOR IMPROVEMENT: Store schema ID in topic config, not on every message The previous approach of storing schema metadata on every RecordValue message was inefficient and not aligned with how schema registries work. This commit implements the correct approach: Key Changes: - Remove per-message schema metadata (schema_id, schema_format fields) - Add TopicSchemaConfig struct to store schema info per topic - Add topicSchemaConfigs cache in Handler with thread-safe access - Store schema config when first schematized message is processed - Retrieve schema config from topic when re-encoding messages - Update method signatures to pass topic name through the call chain Benefits: 1. Much more efficient - no redundant schema metadata on every message 2. Aligns with Kafka Schema Registry patterns - topics have schemas 3. Reduces message size and storage overhead 4. Cleaner separation of concerns - schema config vs message data 5. Better performance for high-throughput topics Architecture: - Produce: topic + schematized message → decode → store RecordValue + cache schema config - Fetch: topic + RecordValue → get schema config → re-encode to Confluent format - Schema config cached in memory with plans for persistent storage This is the correct way to handle schemas in a Kafka-compatible system. |
17 hours ago |
|
9f0dfa2969 |
fmt
|
17 hours ago |
|
faabe7ca6a |
Fix schema-based message handling to use actual schemas from schema registry
BREAKING CHANGE: Remove fixed 'key', 'value', 'timestamp' structure The previous implementation incorrectly used a fixed RecordValue structure with hardcoded fields ('key', 'value', 'timestamp'). This was wrong because: 1. RecordValue should reflect the actual schema from the schema registry 2. Different messages have different schemas with different field names 3. The structure should be determined by the registered schema, not hardcoded Correct Implementation: - Produce Path: Only process Confluent-framed messages with schema registry - Decode schematized messages to get actual RecordValue from schema - Store the schema-based RecordValue (not fixed structure) in SeaweedMQ - Fetch Path: Re-encode RecordValue back to Confluent format using schema - Fallback to JSON representation when schema encoding fails - Raw messages (non-schematized) bypass RecordValue processing entirely Key Changes: - produceSchemaBasedRecord: Only processes schematized messages, falls back to raw for others - Remove createRecordValueFromKafkaMessage: No more fixed structure creation - Update SMQ validation: Check for non-empty fields, not specific field names - Update tests: Remove assumptions about fixed field structure - Add TODO for proper RecordValue to Confluent format encoding This ensures the Kafka gateway correctly uses schemas from the schema registry instead of imposing an artificial fixed structure. |
17 hours ago |
|
7dc6c7f2c8 |
fmt
|
18 hours ago |
|
f2181ec874 |
Add comprehensive test coverage for schema-based message handling
- Add schema_message_test.go with tests for RecordValue creation and decoding - Test round-trip message integrity (Kafka -> RecordValue -> Kafka) - Test schematized message handling with nested RecordValue structures - Test backward compatibility with raw bytes (non-RecordValue messages) - Add broker_recordvalue_test.go for SMQ broker RecordValue validation - Test required field validation for Kafka topics (key, value, timestamp) - Test different topic namespaces and validation rules - Add schema_recordvalue_test.go for integration testing - Test complete message flow with real SeaweedMQ backend - Test different message types (JSON, text, binary, Unicode) - Add ProduceRecordValue method to all test handlers - Add public methods to Handler for testing access |
18 hours ago |