You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
350 lines
9.6 KiB
350 lines
9.6 KiB
package schema
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
|
|
"github.com/linkedin/goavro/v2"
|
|
)
|
|
|
|
func TestSchemaReconstruction_Avro(t *testing.T) {
|
|
// Create mock schema registry
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path == "/schemas/ids/1" {
|
|
response := map[string]interface{}{
|
|
"schema": `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`,
|
|
"subject": "user-value",
|
|
"version": 1,
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
} else {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
}
|
|
}))
|
|
defer server.Close()
|
|
|
|
// Create manager
|
|
config := ManagerConfig{
|
|
RegistryURL: server.URL,
|
|
ValidationMode: ValidationPermissive,
|
|
}
|
|
|
|
manager, err := NewManager(config)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create manager: %v", err)
|
|
}
|
|
|
|
// Create test Avro message
|
|
avroSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
codec, err := goavro.NewCodec(avroSchema)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create Avro codec: %v", err)
|
|
}
|
|
|
|
// Create original test data
|
|
originalRecord := map[string]interface{}{
|
|
"id": int32(123),
|
|
"name": "John Doe",
|
|
}
|
|
|
|
// Encode to Avro binary
|
|
avroBinary, err := codec.BinaryFromNative(nil, originalRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to encode Avro data: %v", err)
|
|
}
|
|
|
|
// Create original Confluent message
|
|
originalMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary)
|
|
|
|
// Debug: Check the created message
|
|
t.Logf("Original Avro binary length: %d", len(avroBinary))
|
|
t.Logf("Original Confluent message length: %d", len(originalMsg))
|
|
|
|
// Debug: Parse the envelope manually to see what's happening
|
|
envelope, ok := ParseConfluentEnvelope(originalMsg)
|
|
if !ok {
|
|
t.Fatal("Failed to parse Confluent envelope")
|
|
}
|
|
t.Logf("Parsed envelope - SchemaID: %d, Format: %v, Payload length: %d",
|
|
envelope.SchemaID, envelope.Format, len(envelope.Payload))
|
|
|
|
// Step 1: Decode the original message (simulate Produce path)
|
|
decodedMsg, err := manager.DecodeMessage(originalMsg)
|
|
if err != nil {
|
|
t.Fatalf("Failed to decode message: %v", err)
|
|
}
|
|
|
|
// Step 2: Reconstruct the message (simulate Fetch path)
|
|
reconstructedMsg, err := manager.EncodeMessage(decodedMsg.RecordValue, 1, FormatAvro)
|
|
if err != nil {
|
|
t.Fatalf("Failed to reconstruct message: %v", err)
|
|
}
|
|
|
|
// Step 3: Verify the reconstructed message can be decoded again
|
|
finalDecodedMsg, err := manager.DecodeMessage(reconstructedMsg)
|
|
if err != nil {
|
|
t.Fatalf("Failed to decode reconstructed message: %v", err)
|
|
}
|
|
|
|
// Verify data integrity through the round trip
|
|
if finalDecodedMsg.RecordValue.Fields["id"].GetInt32Value() != 123 {
|
|
t.Errorf("Expected id=123, got %v", finalDecodedMsg.RecordValue.Fields["id"].GetInt32Value())
|
|
}
|
|
|
|
if finalDecodedMsg.RecordValue.Fields["name"].GetStringValue() != "John Doe" {
|
|
t.Errorf("Expected name='John Doe', got %v", finalDecodedMsg.RecordValue.Fields["name"].GetStringValue())
|
|
}
|
|
|
|
// Verify schema information is preserved
|
|
if finalDecodedMsg.SchemaID != 1 {
|
|
t.Errorf("Expected schema ID 1, got %d", finalDecodedMsg.SchemaID)
|
|
}
|
|
|
|
if finalDecodedMsg.SchemaFormat != FormatAvro {
|
|
t.Errorf("Expected Avro format, got %v", finalDecodedMsg.SchemaFormat)
|
|
}
|
|
|
|
t.Logf("Successfully completed round-trip: Original -> Decode -> Encode -> Decode")
|
|
t.Logf("Original message size: %d bytes", len(originalMsg))
|
|
t.Logf("Reconstructed message size: %d bytes", len(reconstructedMsg))
|
|
}
|
|
|
|
func TestSchemaReconstruction_MultipleFormats(t *testing.T) {
|
|
// Test that the reconstruction framework can handle multiple schema formats
|
|
|
|
testCases := []struct {
|
|
name string
|
|
format Format
|
|
}{
|
|
{"Avro", FormatAvro},
|
|
{"Protobuf", FormatProtobuf},
|
|
{"JSON Schema", FormatJSONSchema},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// Create test RecordValue
|
|
testMap := map[string]interface{}{
|
|
"id": int32(456),
|
|
"name": "Jane Smith",
|
|
}
|
|
recordValue := MapToRecordValue(testMap)
|
|
|
|
// Create mock manager (without registry for this test)
|
|
config := ManagerConfig{
|
|
RegistryURL: "http://localhost:8081", // Not used for this test
|
|
}
|
|
|
|
manager, err := NewManager(config)
|
|
if err != nil {
|
|
t.Skip("Skipping test - no registry available")
|
|
}
|
|
|
|
// Test encoding (will fail for Protobuf/JSON Schema in Phase 7, which is expected)
|
|
_, err = manager.EncodeMessage(recordValue, 1, tc.format)
|
|
|
|
switch tc.format {
|
|
case FormatAvro:
|
|
// Avro should work (but will fail due to no registry)
|
|
if err == nil {
|
|
t.Error("Expected error for Avro without registry setup")
|
|
}
|
|
case FormatProtobuf:
|
|
// Protobuf should fail gracefully
|
|
if err == nil {
|
|
t.Error("Expected error for Protobuf in Phase 7")
|
|
}
|
|
if err.Error() != "failed to get schema for encoding: schema registry health check failed with status 404" {
|
|
// This is expected - we don't have a real registry
|
|
}
|
|
case FormatJSONSchema:
|
|
// JSON Schema should fail gracefully
|
|
if err == nil {
|
|
t.Error("Expected error for JSON Schema in Phase 7")
|
|
}
|
|
expectedErr := "JSON Schema encoding not yet implemented (Phase 7)"
|
|
if err.Error() != "failed to get schema for encoding: schema registry health check failed with status 404" {
|
|
// This is also expected due to registry issues
|
|
}
|
|
_ = expectedErr // Use the variable to avoid unused warning
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestConfluentEnvelope_RoundTrip(t *testing.T) {
|
|
// Test that Confluent envelope creation and parsing work correctly
|
|
|
|
testCases := []struct {
|
|
name string
|
|
format Format
|
|
schemaID uint32
|
|
indexes []int
|
|
payload []byte
|
|
}{
|
|
{
|
|
name: "Avro message",
|
|
format: FormatAvro,
|
|
schemaID: 1,
|
|
indexes: nil,
|
|
payload: []byte("avro-payload"),
|
|
},
|
|
{
|
|
name: "Protobuf message with indexes",
|
|
format: FormatProtobuf,
|
|
schemaID: 2,
|
|
indexes: nil, // TODO: Implement proper Protobuf index handling
|
|
payload: []byte("protobuf-payload"),
|
|
},
|
|
{
|
|
name: "JSON Schema message",
|
|
format: FormatJSONSchema,
|
|
schemaID: 3,
|
|
indexes: nil,
|
|
payload: []byte("json-payload"),
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// Create envelope
|
|
envelopeBytes := CreateConfluentEnvelope(tc.format, tc.schemaID, tc.indexes, tc.payload)
|
|
|
|
// Parse envelope
|
|
parsedEnvelope, ok := ParseConfluentEnvelope(envelopeBytes)
|
|
if !ok {
|
|
t.Fatal("Failed to parse created envelope")
|
|
}
|
|
|
|
// Verify schema ID
|
|
if parsedEnvelope.SchemaID != tc.schemaID {
|
|
t.Errorf("Expected schema ID %d, got %d", tc.schemaID, parsedEnvelope.SchemaID)
|
|
}
|
|
|
|
// Verify payload
|
|
if string(parsedEnvelope.Payload) != string(tc.payload) {
|
|
t.Errorf("Expected payload %s, got %s", string(tc.payload), string(parsedEnvelope.Payload))
|
|
}
|
|
|
|
// For Protobuf, verify indexes (if any)
|
|
if tc.format == FormatProtobuf && len(tc.indexes) > 0 {
|
|
if len(parsedEnvelope.Indexes) != len(tc.indexes) {
|
|
t.Errorf("Expected %d indexes, got %d", len(tc.indexes), len(parsedEnvelope.Indexes))
|
|
} else {
|
|
for i, expectedIndex := range tc.indexes {
|
|
if parsedEnvelope.Indexes[i] != expectedIndex {
|
|
t.Errorf("Expected index[%d]=%d, got %d", i, expectedIndex, parsedEnvelope.Indexes[i])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
t.Logf("Successfully round-tripped %s envelope: %d bytes", tc.name, len(envelopeBytes))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSchemaMetadata_Preservation(t *testing.T) {
|
|
// Test that schema metadata is properly preserved through the reconstruction process
|
|
|
|
envelope := &ConfluentEnvelope{
|
|
Format: FormatAvro,
|
|
SchemaID: 42,
|
|
Indexes: []int{1, 2, 3},
|
|
Payload: []byte("test-payload"),
|
|
}
|
|
|
|
// Get metadata
|
|
metadata := envelope.Metadata()
|
|
|
|
// Verify metadata contents
|
|
expectedMetadata := map[string]string{
|
|
"schema_format": "AVRO",
|
|
"schema_id": "42",
|
|
"protobuf_indexes": "1,2,3",
|
|
}
|
|
|
|
for key, expectedValue := range expectedMetadata {
|
|
if metadata[key] != expectedValue {
|
|
t.Errorf("Expected metadata[%s]=%s, got %s", key, expectedValue, metadata[key])
|
|
}
|
|
}
|
|
|
|
// Test metadata reconstruction
|
|
reconstructedFormat := FormatUnknown
|
|
switch metadata["schema_format"] {
|
|
case "AVRO":
|
|
reconstructedFormat = FormatAvro
|
|
case "PROTOBUF":
|
|
reconstructedFormat = FormatProtobuf
|
|
case "JSON_SCHEMA":
|
|
reconstructedFormat = FormatJSONSchema
|
|
}
|
|
|
|
if reconstructedFormat != envelope.Format {
|
|
t.Errorf("Failed to reconstruct format from metadata: expected %v, got %v",
|
|
envelope.Format, reconstructedFormat)
|
|
}
|
|
|
|
t.Log("Successfully preserved and reconstructed schema metadata")
|
|
}
|
|
|
|
// Benchmark tests for reconstruction performance
|
|
func BenchmarkSchemaReconstruction_Avro(b *testing.B) {
|
|
// Setup
|
|
testMap := map[string]interface{}{
|
|
"id": int32(123),
|
|
"name": "John Doe",
|
|
}
|
|
recordValue := MapToRecordValue(testMap)
|
|
|
|
config := ManagerConfig{
|
|
RegistryURL: "http://localhost:8081",
|
|
}
|
|
|
|
manager, err := NewManager(config)
|
|
if err != nil {
|
|
b.Skip("Skipping benchmark - no registry available")
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
// This will fail without proper registry setup, but measures the overhead
|
|
_, _ = manager.EncodeMessage(recordValue, 1, FormatAvro)
|
|
}
|
|
}
|
|
|
|
func BenchmarkConfluentEnvelope_Creation(b *testing.B) {
|
|
payload := []byte("test-payload-for-benchmarking")
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_ = CreateConfluentEnvelope(FormatAvro, 1, nil, payload)
|
|
}
|
|
}
|
|
|
|
func BenchmarkConfluentEnvelope_Parsing(b *testing.B) {
|
|
envelope := CreateConfluentEnvelope(FormatAvro, 1, nil, []byte("test-payload"))
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, _ = ParseConfluentEnvelope(envelope)
|
|
}
|
|
}
|