package schema

import (
	"encoding/json"
	"fmt"
	"strings"

	"github.com/linkedin/goavro/v2"
)

// CompatibilityLevel defines the schema compatibility level.
type CompatibilityLevel string

const (
	CompatibilityNone     CompatibilityLevel = "NONE"
	CompatibilityBackward CompatibilityLevel = "BACKWARD"
	CompatibilityForward  CompatibilityLevel = "FORWARD"
	CompatibilityFull     CompatibilityLevel = "FULL"
)

// SchemaEvolutionChecker handles schema compatibility checking and evolution.
type SchemaEvolutionChecker struct {
	// Cache for parsed schemas to avoid re-parsing.
	schemaCache map[string]interface{}
}

// NewSchemaEvolutionChecker creates a new schema evolution checker.
func NewSchemaEvolutionChecker() *SchemaEvolutionChecker {
	return &SchemaEvolutionChecker{
		schemaCache: make(map[string]interface{}),
	}
}

// CompatibilityResult represents the result of a compatibility check.
type CompatibilityResult struct {
	Compatible bool
	Issues     []string
	Level      CompatibilityLevel
}

// CheckCompatibility checks if two schemas are compatible according to the specified level.
func (checker *SchemaEvolutionChecker) CheckCompatibility(
	oldSchemaStr, newSchemaStr string,
	format Format,
	level CompatibilityLevel,
) (*CompatibilityResult, error) {
	result := &CompatibilityResult{
		Compatible: true,
		Issues:     []string{},
		Level:      level,
	}

	if level == CompatibilityNone {
		return result, nil
	}

	switch format {
	case FormatAvro:
		return checker.checkAvroCompatibility(oldSchemaStr, newSchemaStr, level)
	case FormatProtobuf:
		return checker.checkProtobufCompatibility(oldSchemaStr, newSchemaStr, level)
	case FormatJSONSchema:
		return checker.checkJSONSchemaCompatibility(oldSchemaStr, newSchemaStr, level)
	default:
		return nil, fmt.Errorf("unsupported schema format for compatibility check: %s", format)
	}
}
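
// Illustrative usage (a sketch only; the two record schemas below are
// hypothetical and not defined anywhere in this package):
//
//	checker := NewSchemaEvolutionChecker()
//	oldSchema := `{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}`
//	newSchema := `{"type":"record","name":"User","fields":[
//	    {"name":"id","type":"long"},
//	    {"name":"email","type":"string","default":""}]}`
//	result, err := checker.CheckCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
//	// err != nil          -> one of the schemas failed to parse
//	// !result.Compatible  -> result.Issues lists each incompatibility
//	// Here the new field carries a default, so the change passes the BACKWARD check.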

// checkAvroCompatibility checks Avro schema compatibility.
func (checker *SchemaEvolutionChecker) checkAvroCompatibility(
	oldSchemaStr, newSchemaStr string,
	level CompatibilityLevel,
) (*CompatibilityResult, error) {
	result := &CompatibilityResult{
		Compatible: true,
		Issues:     []string{},
		Level:      level,
	}

	// Parse old schema
	oldSchema, err := goavro.NewCodec(oldSchemaStr)
	if err != nil {
		return nil, fmt.Errorf("failed to parse old Avro schema: %w", err)
	}

	// Parse new schema
	newSchema, err := goavro.NewCodec(newSchemaStr)
	if err != nil {
		return nil, fmt.Errorf("failed to parse new Avro schema: %w", err)
	}

	// Parse schema structures for detailed analysis
	var oldSchemaMap, newSchemaMap map[string]interface{}
	if err := json.Unmarshal([]byte(oldSchemaStr), &oldSchemaMap); err != nil {
		return nil, fmt.Errorf("failed to parse old schema JSON: %w", err)
	}
	if err := json.Unmarshal([]byte(newSchemaStr), &newSchemaMap); err != nil {
		return nil, fmt.Errorf("failed to parse new schema JSON: %w", err)
	}

	// Check compatibility based on level
	switch level {
	case CompatibilityBackward:
		checker.checkAvroBackwardCompatibility(oldSchemaMap, newSchemaMap, result)
	case CompatibilityForward:
		checker.checkAvroForwardCompatibility(oldSchemaMap, newSchemaMap, result)
	case CompatibilityFull:
		checker.checkAvroBackwardCompatibility(oldSchemaMap, newSchemaMap, result)
		if result.Compatible {
			checker.checkAvroForwardCompatibility(oldSchemaMap, newSchemaMap, result)
		}
	}

	// Additional validation: try to create test data and check if it can be read
	if result.Compatible {
		if err := checker.validateAvroDataCompatibility(oldSchema, newSchema, level); err != nil {
			result.Compatible = false
			result.Issues = append(result.Issues, fmt.Sprintf("Data compatibility test failed: %v", err))
		}
	}

	return result, nil
}

// checkAvroBackwardCompatibility checks if the new schema can read data written with the old schema.
func (checker *SchemaEvolutionChecker) checkAvroBackwardCompatibility(
	oldSchema, newSchema map[string]interface{},
	result *CompatibilityResult,
) {
	oldFields := checker.extractAvroFields(oldSchema)
	newFields := checker.extractAvroFields(newSchema)

	// Check whether fields were removed or had their types changed incompatibly
	for fieldName, oldField := range oldFields {
		if newField, exists := newFields[fieldName]; !exists {
			// Field was removed - this breaks backward compatibility
			result.Compatible = false
			result.Issues = append(result.Issues,
				fmt.Sprintf("Field '%s' was removed, breaking backward compatibility", fieldName))
		} else {
			// Field still exists - check type compatibility
			if !checker.areAvroTypesCompatible(oldField["type"], newField["type"], true) {
				result.Compatible = false
				result.Issues = append(result.Issues,
					fmt.Sprintf("Field '%s' type changed incompatibly", fieldName))
			}
		}
	}

	// Check if new fields were added without defaults
	for fieldName, newField := range newFields {
		if _, exists := oldFields[fieldName]; !exists {
			// New field added
			if _, hasDefault := newField["default"]; !hasDefault {
				result.Compatible = false
				result.Issues = append(result.Issues,
					fmt.Sprintf("New required field '%s' added without default value", fieldName))
			}
		}
	}
}
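
// Concrete illustration (hypothetical schemas): against an old record with
// fields {id, note} and a new record with fields {id, status}, where status
// has no default, the loops above report two issues: "Field 'note' was
// removed, breaking backward compatibility" and "New required field 'status'
// added without default value".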

// checkAvroForwardCompatibility checks if the old schema can read data written with the new schema.
func (checker *SchemaEvolutionChecker) checkAvroForwardCompatibility(
	oldSchema, newSchema map[string]interface{},
	result *CompatibilityResult,
) {
	oldFields := checker.extractAvroFields(oldSchema)
	newFields := checker.extractAvroFields(newSchema)

	// Check fields added in the new schema: they need defaults so the old
	// schema can still handle data written with the new one
	for fieldName, newField := range newFields {
		if _, exists := oldFields[fieldName]; !exists {
			// New field added - for forward compatibility it should have a
			// default so the old schema can ignore it when reading data
			// written with the new schema
			if _, hasDefault := newField["default"]; !hasDefault {
				result.Compatible = false
				result.Issues = append(result.Issues,
					fmt.Sprintf("New field '%s' cannot be read by old schema (no default)", fieldName))
			}
		} else {
			// Field exists in both - check type compatibility (reverse direction)
			oldField := oldFields[fieldName]
			if !checker.areAvroTypesCompatible(newField["type"], oldField["type"], false) {
				result.Compatible = false
				result.Issues = append(result.Issues,
					fmt.Sprintf("Field '%s' type change breaks forward compatibility", fieldName))
			}
		}
	}

	// Check if fields were removed
	for fieldName := range oldFields {
		if _, exists := newFields[fieldName]; !exists {
			result.Compatible = false
			result.Issues = append(result.Issues,
				fmt.Sprintf("Field '%s' was removed, breaking forward compatibility", fieldName))
		}
	}
}
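
// Mirror-image illustration (hypothetical schemas): removing a field breaks
// forward compatibility because the old reader still expects it, and adding
// one without a default is flagged as well:
//
//	old fields: {id, amount}   new fields: {id}
//	    -> "Field 'amount' was removed, breaking forward compatibility"
//	old fields: {id}           new fields: {id, fee} (fee has no default)
//	    -> "New field 'fee' cannot be read by old schema (no default)"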

// extractAvroFields extracts field information from an Avro schema.
func (checker *SchemaEvolutionChecker) extractAvroFields(schema map[string]interface{}) map[string]map[string]interface{} {
	fields := make(map[string]map[string]interface{})
	if fieldsArray, ok := schema["fields"].([]interface{}); ok {
		for _, fieldInterface := range fieldsArray {
			if field, ok := fieldInterface.(map[string]interface{}); ok {
				if name, ok := field["name"].(string); ok {
					fields[name] = field
				}
			}
		}
	}
	return fields
}

// areAvroTypesCompatible checks if two Avro types are compatible.
func (checker *SchemaEvolutionChecker) areAvroTypesCompatible(oldType, newType interface{}, backward bool) bool {
	// Simplified type compatibility check.
	// A full implementation would handle complex types, unions, etc.
	oldTypeStr := fmt.Sprintf("%v", oldType)
	newTypeStr := fmt.Sprintf("%v", newType)

	// Same type is always compatible
	if oldTypeStr == newTypeStr {
		return true
	}

	// Check for promotable types (e.g., int -> long, float -> double)
	if backward {
		return checker.isPromotableType(oldTypeStr, newTypeStr)
	}
	return checker.isPromotableType(newTypeStr, oldTypeStr)
}

// isPromotableType checks if a type can be promoted to another.
func (checker *SchemaEvolutionChecker) isPromotableType(from, to string) bool {
	promotions := map[string][]string{
		"int":    {"long", "float", "double"},
		"long":   {"float", "double"},
		"float":  {"double"},
		"string": {"bytes"},
		"bytes":  {"string"},
	}
	if validPromotions, exists := promotions[from]; exists {
		for _, validTo := range validPromotions {
			if to == validTo {
				return true
			}
		}
	}
	return false
}
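
// Quick sanity checks against the promotion table above (hypothetical calls):
//
//	checker.isPromotableType("int", "long")     // true: widening is safe
//	checker.isPromotableType("long", "int")     // false: narrowing never is
//	checker.isPromotableType("string", "bytes") // true: allowed in both directions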

// validateAvroDataCompatibility validates compatibility by testing with actual data.
func (checker *SchemaEvolutionChecker) validateAvroDataCompatibility(
	oldSchema, newSchema *goavro.Codec,
	level CompatibilityLevel,
) error {
	// Create test data for the old schema
	testData := map[string]interface{}{
		"test_field": "test_value",
	}

	// Try to encode with the old schema
	encoded, err := oldSchema.BinaryFromNative(nil, testData)
	if err != nil {
		// If we can't create test data, skip validation
		return nil
	}

	// Try to decode with the new schema (backward compatibility)
	if level == CompatibilityBackward || level == CompatibilityFull {
		if _, _, err := newSchema.NativeFromBinary(encoded); err != nil {
			return fmt.Errorf("backward compatibility failed: %w", err)
		}
	}

	// Try to encode with the new schema and decode with the old (forward compatibility)
	if level == CompatibilityForward || level == CompatibilityFull {
		newEncoded, err := newSchema.BinaryFromNative(nil, testData)
		if err == nil {
			if _, _, err := oldSchema.NativeFromBinary(newEncoded); err != nil {
				return fmt.Errorf("forward compatibility failed: %w", err)
			}
		}
	}

	return nil
}
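
// The round trip above mirrors what a consumer does at read time, e.g.
// (hypothetical codecs and record):
//
//	encoded, err := oldCodec.BinaryFromNative(nil, record) // write with old schema
//	_, _, err = newCodec.NativeFromBinary(encoded)         // read with new schema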

// checkProtobufCompatibility checks Protobuf schema compatibility.
func (checker *SchemaEvolutionChecker) checkProtobufCompatibility(
	oldSchemaStr, newSchemaStr string,
	level CompatibilityLevel,
) (*CompatibilityResult, error) {
	result := &CompatibilityResult{
		Compatible: true,
		Issues:     []string{},
		Level:      level,
	}

	// For now, implement basic Protobuf compatibility rules. A full
	// implementation would parse the .proto files and verify that field
	// numbers haven't changed, required fields haven't been removed, and
	// field types remain compatible.

	// Basic check: identical schemas are always compatible
	if oldSchemaStr == newSchemaStr {
		return result, nil
	}

	// Simplified implementation - mark as compatible with a warning
	result.Issues = append(result.Issues, "Protobuf compatibility checking is simplified - manual review recommended")
	return result, nil
}

// checkJSONSchemaCompatibility checks JSON Schema compatibility.
func (checker *SchemaEvolutionChecker) checkJSONSchemaCompatibility(
	oldSchemaStr, newSchemaStr string,
	level CompatibilityLevel,
) (*CompatibilityResult, error) {
	result := &CompatibilityResult{
		Compatible: true,
		Issues:     []string{},
		Level:      level,
	}

	// Parse JSON schemas
	var oldSchema, newSchema map[string]interface{}
	if err := json.Unmarshal([]byte(oldSchemaStr), &oldSchema); err != nil {
		return nil, fmt.Errorf("failed to parse old JSON schema: %w", err)
	}
	if err := json.Unmarshal([]byte(newSchemaStr), &newSchema); err != nil {
		return nil, fmt.Errorf("failed to parse new JSON schema: %w", err)
	}

	// Check compatibility based on level
	switch level {
	case CompatibilityBackward:
		checker.checkJSONSchemaBackwardCompatibility(oldSchema, newSchema, result)
	case CompatibilityForward:
		checker.checkJSONSchemaForwardCompatibility(oldSchema, newSchema, result)
	case CompatibilityFull:
		checker.checkJSONSchemaBackwardCompatibility(oldSchema, newSchema, result)
		if result.Compatible {
			checker.checkJSONSchemaForwardCompatibility(oldSchema, newSchema, result)
		}
	}

	return result, nil
}
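
// Sketch of a JSON Schema check (hypothetical document schemas):
//
//	oldDoc := `{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}`
//	newDoc := `{"type":"object","properties":{
//	    "id":{"type":"integer"},"tag":{"type":"string"}},"required":["id","tag"]}`
//	res, _ := checker.checkJSONSchemaCompatibility(oldDoc, newDoc, CompatibilityBackward)
//	// res.Compatible == false: "tag" is newly required, so documents that
//	// validated against the old schema may no longer validate.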

// checkJSONSchemaBackwardCompatibility checks JSON Schema backward compatibility.
func (checker *SchemaEvolutionChecker) checkJSONSchemaBackwardCompatibility(
	oldSchema, newSchema map[string]interface{},
	result *CompatibilityResult,
) {
	// Check if required fields were added
	oldRequired := checker.extractJSONSchemaRequired(oldSchema)
	newRequired := checker.extractJSONSchemaRequired(newSchema)
	for _, field := range newRequired {
		if !contains(oldRequired, field) {
			result.Compatible = false
			result.Issues = append(result.Issues,
				fmt.Sprintf("New required field '%s' breaks backward compatibility", field))
		}
	}

	// Check if properties were removed
	oldProperties := checker.extractJSONSchemaProperties(oldSchema)
	newProperties := checker.extractJSONSchemaProperties(newSchema)
	for propName := range oldProperties {
		if _, exists := newProperties[propName]; !exists {
			result.Compatible = false
			result.Issues = append(result.Issues,
				fmt.Sprintf("Property '%s' was removed, breaking backward compatibility", propName))
		}
	}
}

// checkJSONSchemaForwardCompatibility checks JSON Schema forward compatibility.
func (checker *SchemaEvolutionChecker) checkJSONSchemaForwardCompatibility(
	oldSchema, newSchema map[string]interface{},
	result *CompatibilityResult,
) {
	// Check if required fields were removed
	oldRequired := checker.extractJSONSchemaRequired(oldSchema)
	newRequired := checker.extractJSONSchemaRequired(newSchema)
	for _, field := range oldRequired {
		if !contains(newRequired, field) {
			result.Compatible = false
			result.Issues = append(result.Issues,
				fmt.Sprintf("Required field '%s' was removed, breaking forward compatibility", field))
		}
	}

	// Check if properties were added (warning only - does not mark incompatible)
	oldProperties := checker.extractJSONSchemaProperties(oldSchema)
	newProperties := checker.extractJSONSchemaProperties(newSchema)
	for propName := range newProperties {
		if _, exists := oldProperties[propName]; !exists {
			result.Issues = append(result.Issues,
				fmt.Sprintf("New property '%s' added - ensure old schema can handle it", propName))
		}
	}
}

// extractJSONSchemaRequired extracts required fields from a JSON Schema.
func (checker *SchemaEvolutionChecker) extractJSONSchemaRequired(schema map[string]interface{}) []string {
	if required, ok := schema["required"].([]interface{}); ok {
		var fields []string
		for _, field := range required {
			if fieldStr, ok := field.(string); ok {
				fields = append(fields, fieldStr)
			}
		}
		return fields
	}
	return []string{}
}

// extractJSONSchemaProperties extracts properties from a JSON Schema.
func (checker *SchemaEvolutionChecker) extractJSONSchemaProperties(schema map[string]interface{}) map[string]interface{} {
	if properties, ok := schema["properties"].(map[string]interface{}); ok {
		return properties
	}
	return make(map[string]interface{})
}

// contains checks if a slice contains a string.
func contains(slice []string, item string) bool {
	for _, s := range slice {
		if s == item {
			return true
		}
	}
	return false
}

// GetCompatibilityLevel returns the compatibility level for a subject.
func (checker *SchemaEvolutionChecker) GetCompatibilityLevel(subject string) CompatibilityLevel {
	// In a real implementation, this would query the schema registry.
	// For now, return a default level.
	return CompatibilityBackward
}

// SetCompatibilityLevel sets the compatibility level for a subject.
func (checker *SchemaEvolutionChecker) SetCompatibilityLevel(subject string, level CompatibilityLevel) error {
	// In a real implementation, this would update the schema registry.
	return nil
}

// CanEvolve checks if a schema can be evolved according to the compatibility rules.
func (checker *SchemaEvolutionChecker) CanEvolve(
	subject string,
	currentSchemaStr, newSchemaStr string,
	format Format,
) (*CompatibilityResult, error) {
	level := checker.GetCompatibilityLevel(subject)
	return checker.CheckCompatibility(currentSchemaStr, newSchemaStr, format, level)
}

// SuggestEvolution suggests how to evolve a schema to maintain compatibility.
func (checker *SchemaEvolutionChecker) SuggestEvolution(
	oldSchemaStr, newSchemaStr string,
	format Format,
	level CompatibilityLevel,
) ([]string, error) {
	suggestions := []string{}

	result, err := checker.CheckCompatibility(oldSchemaStr, newSchemaStr, format, level)
	if err != nil {
		return nil, err
	}

	if result.Compatible {
		suggestions = append(suggestions, "Schema evolution is compatible")
		return suggestions, nil
	}

	// Analyze issues and provide suggestions
	for _, issue := range result.Issues {
		if strings.Contains(issue, "required field") && strings.Contains(issue, "added") {
			suggestions = append(suggestions, "Add default values to new required fields")
		}
		if strings.Contains(issue, "removed") {
			suggestions = append(suggestions, "Consider deprecating fields instead of removing them")
		}
		if strings.Contains(issue, "type changed") {
			suggestions = append(suggestions, "Use type promotion or union types for type changes")
		}
	}

	if len(suggestions) == 0 {
		suggestions = append(suggestions, "Manual schema review required - compatibility issues detected")
	}

	return suggestions, nil
}
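
// End-to-end sketch (hypothetical subject name and schema variables): when
// CanEvolve reports an incompatibility, SuggestEvolution maps the issues to
// remediation hints.
//
//	res, err := checker.CanEvolve("orders-value", currentSchema, proposedSchema, FormatAvro)
//	if err == nil && !res.Compatible {
//		hints, _ := checker.SuggestEvolution(currentSchema, proposedSchema, FormatAvro, res.Level)
//		// e.g. ["Add default values to new required fields", ...]
//	}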