You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

344 lines
8.5 KiB

package schema
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestManager_SchemaEvolution tests schema evolution integration in the manager
func TestManager_SchemaEvolution(t *testing.T) {
// Create a manager without registry (for testing evolution logic only)
manager := &Manager{
evolutionChecker: NewSchemaEvolutionChecker(),
}
t.Run("Compatible Avro evolution", func(t *testing.T) {
oldSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string", "default": ""}
]
}`
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
require.NoError(t, err)
assert.True(t, result.Compatible)
assert.Empty(t, result.Issues)
})
t.Run("Incompatible Avro evolution", func(t *testing.T) {
oldSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
require.NoError(t, err)
assert.False(t, result.Compatible)
assert.NotEmpty(t, result.Issues)
assert.Contains(t, result.Issues[0], "Field 'email' was removed")
})
t.Run("Schema evolution suggestions", func(t *testing.T) {
oldSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}`
suggestions, err := manager.SuggestSchemaEvolution(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
require.NoError(t, err)
assert.NotEmpty(t, suggestions)
// Should suggest adding default values
found := false
for _, suggestion := range suggestions {
if strings.Contains(suggestion, "default") {
found = true
break
}
}
assert.True(t, found, "Should suggest adding default values, got: %v", suggestions)
})
t.Run("JSON Schema evolution", func(t *testing.T) {
oldSchema := `{
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"}
},
"required": ["id", "name"]
}`
newSchema := `{
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string"}
},
"required": ["id", "name"]
}`
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatJSONSchema, CompatibilityBackward)
require.NoError(t, err)
assert.True(t, result.Compatible)
})
t.Run("Full compatibility check", func(t *testing.T) {
oldSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string", "default": ""}
]
}`
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityFull)
require.NoError(t, err)
assert.True(t, result.Compatible)
})
t.Run("Type promotion compatibility", func(t *testing.T) {
oldSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "score", "type": "int"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "score", "type": "long"}
]
}`
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
require.NoError(t, err)
assert.True(t, result.Compatible)
})
}
// TestManager_CompatibilityLevels tests compatibility level management
func TestManager_CompatibilityLevels(t *testing.T) {
manager := &Manager{
evolutionChecker: NewSchemaEvolutionChecker(),
}
t.Run("Get default compatibility level", func(t *testing.T) {
level := manager.GetCompatibilityLevel("test-subject")
assert.Equal(t, CompatibilityBackward, level)
})
t.Run("Set compatibility level", func(t *testing.T) {
err := manager.SetCompatibilityLevel("test-subject", CompatibilityFull)
assert.NoError(t, err)
})
}
// TestManager_CanEvolveSchema tests the CanEvolveSchema method
func TestManager_CanEvolveSchema(t *testing.T) {
manager := &Manager{
evolutionChecker: NewSchemaEvolutionChecker(),
}
t.Run("Compatible evolution", func(t *testing.T) {
currentSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string", "default": ""}
]
}`
result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
require.NoError(t, err)
assert.True(t, result.Compatible)
})
t.Run("Incompatible evolution", func(t *testing.T) {
currentSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
require.NoError(t, err)
assert.False(t, result.Compatible)
assert.Contains(t, result.Issues[0], "Field 'email' was removed")
})
}
// TestManager_SchemaEvolutionWorkflow tests a complete schema evolution workflow
func TestManager_SchemaEvolutionWorkflow(t *testing.T) {
manager := &Manager{
evolutionChecker: NewSchemaEvolutionChecker(),
}
t.Run("Complete evolution workflow", func(t *testing.T) {
// Step 1: Define initial schema
initialSchema := `{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "int"},
{"name": "action", "type": "string"}
]
}`
// Step 2: Propose schema evolution (compatible)
evolvedSchema := `{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "int"},
{"name": "action", "type": "string"},
{"name": "timestamp", "type": "long", "default": 0}
]
}`
// Check compatibility explicitly
result, err := manager.CanEvolveSchema("user-events", initialSchema, evolvedSchema, FormatAvro)
require.NoError(t, err)
assert.True(t, result.Compatible)
// Step 3: Try incompatible evolution
incompatibleSchema := `{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "int"}
]
}`
result, err = manager.CanEvolveSchema("user-events", initialSchema, incompatibleSchema, FormatAvro)
require.NoError(t, err)
assert.False(t, result.Compatible)
assert.Contains(t, result.Issues[0], "Field 'action' was removed")
// Step 4: Get suggestions for incompatible evolution
suggestions, err := manager.SuggestSchemaEvolution(initialSchema, incompatibleSchema, FormatAvro, CompatibilityBackward)
require.NoError(t, err)
assert.NotEmpty(t, suggestions)
})
}
// BenchmarkSchemaEvolution benchmarks schema evolution operations
func BenchmarkSchemaEvolution(b *testing.B) {
manager := &Manager{
evolutionChecker: NewSchemaEvolutionChecker(),
}
oldSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string", "default": ""}
]
}`
newSchema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string", "default": ""},
{"name": "age", "type": "int", "default": 0}
]
}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
if err != nil {
b.Fatal(err)
}
}
}