You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
756 lines
24 KiB
756 lines
24 KiB
package schema
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/reflect/protoreflect"
|
|
"google.golang.org/protobuf/types/dynamicpb"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
// Manager coordinates schema operations for the Kafka Gateway
|
|
type Manager struct {
|
|
registryClient *RegistryClient
|
|
|
|
// Decoder cache
|
|
avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder
|
|
protobufDecoders map[uint32]*ProtobufDecoder // schema ID -> decoder
|
|
jsonSchemaDecoders map[uint32]*JSONSchemaDecoder // schema ID -> decoder
|
|
decoderMu sync.RWMutex
|
|
|
|
// Schema evolution checker
|
|
evolutionChecker *SchemaEvolutionChecker
|
|
|
|
// Configuration
|
|
config ManagerConfig
|
|
}
|
|
|
|
// ManagerConfig holds configuration for the schema manager
|
|
type ManagerConfig struct {
|
|
RegistryURL string
|
|
RegistryUsername string
|
|
RegistryPassword string
|
|
CacheTTL string
|
|
ValidationMode ValidationMode
|
|
EnableMirroring bool
|
|
MirrorPath string // Path in SeaweedFS Filer to mirror schemas
|
|
}
|
|
|
|
// ValidationMode defines how strict schema validation should be
|
|
type ValidationMode int
|
|
|
|
const (
|
|
ValidationPermissive ValidationMode = iota // Allow unknown fields, best-effort decoding
|
|
ValidationStrict // Reject messages that don't match schema exactly
|
|
)
|
|
|
|
// DecodedMessage represents a decoded Kafka message with schema information
|
|
type DecodedMessage struct {
|
|
// Original envelope information
|
|
Envelope *ConfluentEnvelope
|
|
|
|
// Schema information
|
|
SchemaID uint32
|
|
SchemaFormat Format
|
|
Subject string
|
|
Version int
|
|
|
|
// Decoded data
|
|
RecordValue *schema_pb.RecordValue
|
|
RecordType *schema_pb.RecordType
|
|
|
|
// Metadata for storage
|
|
Metadata map[string]string
|
|
}
|
|
|
|
// NewManager creates a new schema manager
|
|
func NewManager(config ManagerConfig) (*Manager, error) {
|
|
registryConfig := RegistryConfig{
|
|
URL: config.RegistryURL,
|
|
Username: config.RegistryUsername,
|
|
Password: config.RegistryPassword,
|
|
}
|
|
|
|
registryClient := NewRegistryClient(registryConfig)
|
|
|
|
return &Manager{
|
|
registryClient: registryClient,
|
|
avroDecoders: make(map[uint32]*AvroDecoder),
|
|
protobufDecoders: make(map[uint32]*ProtobufDecoder),
|
|
jsonSchemaDecoders: make(map[uint32]*JSONSchemaDecoder),
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
// NewManagerWithHealthCheck creates a new schema manager and validates connectivity
|
|
func NewManagerWithHealthCheck(config ManagerConfig) (*Manager, error) {
|
|
manager, err := NewManager(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Test connectivity
|
|
if err := manager.registryClient.HealthCheck(); err != nil {
|
|
return nil, fmt.Errorf("schema registry health check failed: %w", err)
|
|
}
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
// DecodeMessage decodes a Kafka message if it contains schema information
|
|
func (m *Manager) DecodeMessage(messageBytes []byte) (*DecodedMessage, error) {
|
|
// Step 1: Check if message is schematized
|
|
envelope, isSchematized := ParseConfluentEnvelope(messageBytes)
|
|
if !isSchematized {
|
|
return nil, fmt.Errorf("message is not schematized")
|
|
}
|
|
|
|
// Step 2: Validate envelope
|
|
if err := envelope.Validate(); err != nil {
|
|
return nil, fmt.Errorf("invalid envelope: %w", err)
|
|
}
|
|
|
|
// Step 3: Get schema from registry
|
|
cachedSchema, err := m.registryClient.GetSchemaByID(envelope.SchemaID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get schema %d: %w", envelope.SchemaID, err)
|
|
}
|
|
|
|
// Step 4: Decode based on format
|
|
var recordValue *schema_pb.RecordValue
|
|
var recordType *schema_pb.RecordType
|
|
|
|
switch cachedSchema.Format {
|
|
case FormatAvro:
|
|
recordValue, recordType, err = m.decodeAvroMessage(envelope, cachedSchema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode Avro message: %w", err)
|
|
}
|
|
case FormatProtobuf:
|
|
recordValue, recordType, err = m.decodeProtobufMessage(envelope, cachedSchema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode Protobuf message: %w", err)
|
|
}
|
|
case FormatJSONSchema:
|
|
recordValue, recordType, err = m.decodeJSONSchemaMessage(envelope, cachedSchema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode JSON Schema message: %w", err)
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported schema format: %v", cachedSchema.Format)
|
|
}
|
|
|
|
// Step 5: Create decoded message
|
|
decodedMsg := &DecodedMessage{
|
|
Envelope: envelope,
|
|
SchemaID: envelope.SchemaID,
|
|
SchemaFormat: cachedSchema.Format,
|
|
Subject: cachedSchema.Subject,
|
|
Version: cachedSchema.Version,
|
|
RecordValue: recordValue,
|
|
RecordType: recordType,
|
|
Metadata: m.createMetadata(envelope, cachedSchema),
|
|
}
|
|
|
|
return decodedMsg, nil
|
|
}
|
|
|
|
// decodeAvroMessage decodes an Avro message using cached or new decoder
|
|
func (m *Manager) decodeAvroMessage(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) (*schema_pb.RecordValue, *schema_pb.RecordType, error) {
|
|
// Get or create Avro decoder
|
|
decoder, err := m.getAvroDecoder(envelope.SchemaID, cachedSchema.Schema)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to get Avro decoder: %w", err)
|
|
}
|
|
|
|
// Decode to RecordValue
|
|
recordValue, err := decoder.DecodeToRecordValue(envelope.Payload)
|
|
if err != nil {
|
|
if m.config.ValidationMode == ValidationStrict {
|
|
return nil, nil, fmt.Errorf("strict validation failed: %w", err)
|
|
}
|
|
// In permissive mode, try to decode as much as possible
|
|
// For now, return the error - we could implement partial decoding later
|
|
return nil, nil, fmt.Errorf("permissive decoding failed: %w", err)
|
|
}
|
|
|
|
// Infer or get RecordType
|
|
recordType, err := decoder.InferRecordType()
|
|
if err != nil {
|
|
// Fall back to inferring from the decoded map
|
|
if decodedMap, decodeErr := decoder.Decode(envelope.Payload); decodeErr == nil {
|
|
recordType = InferRecordTypeFromMap(decodedMap)
|
|
} else {
|
|
return nil, nil, fmt.Errorf("failed to infer record type: %w", err)
|
|
}
|
|
}
|
|
|
|
return recordValue, recordType, nil
|
|
}
|
|
|
|
// decodeProtobufMessage decodes a Protobuf message using cached or new decoder
|
|
func (m *Manager) decodeProtobufMessage(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) (*schema_pb.RecordValue, *schema_pb.RecordType, error) {
|
|
// Get or create Protobuf decoder
|
|
decoder, err := m.getProtobufDecoder(envelope.SchemaID, cachedSchema.Schema)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to get Protobuf decoder: %w", err)
|
|
}
|
|
|
|
// Decode to RecordValue
|
|
recordValue, err := decoder.DecodeToRecordValue(envelope.Payload)
|
|
if err != nil {
|
|
if m.config.ValidationMode == ValidationStrict {
|
|
return nil, nil, fmt.Errorf("strict validation failed: %w", err)
|
|
}
|
|
// In permissive mode, try to decode as much as possible
|
|
return nil, nil, fmt.Errorf("permissive decoding failed: %w", err)
|
|
}
|
|
|
|
// Get RecordType from descriptor
|
|
recordType, err := decoder.InferRecordType()
|
|
if err != nil {
|
|
// Fall back to inferring from the decoded map
|
|
if decodedMap, decodeErr := decoder.Decode(envelope.Payload); decodeErr == nil {
|
|
recordType = InferRecordTypeFromMap(decodedMap)
|
|
} else {
|
|
return nil, nil, fmt.Errorf("failed to infer record type: %w", err)
|
|
}
|
|
}
|
|
|
|
return recordValue, recordType, nil
|
|
}
|
|
|
|
// decodeJSONSchemaMessage decodes a JSON Schema message using cached or new decoder
|
|
func (m *Manager) decodeJSONSchemaMessage(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) (*schema_pb.RecordValue, *schema_pb.RecordType, error) {
|
|
// Get or create JSON Schema decoder
|
|
decoder, err := m.getJSONSchemaDecoder(envelope.SchemaID, cachedSchema.Schema)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to get JSON Schema decoder: %w", err)
|
|
}
|
|
|
|
// Decode to RecordValue
|
|
recordValue, err := decoder.DecodeToRecordValue(envelope.Payload)
|
|
if err != nil {
|
|
if m.config.ValidationMode == ValidationStrict {
|
|
return nil, nil, fmt.Errorf("strict validation failed: %w", err)
|
|
}
|
|
// In permissive mode, try to decode as much as possible
|
|
return nil, nil, fmt.Errorf("permissive decoding failed: %w", err)
|
|
}
|
|
|
|
// Get RecordType from schema
|
|
recordType, err := decoder.InferRecordType()
|
|
if err != nil {
|
|
// Fall back to inferring from the decoded map
|
|
if decodedMap, decodeErr := decoder.Decode(envelope.Payload); decodeErr == nil {
|
|
recordType = InferRecordTypeFromMap(decodedMap)
|
|
} else {
|
|
return nil, nil, fmt.Errorf("failed to infer record type: %w", err)
|
|
}
|
|
}
|
|
|
|
return recordValue, recordType, nil
|
|
}
|
|
|
|
// getAvroDecoder gets or creates an Avro decoder for the given schema
|
|
func (m *Manager) getAvroDecoder(schemaID uint32, schemaStr string) (*AvroDecoder, error) {
|
|
// Check cache first
|
|
m.decoderMu.RLock()
|
|
if decoder, exists := m.avroDecoders[schemaID]; exists {
|
|
m.decoderMu.RUnlock()
|
|
return decoder, nil
|
|
}
|
|
m.decoderMu.RUnlock()
|
|
|
|
// Create new decoder
|
|
decoder, err := NewAvroDecoder(schemaStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache the decoder
|
|
m.decoderMu.Lock()
|
|
m.avroDecoders[schemaID] = decoder
|
|
m.decoderMu.Unlock()
|
|
|
|
return decoder, nil
|
|
}
|
|
|
|
// getProtobufDecoder gets or creates a Protobuf decoder for the given schema
|
|
func (m *Manager) getProtobufDecoder(schemaID uint32, schemaStr string) (*ProtobufDecoder, error) {
|
|
// Check cache first
|
|
m.decoderMu.RLock()
|
|
if decoder, exists := m.protobufDecoders[schemaID]; exists {
|
|
m.decoderMu.RUnlock()
|
|
return decoder, nil
|
|
}
|
|
m.decoderMu.RUnlock()
|
|
|
|
// For Protobuf, the schema is typically a binary FileDescriptorSet
|
|
// In Confluent Schema Registry, Protobuf schemas are stored as binary descriptors
|
|
schemaBytes := []byte(schemaStr) // Assume schemaStr contains binary data
|
|
|
|
// Create new decoder
|
|
decoder, err := NewProtobufDecoder(schemaBytes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache the decoder
|
|
m.decoderMu.Lock()
|
|
m.protobufDecoders[schemaID] = decoder
|
|
m.decoderMu.Unlock()
|
|
|
|
return decoder, nil
|
|
}
|
|
|
|
// getJSONSchemaDecoder gets or creates a JSON Schema decoder for the given schema
|
|
func (m *Manager) getJSONSchemaDecoder(schemaID uint32, schemaStr string) (*JSONSchemaDecoder, error) {
|
|
// Check cache first
|
|
m.decoderMu.RLock()
|
|
if decoder, exists := m.jsonSchemaDecoders[schemaID]; exists {
|
|
m.decoderMu.RUnlock()
|
|
return decoder, nil
|
|
}
|
|
m.decoderMu.RUnlock()
|
|
|
|
// Create new decoder
|
|
decoder, err := NewJSONSchemaDecoder(schemaStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache the decoder
|
|
m.decoderMu.Lock()
|
|
m.jsonSchemaDecoders[schemaID] = decoder
|
|
m.decoderMu.Unlock()
|
|
|
|
return decoder, nil
|
|
}
|
|
|
|
// createMetadata creates metadata for storage in SeaweedMQ
|
|
func (m *Manager) createMetadata(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) map[string]string {
|
|
metadata := envelope.Metadata()
|
|
|
|
// Add schema registry information
|
|
metadata["schema_subject"] = cachedSchema.Subject
|
|
metadata["schema_version"] = fmt.Sprintf("%d", cachedSchema.Version)
|
|
metadata["registry_url"] = m.registryClient.baseURL
|
|
|
|
// Add decoding information
|
|
metadata["decoded_at"] = fmt.Sprintf("%d", cachedSchema.CachedAt.Unix())
|
|
metadata["validation_mode"] = fmt.Sprintf("%d", m.config.ValidationMode)
|
|
|
|
return metadata
|
|
}
|
|
|
|
// IsSchematized checks if a message contains schema information
|
|
func (m *Manager) IsSchematized(messageBytes []byte) bool {
|
|
return IsSchematized(messageBytes)
|
|
}
|
|
|
|
// GetSchemaInfo extracts basic schema information without full decoding
|
|
func (m *Manager) GetSchemaInfo(messageBytes []byte) (uint32, Format, error) {
|
|
envelope, ok := ParseConfluentEnvelope(messageBytes)
|
|
if !ok {
|
|
return 0, FormatUnknown, fmt.Errorf("not a schematized message")
|
|
}
|
|
|
|
// Get basic schema info from cache or registry
|
|
cachedSchema, err := m.registryClient.GetSchemaByID(envelope.SchemaID)
|
|
if err != nil {
|
|
return 0, FormatUnknown, fmt.Errorf("failed to get schema info: %w", err)
|
|
}
|
|
|
|
return envelope.SchemaID, cachedSchema.Format, nil
|
|
}
|
|
|
|
// RegisterSchema registers a new schema with the registry
|
|
func (m *Manager) RegisterSchema(subject, schema string) (uint32, error) {
|
|
return m.registryClient.RegisterSchema(subject, schema)
|
|
}
|
|
|
|
// CheckCompatibility checks if a schema is compatible with existing versions
|
|
func (m *Manager) CheckCompatibility(subject, schema string) (bool, error) {
|
|
return m.registryClient.CheckCompatibility(subject, schema)
|
|
}
|
|
|
|
// ListSubjects returns all subjects in the registry
|
|
func (m *Manager) ListSubjects() ([]string, error) {
|
|
return m.registryClient.ListSubjects()
|
|
}
|
|
|
|
// ClearCache clears all cached decoders and registry data
|
|
func (m *Manager) ClearCache() {
|
|
m.decoderMu.Lock()
|
|
m.avroDecoders = make(map[uint32]*AvroDecoder)
|
|
m.protobufDecoders = make(map[uint32]*ProtobufDecoder)
|
|
m.jsonSchemaDecoders = make(map[uint32]*JSONSchemaDecoder)
|
|
m.decoderMu.Unlock()
|
|
|
|
m.registryClient.ClearCache()
|
|
}
|
|
|
|
// GetCacheStats returns cache statistics
|
|
func (m *Manager) GetCacheStats() (decoders, schemas, subjects int) {
|
|
m.decoderMu.RLock()
|
|
decoders = len(m.avroDecoders) + len(m.protobufDecoders) + len(m.jsonSchemaDecoders)
|
|
m.decoderMu.RUnlock()
|
|
|
|
schemas, subjects = m.registryClient.GetCacheStats()
|
|
return
|
|
}
|
|
|
|
// EncodeMessage encodes a RecordValue back to Confluent format (for Fetch path)
|
|
func (m *Manager) EncodeMessage(recordValue *schema_pb.RecordValue, schemaID uint32, format Format) ([]byte, error) {
|
|
switch format {
|
|
case FormatAvro:
|
|
return m.encodeAvroMessage(recordValue, schemaID)
|
|
case FormatProtobuf:
|
|
return m.encodeProtobufMessage(recordValue, schemaID)
|
|
case FormatJSONSchema:
|
|
return m.encodeJSONSchemaMessage(recordValue, schemaID)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported format for encoding: %v", format)
|
|
}
|
|
}
|
|
|
|
// encodeAvroMessage encodes a RecordValue back to Avro binary format
|
|
func (m *Manager) encodeAvroMessage(recordValue *schema_pb.RecordValue, schemaID uint32) ([]byte, error) {
|
|
// Get schema from registry
|
|
cachedSchema, err := m.registryClient.GetSchemaByID(schemaID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get schema for encoding: %w", err)
|
|
}
|
|
|
|
// Get decoder (which contains the codec)
|
|
decoder, err := m.getAvroDecoder(schemaID, cachedSchema.Schema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get decoder for encoding: %w", err)
|
|
}
|
|
|
|
// Convert RecordValue back to Go map with Avro union format preservation
|
|
goMap := recordValueToMapWithAvroContext(recordValue, true)
|
|
|
|
// Encode using Avro codec
|
|
binary, err := decoder.codec.BinaryFromNative(nil, goMap)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to encode to Avro binary: %w", err)
|
|
}
|
|
|
|
// Create Confluent envelope
|
|
envelope := CreateConfluentEnvelope(FormatAvro, schemaID, nil, binary)
|
|
|
|
return envelope, nil
|
|
}
|
|
|
|
// encodeProtobufMessage encodes a RecordValue back to Protobuf binary format
|
|
func (m *Manager) encodeProtobufMessage(recordValue *schema_pb.RecordValue, schemaID uint32) ([]byte, error) {
|
|
// Get schema from registry
|
|
cachedSchema, err := m.registryClient.GetSchemaByID(schemaID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get schema for encoding: %w", err)
|
|
}
|
|
|
|
// Get decoder (which contains the descriptor)
|
|
decoder, err := m.getProtobufDecoder(schemaID, cachedSchema.Schema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get decoder for encoding: %w", err)
|
|
}
|
|
|
|
// Convert RecordValue back to Go map
|
|
goMap := recordValueToMap(recordValue)
|
|
|
|
// Create a new message instance and populate it
|
|
msg := decoder.msgType.New()
|
|
if err := m.populateProtobufMessage(msg, goMap, decoder.descriptor); err != nil {
|
|
return nil, fmt.Errorf("failed to populate Protobuf message: %w", err)
|
|
}
|
|
|
|
// Encode using Protobuf
|
|
binary, err := proto.Marshal(msg.Interface())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to encode to Protobuf binary: %w", err)
|
|
}
|
|
|
|
// Create Confluent envelope (with indexes if needed)
|
|
envelope := CreateConfluentEnvelope(FormatProtobuf, schemaID, nil, binary)
|
|
|
|
return envelope, nil
|
|
}
|
|
|
|
// encodeJSONSchemaMessage encodes a RecordValue back to JSON Schema format
|
|
func (m *Manager) encodeJSONSchemaMessage(recordValue *schema_pb.RecordValue, schemaID uint32) ([]byte, error) {
|
|
// Get schema from registry
|
|
cachedSchema, err := m.registryClient.GetSchemaByID(schemaID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get schema for encoding: %w", err)
|
|
}
|
|
|
|
// Get decoder (which contains the schema validator)
|
|
decoder, err := m.getJSONSchemaDecoder(schemaID, cachedSchema.Schema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get decoder for encoding: %w", err)
|
|
}
|
|
|
|
// Encode using JSON Schema decoder
|
|
jsonData, err := decoder.EncodeFromRecordValue(recordValue)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
|
|
}
|
|
|
|
// Create Confluent envelope
|
|
envelope := CreateConfluentEnvelope(FormatJSONSchema, schemaID, nil, jsonData)
|
|
|
|
return envelope, nil
|
|
}
|
|
|
|
// populateProtobufMessage populates a Protobuf message from a Go map
|
|
func (m *Manager) populateProtobufMessage(msg protoreflect.Message, data map[string]interface{}, desc protoreflect.MessageDescriptor) error {
|
|
for key, value := range data {
|
|
// Find the field descriptor
|
|
fieldDesc := desc.Fields().ByName(protoreflect.Name(key))
|
|
if fieldDesc == nil {
|
|
// Skip unknown fields in permissive mode
|
|
continue
|
|
}
|
|
|
|
// Convert and set the value
|
|
protoValue, err := m.goValueToProtoValue(value, fieldDesc)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to convert field %s: %w", key, err)
|
|
}
|
|
|
|
msg.Set(fieldDesc, protoValue)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// goValueToProtoValue converts a Go value to a Protobuf Value
|
|
func (m *Manager) goValueToProtoValue(value interface{}, fieldDesc protoreflect.FieldDescriptor) (protoreflect.Value, error) {
|
|
if value == nil {
|
|
return protoreflect.Value{}, nil
|
|
}
|
|
|
|
switch fieldDesc.Kind() {
|
|
case protoreflect.BoolKind:
|
|
if b, ok := value.(bool); ok {
|
|
return protoreflect.ValueOfBool(b), nil
|
|
}
|
|
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
|
|
if i, ok := value.(int32); ok {
|
|
return protoreflect.ValueOfInt32(i), nil
|
|
}
|
|
case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
|
|
if i, ok := value.(int64); ok {
|
|
return protoreflect.ValueOfInt64(i), nil
|
|
}
|
|
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
|
|
if i, ok := value.(uint32); ok {
|
|
return protoreflect.ValueOfUint32(i), nil
|
|
}
|
|
case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
|
|
if i, ok := value.(uint64); ok {
|
|
return protoreflect.ValueOfUint64(i), nil
|
|
}
|
|
case protoreflect.FloatKind:
|
|
if f, ok := value.(float32); ok {
|
|
return protoreflect.ValueOfFloat32(f), nil
|
|
}
|
|
case protoreflect.DoubleKind:
|
|
if f, ok := value.(float64); ok {
|
|
return protoreflect.ValueOfFloat64(f), nil
|
|
}
|
|
case protoreflect.StringKind:
|
|
if s, ok := value.(string); ok {
|
|
return protoreflect.ValueOfString(s), nil
|
|
}
|
|
case protoreflect.BytesKind:
|
|
if b, ok := value.([]byte); ok {
|
|
return protoreflect.ValueOfBytes(b), nil
|
|
}
|
|
case protoreflect.EnumKind:
|
|
if i, ok := value.(int32); ok {
|
|
return protoreflect.ValueOfEnum(protoreflect.EnumNumber(i)), nil
|
|
}
|
|
case protoreflect.MessageKind:
|
|
if nestedMap, ok := value.(map[string]interface{}); ok {
|
|
// Handle nested messages
|
|
nestedMsg := dynamicpb.NewMessage(fieldDesc.Message())
|
|
if err := m.populateProtobufMessage(nestedMsg, nestedMap, fieldDesc.Message()); err != nil {
|
|
return protoreflect.Value{}, err
|
|
}
|
|
return protoreflect.ValueOfMessage(nestedMsg), nil
|
|
}
|
|
}
|
|
|
|
return protoreflect.Value{}, fmt.Errorf("unsupported value type %T for field kind %v", value, fieldDesc.Kind())
|
|
}
|
|
|
|
// recordValueToMap converts a RecordValue back to a Go map for encoding
|
|
func recordValueToMap(recordValue *schema_pb.RecordValue) map[string]interface{} {
|
|
return recordValueToMapWithAvroContext(recordValue, false)
|
|
}
|
|
|
|
// recordValueToMapWithAvroContext converts a RecordValue back to a Go map for encoding
|
|
// with optional Avro union format preservation
|
|
func recordValueToMapWithAvroContext(recordValue *schema_pb.RecordValue, preserveAvroUnions bool) map[string]interface{} {
|
|
result := make(map[string]interface{})
|
|
|
|
for key, value := range recordValue.Fields {
|
|
result[key] = schemaValueToGoValueWithAvroContext(value, preserveAvroUnions)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// schemaValueToGoValue converts a schema Value back to a Go value
|
|
func schemaValueToGoValue(value *schema_pb.Value) interface{} {
|
|
return schemaValueToGoValueWithAvroContext(value, false)
|
|
}
|
|
|
|
// schemaValueToGoValueWithAvroContext converts a schema Value back to a Go value
|
|
// with optional Avro union format preservation
|
|
func schemaValueToGoValueWithAvroContext(value *schema_pb.Value, preserveAvroUnions bool) interface{} {
|
|
switch v := value.Kind.(type) {
|
|
case *schema_pb.Value_BoolValue:
|
|
return v.BoolValue
|
|
case *schema_pb.Value_Int32Value:
|
|
return v.Int32Value
|
|
case *schema_pb.Value_Int64Value:
|
|
return v.Int64Value
|
|
case *schema_pb.Value_FloatValue:
|
|
return v.FloatValue
|
|
case *schema_pb.Value_DoubleValue:
|
|
return v.DoubleValue
|
|
case *schema_pb.Value_StringValue:
|
|
return v.StringValue
|
|
case *schema_pb.Value_BytesValue:
|
|
return v.BytesValue
|
|
case *schema_pb.Value_ListValue:
|
|
result := make([]interface{}, len(v.ListValue.Values))
|
|
for i, item := range v.ListValue.Values {
|
|
result[i] = schemaValueToGoValueWithAvroContext(item, preserveAvroUnions)
|
|
}
|
|
return result
|
|
case *schema_pb.Value_RecordValue:
|
|
recordMap := recordValueToMapWithAvroContext(v.RecordValue, preserveAvroUnions)
|
|
|
|
// Check if this record represents an Avro union
|
|
if preserveAvroUnions && isAvroUnionRecord(v.RecordValue) {
|
|
// Return the union map directly since it's already in the correct format
|
|
return recordMap
|
|
}
|
|
|
|
return recordMap
|
|
case *schema_pb.Value_TimestampValue:
|
|
// Convert back to time if needed, or return as int64
|
|
return v.TimestampValue.TimestampMicros
|
|
default:
|
|
// Default to string representation
|
|
return fmt.Sprintf("%v", value)
|
|
}
|
|
}
|
|
|
|
|
|
// isAvroUnionRecord checks if a RecordValue represents an Avro union
|
|
func isAvroUnionRecord(record *schema_pb.RecordValue) bool {
|
|
// A record represents an Avro union if it has exactly one field
|
|
// and the field name is an Avro type name
|
|
if len(record.Fields) != 1 {
|
|
return false
|
|
}
|
|
|
|
for key := range record.Fields {
|
|
return isAvroUnionTypeName(key)
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// isAvroUnionTypeName checks if a string is a valid Avro union type name
|
|
func isAvroUnionTypeName(name string) bool {
|
|
switch name {
|
|
case "null", "boolean", "int", "long", "float", "double", "bytes", "string":
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// CheckSchemaCompatibility checks if two schemas are compatible
|
|
func (m *Manager) CheckSchemaCompatibility(
|
|
oldSchemaStr, newSchemaStr string,
|
|
format Format,
|
|
level CompatibilityLevel,
|
|
) (*CompatibilityResult, error) {
|
|
return m.evolutionChecker.CheckCompatibility(oldSchemaStr, newSchemaStr, format, level)
|
|
}
|
|
|
|
// CanEvolveSchema checks if a schema can be evolved for a given subject
|
|
func (m *Manager) CanEvolveSchema(
|
|
subject string,
|
|
currentSchemaStr, newSchemaStr string,
|
|
format Format,
|
|
) (*CompatibilityResult, error) {
|
|
return m.evolutionChecker.CanEvolve(subject, currentSchemaStr, newSchemaStr, format)
|
|
}
|
|
|
|
// SuggestSchemaEvolution provides suggestions for schema evolution
|
|
func (m *Manager) SuggestSchemaEvolution(
|
|
oldSchemaStr, newSchemaStr string,
|
|
format Format,
|
|
level CompatibilityLevel,
|
|
) ([]string, error) {
|
|
return m.evolutionChecker.SuggestEvolution(oldSchemaStr, newSchemaStr, format, level)
|
|
}
|
|
|
|
// ValidateSchemaEvolution validates a schema evolution before applying it
|
|
func (m *Manager) ValidateSchemaEvolution(
|
|
subject string,
|
|
newSchemaStr string,
|
|
format Format,
|
|
) error {
|
|
// Get the current schema for the subject
|
|
currentSchema, err := m.registryClient.GetLatestSchema(subject)
|
|
if err != nil {
|
|
// If no current schema exists, any schema is valid
|
|
return nil
|
|
}
|
|
|
|
// Check compatibility
|
|
result, err := m.CanEvolveSchema(subject, currentSchema.Schema, newSchemaStr, format)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check schema compatibility: %w", err)
|
|
}
|
|
|
|
if !result.Compatible {
|
|
return fmt.Errorf("schema evolution is not compatible: %v", result.Issues)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetCompatibilityLevel gets the compatibility level for a subject
|
|
func (m *Manager) GetCompatibilityLevel(subject string) CompatibilityLevel {
|
|
return m.evolutionChecker.GetCompatibilityLevel(subject)
|
|
}
|
|
|
|
// SetCompatibilityLevel sets the compatibility level for a subject
|
|
func (m *Manager) SetCompatibilityLevel(subject string, level CompatibilityLevel) error {
|
|
return m.evolutionChecker.SetCompatibilityLevel(subject, level)
|
|
}
|
|
|
|
// GetSchemaByID retrieves a schema by its ID
|
|
func (m *Manager) GetSchemaByID(schemaID uint32) (*CachedSchema, error) {
|
|
return m.registryClient.GetSchemaByID(schemaID)
|
|
}
|
|
|
|
// GetLatestSchema retrieves the latest schema for a subject
|
|
func (m *Manager) GetLatestSchema(subject string) (*CachedSubject, error) {
|
|
return m.registryClient.GetLatestSchema(subject)
|
|
}
|