You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

249 lines
6.5 KiB

package schema
import (
"encoding/binary"
"fmt"
)
// Format identifies the serialization format of a schematized message.
type Format int

// Known schema formats. The zero value, FormatUnknown, means the format has
// not been determined.
const (
	FormatUnknown Format = iota
	FormatAvro
	FormatProtobuf
	FormatJSONSchema
)

// String returns the upper-case name of the format ("AVRO", "PROTOBUF",
// "JSON_SCHEMA"). Any other value, including FormatUnknown and out-of-range
// values, yields "UNKNOWN".
func (f Format) String() string {
	names := [...]string{
		FormatAvro:       "AVRO",
		FormatProtobuf:   "PROTOBUF",
		FormatJSONSchema: "JSON_SCHEMA",
	}
	// Slot 0 (FormatUnknown) is left empty, so it falls through to "UNKNOWN"
	// together with negative and out-of-range values.
	if f < 0 || int(f) >= len(names) || names[f] == "" {
		return "UNKNOWN"
	}
	return names[f]
}
// ConfluentEnvelope is a Confluent Schema Registry framed message split into
// its header fields and its payload.
//
// The parse functions in this file do not copy their input: Payload and
// OriginalBytes are slices of the caller's buffer and share its backing array.
type ConfluentEnvelope struct {
	// Format is the serialization format. ParseConfluentEnvelope always sets
	// FormatAvro as a placeholder; the Protobuf parsers set FormatProtobuf.
	Format Format
	// SchemaID is the big-endian uint32 stored in bytes 1-4 of the message.
	SchemaID uint32
	// Indexes holds Protobuf message indexes (used to resolve nested message
	// types) when a parser read them; it is nil otherwise.
	Indexes []int
	// Payload is the encoded data that follows the header and any indexes.
	Payload []byte
	// OriginalBytes is the complete framed message, header included.
	OriginalBytes []byte
}
// ParseConfluentEnvelope splits a Confluent Schema Registry framed message
// into its header and payload.
//
// The frame is a 0x00 magic byte followed by a big-endian uint32 schema ID;
// everything after those 5 bytes becomes Payload. The second result is false
// (and the envelope nil) when data is shorter than 5 bytes or the magic byte
// is not 0x00.
//
// Format is always reported as FormatAvro. That is a placeholder: the real
// format has to come from a schema registry lookup on SchemaID. Indexes is
// left nil, and Payload/OriginalBytes alias data rather than copying it.
func ParseConfluentEnvelope(data []byte) (*ConfluentEnvelope, bool) {
	if len(data) < 5 || data[0] != 0x00 {
		// Too short for magic byte + schema ID, or not Confluent-framed.
		return nil, false
	}

	return &ConfluentEnvelope{
		Format:        FormatAvro,
		SchemaID:      binary.BigEndian.Uint32(data[1:5]),
		Payload:       data[5:],
		OriginalBytes: data,
	}, true
}
// ParseConfluentProtobufEnvelope parses a Confluent-framed message as Protobuf
// without reading any message indexes.
//
// It is exactly ParseConfluentProtobufEnvelopeWithIndexCount(data, 0). The
// returned envelope has Format set to FormatProtobuf, Indexes left nil, and
// Payload covering every byte after the 5-byte header. No attempt is made to
// tell index varints apart from payload bytes. If the producer wrote message
// indexes, they remain at the start of Payload. Use
// ParseConfluentProtobufEnvelopeWithIndexCount when the number of indexes is
// known.
func ParseConfluentProtobufEnvelope(data []byte) (*ConfluentEnvelope, bool) {
	// Reading zero indexes is the only choice that needs no schema knowledge;
	// callers with schema information should request an explicit count.
	return ParseConfluentProtobufEnvelopeWithIndexCount(data, 0)
}
// ParseConfluentProtobufEnvelopeWithIndexCount parses a Confluent-framed
// Protobuf message whose header is followed by expectedIndexCount message
// indexes.
//
// After the 0x00 magic byte and the big-endian uint32 schema ID, up to
// expectedIndexCount unsigned varints are read into Indexes. Payload is
// whatever follows the last index read. The second result is false only when
// data is shorter than 5 bytes or the magic byte is not 0x00.
//
// Reading stops early, without error, when the input runs out or a varint
// cannot be decoded. In that case Indexes is shorter than expectedIndexCount
// and Payload starts at the first unread byte.
// NOTE(review): callers that rely on the count should compare len(Indexes)
// with expectedIndexCount; a truncated frame is still reported as parsed.
// NOTE(review): Confluent's documented Protobuf framing encodes the index list
// as a length-prefixed array of zig-zag varints. This reader uses raw unsigned
// varints and a caller-supplied count instead. Confirm this matches producers.
func ParseConfluentProtobufEnvelopeWithIndexCount(data []byte, expectedIndexCount int) (*ConfluentEnvelope, bool) {
	if len(data) < 5 || data[0] != 0x00 {
		return nil, false
	}

	pos := 5
	var indexes []int
	for len(indexes) < expectedIndexCount && pos < len(data) {
		value, width := readVarint(data[pos:])
		if width == 0 {
			// Malformed or truncated varint: keep what was read so far.
			break
		}
		indexes = append(indexes, int(value))
		pos += width
	}

	return &ConfluentEnvelope{
		Format:        FormatProtobuf,
		SchemaID:      binary.BigEndian.Uint32(data[1:5]),
		Indexes:       indexes,
		Payload:       data[pos:],
		OriginalBytes: data,
	}, true
}
// IsSchematized checks if the given bytes represent a Confluent-framed message
func IsSchematized(data []byte) bool {
_, ok := ParseConfluentEnvelope(data)
return ok
}
// ExtractSchemaID returns the schema ID of a Confluent-framed message without
// parsing the rest of it.
//
// The ID is the big-endian uint32 in bytes 1-4. The second result is false,
// and the ID 0, when data is shorter than 5 bytes or does not start with the
// 0x00 magic byte.
func ExtractSchemaID(data []byte) (uint32, bool) {
	const headerLen = 5 // magic byte + 4-byte schema ID
	if len(data) >= headerLen && data[0] == 0x00 {
		return binary.BigEndian.Uint32(data[1:headerLen]), true
	}
	return 0, false
}
// CreateConfluentEnvelope builds a Confluent-framed message: the 0x00 magic
// byte, the big-endian uint32 schemaID, then (for FormatProtobuf only) each
// entry of indexes as an unsigned varint, then payload.
//
// indexes is ignored for every format other than FormatProtobuf. Each index is
// converted with uint64(index) before encoding, so negative values become
// 10-byte varints. The result is a new slice; payload is copied into it.
func CreateConfluentEnvelope(format Format, schemaID uint32, indexes []int, payload []byte) []byte {
	// Capacity hint: header + payload + ~4 bytes per index. append grows the
	// slice if any index needs a longer varint.
	frame := make([]byte, 0, 5+len(payload)+4*len(indexes))

	var header [5]byte // header[0] stays 0x00: the magic byte
	binary.BigEndian.PutUint32(header[1:], schemaID)
	frame = append(frame, header[:]...)

	if format == FormatProtobuf {
		for _, idx := range indexes {
			frame = append(frame, encodeVarint(uint64(idx))...)
		}
	}

	return append(frame, payload...)
}
// Validate performs basic sanity checks on a parsed envelope.
//
// It returns an error when SchemaID is 0, when Payload is empty, or when
// Format is not FormatAvro, FormatProtobuf or FormatJSONSchema (the error
// message then includes Format's String form). It does not inspect the
// payload bytes themselves; that is left to the format-specific decoders.
func (e *ConfluentEnvelope) Validate() error {
	// NOTE(review): 0 is treated as "no schema"; presumably the registry never
	// assigns ID 0 — confirm against the registry in use.
	if e.SchemaID == 0 {
		return fmt.Errorf("invalid schema ID: 0")
	}
	if len(e.Payload) == 0 {
		return fmt.Errorf("empty payload")
	}

	switch e.Format {
	case FormatAvro:
		// Payload structure is checked later by the Avro decoder.
	case FormatProtobuf:
		// Protobuf-specific validation is not implemented yet (planned for Phase 5).
	case FormatJSONSchema:
		// JSON Schema-specific validation is not implemented yet (planned for Phase 6).
	default:
		return fmt.Errorf("unsupported format: %v", e.Format)
	}
	return nil
}
// Metadata returns the envelope's identifying details as string key/value
// pairs suitable for storage.
//
// The map always contains:
//   - "schema_format": Format's String form (e.g. "AVRO")
//   - "schema_id":     SchemaID in decimal
//
// When Indexes is non-empty it also contains "protobuf_indexes": the indexes
// in decimal, joined by commas (e.g. "1,0,2").
func (e *ConfluentEnvelope) Metadata() map[string]string {
	metadata := map[string]string{
		"schema_format": e.Format.String(),
		"schema_id":     fmt.Sprintf("%d", e.SchemaID),
	}

	if len(e.Indexes) > 0 {
		// Keep the indexes so a Protobuf envelope can be reconstructed later.
		// Appending to a single byte buffer avoids repeated string `+=`, which
		// re-copies the accumulated string on every iteration.
		joined := make([]byte, 0, 4*len(e.Indexes))
		for i, idx := range e.Indexes {
			if i > 0 {
				joined = append(joined, ',')
			}
			joined = append(joined, fmt.Sprintf("%d", idx)...)
		}
		metadata["protobuf_indexes"] = string(joined)
	}
	return metadata
}
// encodeVarint encodes value as an unsigned base-128 varint (7 data bits per
// byte, least-significant group first, high bit set on every byte except the
// last). 0 encodes as the single byte 0x00; the longest output is
// binary.MaxVarintLen64 (10) bytes.
//
// This is the standard encoding produced by binary.PutUvarint, which the
// implementation delegates to rather than hand-rolling the loop.
func encodeVarint(value uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, value)
	return buf[:n]
}
// readVarint decodes an unsigned base-128 varint from the start of data.
//
// It returns the decoded value and the number of bytes consumed. Bytes after
// the varint are ignored. It returns (0, 0) when:
//   - data is empty or ends before the varint terminates, or
//   - the encoding overflows 64 bits: longer than binary.MaxVarintLen64 bytes,
//     or a 10th byte carrying bits beyond bit 63.
//
// The overflow check matters because the previous hand-rolled decoder shifted
// a 10th byte left by 63 and silently discarded its upper bits. That turned
// malformed input into a wrong value instead of a failure.
func readVarint(data []byte) (uint64, int) {
	value, n := binary.Uvarint(data)
	if n <= 0 {
		// binary.Uvarint reports truncation as n == 0 and overflow as n < 0;
		// callers of this helper treat both as "no valid varint here".
		return 0, 0
	}
	return value, n
}