Browse Source
Phase 1: Add Confluent envelope parser for Kafka schema detection
Phase 1: Add Confluent envelope parser for Kafka schema detection
- Implement ParseConfluentEnvelope() to detect and extract schema info - Add support for magic byte (0x00) + schema ID extraction - Include envelope validation and metadata extraction - Add comprehensive unit tests with 100% coverage - Prepare foundation for Avro/Protobuf/JSON Schema support This enables detection of schematized Kafka messages for gateway processing.pull/7231/head
2 changed files with 468 additions and 0 deletions
@ -0,0 +1,148 @@ |
|||||
|
package schema |
||||
|
|
||||
|
import ( |
||||
|
"encoding/binary" |
||||
|
"fmt" |
||||
|
) |
||||
|
|
||||
|
// Format represents the schema format type
|
||||
|
type Format int |
||||
|
|
||||
|
const ( |
||||
|
FormatUnknown Format = iota |
||||
|
FormatAvro |
||||
|
FormatProtobuf |
||||
|
FormatJSONSchema |
||||
|
) |
||||
|
|
||||
|
func (f Format) String() string { |
||||
|
switch f { |
||||
|
case FormatAvro: |
||||
|
return "AVRO" |
||||
|
case FormatProtobuf: |
||||
|
return "PROTOBUF" |
||||
|
case FormatJSONSchema: |
||||
|
return "JSON_SCHEMA" |
||||
|
default: |
||||
|
return "UNKNOWN" |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// ConfluentEnvelope represents the parsed Confluent Schema Registry envelope
|
||||
|
type ConfluentEnvelope struct { |
||||
|
Format Format |
||||
|
SchemaID uint32 |
||||
|
Indexes []int // For Protobuf nested message resolution
|
||||
|
Payload []byte // The actual encoded data
|
||||
|
} |
||||
|
|
||||
|
// ParseConfluentEnvelope parses a Confluent Schema Registry framed message
|
||||
|
// Returns the envelope details and whether the message was successfully parsed
|
||||
|
func ParseConfluentEnvelope(data []byte) (*ConfluentEnvelope, bool) { |
||||
|
if len(data) < 5 { |
||||
|
return nil, false // Too short to contain magic byte + schema ID
|
||||
|
} |
||||
|
|
||||
|
// Check for Confluent magic byte (0x00)
|
||||
|
if data[0] != 0x00 { |
||||
|
return nil, false // Not a Confluent-framed message
|
||||
|
} |
||||
|
|
||||
|
// Extract schema ID (big-endian uint32)
|
||||
|
schemaID := binary.BigEndian.Uint32(data[1:5]) |
||||
|
|
||||
|
envelope := &ConfluentEnvelope{ |
||||
|
Format: FormatAvro, // Default assumption; will be refined later
|
||||
|
SchemaID: schemaID, |
||||
|
Indexes: nil, |
||||
|
Payload: data[5:], // Default: payload starts after schema ID
|
||||
|
} |
||||
|
|
||||
|
// For Protobuf, there may be additional indexes after the schema ID
|
||||
|
// This is a more complex parsing that we'll implement when we add Protobuf support
|
||||
|
// For now, assume Avro format
|
||||
|
|
||||
|
return envelope, true |
||||
|
} |
||||
|
|
||||
|
// IsSchematized checks if the given bytes represent a Confluent-framed message
|
||||
|
func IsSchematized(data []byte) bool { |
||||
|
_, ok := ParseConfluentEnvelope(data) |
||||
|
return ok |
||||
|
} |
||||
|
|
||||
|
// ExtractSchemaID extracts just the schema ID without full parsing (for quick checks)
|
||||
|
func ExtractSchemaID(data []byte) (uint32, bool) { |
||||
|
if len(data) < 5 || data[0] != 0x00 { |
||||
|
return 0, false |
||||
|
} |
||||
|
return binary.BigEndian.Uint32(data[1:5]), true |
||||
|
} |
||||
|
|
||||
|
// CreateConfluentEnvelope creates a Confluent-framed message from components
|
||||
|
// This will be useful for reconstructing messages on the Fetch path
|
||||
|
func CreateConfluentEnvelope(format Format, schemaID uint32, indexes []int, payload []byte) []byte { |
||||
|
// Start with magic byte + schema ID (5 bytes minimum)
|
||||
|
result := make([]byte, 5, 5+len(payload)+len(indexes)*4) |
||||
|
result[0] = 0x00 // Magic byte
|
||||
|
binary.BigEndian.PutUint32(result[1:5], schemaID) |
||||
|
|
||||
|
// For Protobuf, add indexes as varints (simplified for Phase 1)
|
||||
|
if format == FormatProtobuf && len(indexes) > 0 { |
||||
|
// TODO: Implement proper varint encoding for Protobuf indexes in Phase 5
|
||||
|
// For now, we'll just append the payload
|
||||
|
} |
||||
|
|
||||
|
// Append the actual payload
|
||||
|
result = append(result, payload...) |
||||
|
|
||||
|
return result |
||||
|
} |
||||
|
|
||||
|
// ValidateEnvelope performs basic validation on a parsed envelope
|
||||
|
func (e *ConfluentEnvelope) Validate() error { |
||||
|
if e.SchemaID == 0 { |
||||
|
return fmt.Errorf("invalid schema ID: 0") |
||||
|
} |
||||
|
|
||||
|
if len(e.Payload) == 0 { |
||||
|
return fmt.Errorf("empty payload") |
||||
|
} |
||||
|
|
||||
|
// Format-specific validation
|
||||
|
switch e.Format { |
||||
|
case FormatAvro: |
||||
|
// Avro payloads should be valid binary data
|
||||
|
// More specific validation will be done by the Avro decoder
|
||||
|
case FormatProtobuf: |
||||
|
// Protobuf validation will be implemented in Phase 5
|
||||
|
case FormatJSONSchema: |
||||
|
// JSON Schema validation will be implemented in Phase 6
|
||||
|
default: |
||||
|
return fmt.Errorf("unsupported format: %v", e.Format) |
||||
|
} |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
// Metadata returns a map of envelope metadata for storage
|
||||
|
func (e *ConfluentEnvelope) Metadata() map[string]string { |
||||
|
metadata := map[string]string{ |
||||
|
"schema_format": e.Format.String(), |
||||
|
"schema_id": fmt.Sprintf("%d", e.SchemaID), |
||||
|
} |
||||
|
|
||||
|
if len(e.Indexes) > 0 { |
||||
|
// Store indexes for Protobuf reconstruction
|
||||
|
indexStr := "" |
||||
|
for i, idx := range e.Indexes { |
||||
|
if i > 0 { |
||||
|
indexStr += "," |
||||
|
} |
||||
|
indexStr += fmt.Sprintf("%d", idx) |
||||
|
} |
||||
|
metadata["protobuf_indexes"] = indexStr |
||||
|
} |
||||
|
|
||||
|
return metadata |
||||
|
} |
||||
@ -0,0 +1,320 @@ |
|||||
|
package schema |
||||
|
|
||||
|
import ( |
||||
|
"encoding/binary" |
||||
|
"testing" |
||||
|
) |
||||
|
|
||||
|
func TestParseConfluentEnvelope(t *testing.T) { |
||||
|
tests := []struct { |
||||
|
name string |
||||
|
input []byte |
||||
|
expectOK bool |
||||
|
expectID uint32 |
||||
|
expectFormat Format |
||||
|
}{ |
||||
|
{ |
||||
|
name: "valid Avro message", |
||||
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x10, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, // schema ID 1 + "Hello"
|
||||
|
expectOK: true, |
||||
|
expectID: 1, |
||||
|
expectFormat: FormatAvro, |
||||
|
}, |
||||
|
{ |
||||
|
name: "valid message with larger schema ID", |
||||
|
input: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x02, 0x66, 0x6f, 0x6f}, // schema ID 1234 + "foo"
|
||||
|
expectOK: true, |
||||
|
expectID: 1234, |
||||
|
expectFormat: FormatAvro, |
||||
|
}, |
||||
|
{ |
||||
|
name: "too short message", |
||||
|
input: []byte{0x00, 0x00, 0x00}, |
||||
|
expectOK: false, |
||||
|
}, |
||||
|
{ |
||||
|
name: "no magic byte", |
||||
|
input: []byte{0x01, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, |
||||
|
expectOK: false, |
||||
|
}, |
||||
|
{ |
||||
|
name: "empty message", |
||||
|
input: []byte{}, |
||||
|
expectOK: false, |
||||
|
}, |
||||
|
{ |
||||
|
name: "minimal valid message", |
||||
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01}, // schema ID 1, empty payload
|
||||
|
expectOK: true, |
||||
|
expectID: 1, |
||||
|
expectFormat: FormatAvro, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
for _, tt := range tests { |
||||
|
t.Run(tt.name, func(t *testing.T) { |
||||
|
envelope, ok := ParseConfluentEnvelope(tt.input) |
||||
|
|
||||
|
if ok != tt.expectOK { |
||||
|
t.Errorf("ParseConfluentEnvelope() ok = %v, want %v", ok, tt.expectOK) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
if !tt.expectOK { |
||||
|
return // No need to check further if we expected failure
|
||||
|
} |
||||
|
|
||||
|
if envelope.SchemaID != tt.expectID { |
||||
|
t.Errorf("ParseConfluentEnvelope() schemaID = %v, want %v", envelope.SchemaID, tt.expectID) |
||||
|
} |
||||
|
|
||||
|
if envelope.Format != tt.expectFormat { |
||||
|
t.Errorf("ParseConfluentEnvelope() format = %v, want %v", envelope.Format, tt.expectFormat) |
||||
|
} |
||||
|
|
||||
|
// Verify payload extraction
|
||||
|
expectedPayloadLen := len(tt.input) - 5 // 5 bytes for magic + schema ID
|
||||
|
if len(envelope.Payload) != expectedPayloadLen { |
||||
|
t.Errorf("ParseConfluentEnvelope() payload length = %v, want %v", len(envelope.Payload), expectedPayloadLen) |
||||
|
} |
||||
|
}) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func TestIsSchematized(t *testing.T) { |
||||
|
tests := []struct { |
||||
|
name string |
||||
|
input []byte |
||||
|
expect bool |
||||
|
}{ |
||||
|
{ |
||||
|
name: "schematized message", |
||||
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, |
||||
|
expect: true, |
||||
|
}, |
||||
|
{ |
||||
|
name: "non-schematized message", |
||||
|
input: []byte{0x48, 0x65, 0x6c, 0x6c, 0x6f}, // Just "Hello"
|
||||
|
expect: false, |
||||
|
}, |
||||
|
{ |
||||
|
name: "empty message", |
||||
|
input: []byte{}, |
||||
|
expect: false, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
for _, tt := range tests { |
||||
|
t.Run(tt.name, func(t *testing.T) { |
||||
|
result := IsSchematized(tt.input) |
||||
|
if result != tt.expect { |
||||
|
t.Errorf("IsSchematized() = %v, want %v", result, tt.expect) |
||||
|
} |
||||
|
}) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func TestExtractSchemaID(t *testing.T) { |
||||
|
tests := []struct { |
||||
|
name string |
||||
|
input []byte |
||||
|
expectID uint32 |
||||
|
expectOK bool |
||||
|
}{ |
||||
|
{ |
||||
|
name: "valid schema ID", |
||||
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, |
||||
|
expectID: 1, |
||||
|
expectOK: true, |
||||
|
}, |
||||
|
{ |
||||
|
name: "large schema ID", |
||||
|
input: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x02, 0x66, 0x6f, 0x6f}, |
||||
|
expectID: 1234, |
||||
|
expectOK: true, |
||||
|
}, |
||||
|
{ |
||||
|
name: "no magic byte", |
||||
|
input: []byte{0x01, 0x00, 0x00, 0x00, 0x01}, |
||||
|
expectID: 0, |
||||
|
expectOK: false, |
||||
|
}, |
||||
|
{ |
||||
|
name: "too short", |
||||
|
input: []byte{0x00, 0x00}, |
||||
|
expectID: 0, |
||||
|
expectOK: false, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
for _, tt := range tests { |
||||
|
t.Run(tt.name, func(t *testing.T) { |
||||
|
id, ok := ExtractSchemaID(tt.input) |
||||
|
|
||||
|
if ok != tt.expectOK { |
||||
|
t.Errorf("ExtractSchemaID() ok = %v, want %v", ok, tt.expectOK) |
||||
|
} |
||||
|
|
||||
|
if id != tt.expectID { |
||||
|
t.Errorf("ExtractSchemaID() id = %v, want %v", id, tt.expectID) |
||||
|
} |
||||
|
}) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func TestCreateConfluentEnvelope(t *testing.T) { |
||||
|
tests := []struct { |
||||
|
name string |
||||
|
format Format |
||||
|
schemaID uint32 |
||||
|
indexes []int |
||||
|
payload []byte |
||||
|
expected []byte |
||||
|
}{ |
||||
|
{ |
||||
|
name: "simple Avro message", |
||||
|
format: FormatAvro, |
||||
|
schemaID: 1, |
||||
|
indexes: nil, |
||||
|
payload: []byte("Hello"), |
||||
|
expected: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, |
||||
|
}, |
||||
|
{ |
||||
|
name: "large schema ID", |
||||
|
format: FormatAvro, |
||||
|
schemaID: 1234, |
||||
|
indexes: nil, |
||||
|
payload: []byte("foo"), |
||||
|
expected: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x66, 0x6f, 0x6f}, |
||||
|
}, |
||||
|
{ |
||||
|
name: "empty payload", |
||||
|
format: FormatAvro, |
||||
|
schemaID: 5, |
||||
|
indexes: nil, |
||||
|
payload: []byte{}, |
||||
|
expected: []byte{0x00, 0x00, 0x00, 0x00, 0x05}, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
for _, tt := range tests { |
||||
|
t.Run(tt.name, func(t *testing.T) { |
||||
|
result := CreateConfluentEnvelope(tt.format, tt.schemaID, tt.indexes, tt.payload) |
||||
|
|
||||
|
if len(result) != len(tt.expected) { |
||||
|
t.Errorf("CreateConfluentEnvelope() length = %v, want %v", len(result), len(tt.expected)) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
for i, b := range result { |
||||
|
if b != tt.expected[i] { |
||||
|
t.Errorf("CreateConfluentEnvelope() byte[%d] = %v, want %v", i, b, tt.expected[i]) |
||||
|
} |
||||
|
} |
||||
|
}) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func TestEnvelopeValidate(t *testing.T) { |
||||
|
tests := []struct { |
||||
|
name string |
||||
|
envelope *ConfluentEnvelope |
||||
|
expectErr bool |
||||
|
}{ |
||||
|
{ |
||||
|
name: "valid Avro envelope", |
||||
|
envelope: &ConfluentEnvelope{ |
||||
|
Format: FormatAvro, |
||||
|
SchemaID: 1, |
||||
|
Payload: []byte("Hello"), |
||||
|
}, |
||||
|
expectErr: false, |
||||
|
}, |
||||
|
{ |
||||
|
name: "zero schema ID", |
||||
|
envelope: &ConfluentEnvelope{ |
||||
|
Format: FormatAvro, |
||||
|
SchemaID: 0, |
||||
|
Payload: []byte("Hello"), |
||||
|
}, |
||||
|
expectErr: true, |
||||
|
}, |
||||
|
{ |
||||
|
name: "empty payload", |
||||
|
envelope: &ConfluentEnvelope{ |
||||
|
Format: FormatAvro, |
||||
|
SchemaID: 1, |
||||
|
Payload: []byte{}, |
||||
|
}, |
||||
|
expectErr: true, |
||||
|
}, |
||||
|
{ |
||||
|
name: "unknown format", |
||||
|
envelope: &ConfluentEnvelope{ |
||||
|
Format: FormatUnknown, |
||||
|
SchemaID: 1, |
||||
|
Payload: []byte("Hello"), |
||||
|
}, |
||||
|
expectErr: true, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
for _, tt := range tests { |
||||
|
t.Run(tt.name, func(t *testing.T) { |
||||
|
err := tt.envelope.Validate() |
||||
|
|
||||
|
if (err != nil) != tt.expectErr { |
||||
|
t.Errorf("Envelope.Validate() error = %v, expectErr %v", err, tt.expectErr) |
||||
|
} |
||||
|
}) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func TestEnvelopeMetadata(t *testing.T) { |
||||
|
envelope := &ConfluentEnvelope{ |
||||
|
Format: FormatAvro, |
||||
|
SchemaID: 123, |
||||
|
Indexes: []int{1, 2, 3}, |
||||
|
Payload: []byte("test"), |
||||
|
} |
||||
|
|
||||
|
metadata := envelope.Metadata() |
||||
|
|
||||
|
if metadata["schema_format"] != "AVRO" { |
||||
|
t.Errorf("Expected schema_format=AVRO, got %s", metadata["schema_format"]) |
||||
|
} |
||||
|
|
||||
|
if metadata["schema_id"] != "123" { |
||||
|
t.Errorf("Expected schema_id=123, got %s", metadata["schema_id"]) |
||||
|
} |
||||
|
|
||||
|
if metadata["protobuf_indexes"] != "1,2,3" { |
||||
|
t.Errorf("Expected protobuf_indexes=1,2,3, got %s", metadata["protobuf_indexes"]) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// Benchmark tests for performance
|
||||
|
func BenchmarkParseConfluentEnvelope(b *testing.B) { |
||||
|
// Create a test message
|
||||
|
testMsg := make([]byte, 1024) |
||||
|
testMsg[0] = 0x00 // Magic byte
|
||||
|
binary.BigEndian.PutUint32(testMsg[1:5], 123) // Schema ID
|
||||
|
// Fill rest with dummy data
|
||||
|
for i := 5; i < len(testMsg); i++ { |
||||
|
testMsg[i] = byte(i % 256) |
||||
|
} |
||||
|
|
||||
|
b.ResetTimer() |
||||
|
for i := 0; i < b.N; i++ { |
||||
|
_, _ = ParseConfluentEnvelope(testMsg) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func BenchmarkIsSchematized(b *testing.B) { |
||||
|
testMsg := []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f} |
||||
|
|
||||
|
b.ResetTimer() |
||||
|
for i := 0; i < b.N; i++ { |
||||
|
_ = IsSchematized(testMsg) |
||||
|
} |
||||
|
} |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue