You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
344 lines
8.5 KiB
344 lines
8.5 KiB
package schema
|
|
|
|
import (
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestManager_SchemaEvolution tests schema evolution integration in the manager
|
|
func TestManager_SchemaEvolution(t *testing.T) {
|
|
// Create a manager without registry (for testing evolution logic only)
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Compatible Avro evolution", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
assert.Empty(t, result.Issues)
|
|
})
|
|
|
|
t.Run("Incompatible Avro evolution", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.False(t, result.Compatible)
|
|
assert.NotEmpty(t, result.Issues)
|
|
assert.Contains(t, result.Issues[0], "Field 'email' was removed")
|
|
})
|
|
|
|
t.Run("Schema evolution suggestions", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
suggestions, err := manager.SuggestSchemaEvolution(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.NotEmpty(t, suggestions)
|
|
|
|
// Should suggest adding default values
|
|
found := false
|
|
for _, suggestion := range suggestions {
|
|
if strings.Contains(suggestion, "default") {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
assert.True(t, found, "Should suggest adding default values, got: %v", suggestions)
|
|
})
|
|
|
|
t.Run("JSON Schema evolution", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "integer"},
|
|
"name": {"type": "string"}
|
|
},
|
|
"required": ["id", "name"]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "integer"},
|
|
"name": {"type": "string"},
|
|
"email": {"type": "string"}
|
|
},
|
|
"required": ["id", "name"]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatJSONSchema, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
|
|
t.Run("Full compatibility check", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityFull)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
|
|
t.Run("Type promotion compatibility", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "score", "type": "int"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "score", "type": "long"}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
}
|
|
|
|
// TestManager_CompatibilityLevels tests compatibility level management
|
|
func TestManager_CompatibilityLevels(t *testing.T) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Get default compatibility level", func(t *testing.T) {
|
|
level := manager.GetCompatibilityLevel("test-subject")
|
|
assert.Equal(t, CompatibilityBackward, level)
|
|
})
|
|
|
|
t.Run("Set compatibility level", func(t *testing.T) {
|
|
err := manager.SetCompatibilityLevel("test-subject", CompatibilityFull)
|
|
assert.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
// TestManager_CanEvolveSchema tests the CanEvolveSchema method
|
|
func TestManager_CanEvolveSchema(t *testing.T) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Compatible evolution", func(t *testing.T) {
|
|
currentSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
|
|
t.Run("Incompatible evolution", func(t *testing.T) {
|
|
currentSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.False(t, result.Compatible)
|
|
assert.Contains(t, result.Issues[0], "Field 'email' was removed")
|
|
})
|
|
}
|
|
|
|
// TestManager_SchemaEvolutionWorkflow tests a complete schema evolution workflow
|
|
func TestManager_SchemaEvolutionWorkflow(t *testing.T) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Complete evolution workflow", func(t *testing.T) {
|
|
// Step 1: Define initial schema
|
|
initialSchema := `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"},
|
|
{"name": "action", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
// Step 2: Propose schema evolution (compatible)
|
|
evolvedSchema := `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"},
|
|
{"name": "action", "type": "string"},
|
|
{"name": "timestamp", "type": "long", "default": 0}
|
|
]
|
|
}`
|
|
|
|
// Check compatibility explicitly
|
|
result, err := manager.CanEvolveSchema("user-events", initialSchema, evolvedSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
|
|
// Step 3: Try incompatible evolution
|
|
incompatibleSchema := `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"}
|
|
]
|
|
}`
|
|
|
|
result, err = manager.CanEvolveSchema("user-events", initialSchema, incompatibleSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.False(t, result.Compatible)
|
|
assert.Contains(t, result.Issues[0], "Field 'action' was removed")
|
|
|
|
// Step 4: Get suggestions for incompatible evolution
|
|
suggestions, err := manager.SuggestSchemaEvolution(initialSchema, incompatibleSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.NotEmpty(t, suggestions)
|
|
})
|
|
}
|
|
|
|
// BenchmarkSchemaEvolution benchmarks schema evolution operations
|
|
func BenchmarkSchemaEvolution(b *testing.B) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""},
|
|
{"name": "age", "type": "int", "default": 0}
|
|
]
|
|
}`
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
}
|