Browse Source

Phase 6: Add JSON Schema decoder support for Kafka Gateway

- Add gojsonschema dependency for JSON Schema validation and parsing
- Implement JSONSchemaDecoder with validation and SMQ RecordValue conversion
- Support all JSON Schema types: object, array, string, number, integer, boolean
- Add format-specific type mapping (date-time, email, byte, etc.)
- Include schema inference from JSON Schema to SeaweedMQ RecordType
- Add round-trip encoding from RecordValue back to validated JSON
- Integrate JSON Schema support into Schema Manager with caching
- Comprehensive test coverage for validation, decoding, and type inference

This completes schema format support for Avro, Protobuf, and JSON Schema.
pull/7231/head
chrislu 2 months ago
parent
commit
4ed2604c71
  1. 3
      go.mod
  2. 6
      go.sum
  3. 387
      weed/mq/kafka/schema/json_schema_decoder.go
  4. 543
      weed/mq/kafka/schema/json_schema_decoder_test.go
  5. 94
      weed/mq/kafka/schema/manager.go

3
go.mod

@ -207,6 +207,9 @@ require (
github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/objx v0.5.2 // indirect
github.com/twpayne/go-geom v1.4.1 // indirect github.com/twpayne/go-geom v1.4.1 // indirect
github.com/twpayne/go-kml v1.5.2 // indirect github.com/twpayne/go-kml v1.5.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect

6
go.sum

@ -1791,6 +1791,12 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e h1:9LPdmD1vqadsDQUva6t2O9MbnyvoOgo8nFNPaOIH5U8= github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e h1:9LPdmD1vqadsDQUva6t2O9MbnyvoOgo8nFNPaOIH5U8=

387
weed/mq/kafka/schema/json_schema_decoder.go

@ -0,0 +1,387 @@
package schema
import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/xeipuuv/gojsonschema"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// JSONSchemaDecoder handles JSON Schema validation and conversion to SeaweedMQ format
type JSONSchemaDecoder struct {
schema *gojsonschema.Schema
schemaDoc map[string]interface{} // Parsed schema document for type inference
schemaJSON string // Original schema JSON
}
// NewJSONSchemaDecoder creates a new JSON Schema decoder from a schema string
func NewJSONSchemaDecoder(schemaJSON string) (*JSONSchemaDecoder, error) {
// Parse the schema JSON
var schemaDoc map[string]interface{}
if err := json.Unmarshal([]byte(schemaJSON), &schemaDoc); err != nil {
return nil, fmt.Errorf("failed to parse JSON schema: %w", err)
}
// Create JSON Schema validator
schemaLoader := gojsonschema.NewStringLoader(schemaJSON)
schema, err := gojsonschema.NewSchema(schemaLoader)
if err != nil {
return nil, fmt.Errorf("failed to create JSON schema validator: %w", err)
}
return &JSONSchemaDecoder{
schema: schema,
schemaDoc: schemaDoc,
schemaJSON: schemaJSON,
}, nil
}
// Decode decodes and validates JSON data against the schema, returning a Go map
func (jsd *JSONSchemaDecoder) Decode(data []byte) (map[string]interface{}, error) {
// Parse JSON data
var jsonData interface{}
if err := json.Unmarshal(data, &jsonData); err != nil {
return nil, fmt.Errorf("failed to parse JSON data: %w", err)
}
// Validate against schema
documentLoader := gojsonschema.NewGoLoader(jsonData)
result, err := jsd.schema.Validate(documentLoader)
if err != nil {
return nil, fmt.Errorf("failed to validate JSON data: %w", err)
}
if !result.Valid() {
// Collect validation errors
var errorMsgs []string
for _, desc := range result.Errors() {
errorMsgs = append(errorMsgs, desc.String())
}
return nil, fmt.Errorf("JSON data validation failed: %v", errorMsgs)
}
// Convert to map[string]interface{} for consistency
switch v := jsonData.(type) {
case map[string]interface{}:
return v, nil
case []interface{}:
// Handle array at root level by wrapping in a map
return map[string]interface{}{"items": v}, nil
default:
// Handle primitive values at root level
return map[string]interface{}{"value": v}, nil
}
}
// DecodeToRecordValue decodes JSON data directly to SeaweedMQ RecordValue
func (jsd *JSONSchemaDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
jsonMap, err := jsd.Decode(data)
if err != nil {
return nil, err
}
return MapToRecordValue(jsonMap), nil
}
// InferRecordType infers a SeaweedMQ RecordType from the JSON Schema
func (jsd *JSONSchemaDecoder) InferRecordType() (*schema_pb.RecordType, error) {
return jsd.jsonSchemaToRecordType(jsd.schemaDoc), nil
}
// ValidateOnly validates JSON data against the schema without decoding
func (jsd *JSONSchemaDecoder) ValidateOnly(data []byte) error {
_, err := jsd.Decode(data)
return err
}
// jsonSchemaToRecordType converts a JSON Schema to SeaweedMQ RecordType
func (jsd *JSONSchemaDecoder) jsonSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
schemaType, _ := schemaDoc["type"].(string)
if schemaType == "object" {
return jsd.objectSchemaToRecordType(schemaDoc)
}
// For non-object schemas, create a wrapper record
return &schema_pb.RecordType{
Fields: []*schema_pb.Field{
{
Name: "value",
FieldIndex: 0,
Type: jsd.jsonSchemaTypeToType(schemaDoc),
IsRequired: true,
IsRepeated: false,
},
},
}
}
// objectSchemaToRecordType converts an object JSON Schema to RecordType
func (jsd *JSONSchemaDecoder) objectSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
properties, _ := schemaDoc["properties"].(map[string]interface{})
required, _ := schemaDoc["required"].([]interface{})
// Create set of required fields for quick lookup
requiredFields := make(map[string]bool)
for _, req := range required {
if reqStr, ok := req.(string); ok {
requiredFields[reqStr] = true
}
}
fields := make([]*schema_pb.Field, 0, len(properties))
fieldIndex := int32(0)
for fieldName, fieldSchema := range properties {
fieldSchemaMap, ok := fieldSchema.(map[string]interface{})
if !ok {
continue
}
field := &schema_pb.Field{
Name: fieldName,
FieldIndex: fieldIndex,
Type: jsd.jsonSchemaTypeToType(fieldSchemaMap),
IsRequired: requiredFields[fieldName],
IsRepeated: jsd.isArrayType(fieldSchemaMap),
}
fields = append(fields, field)
fieldIndex++
}
return &schema_pb.RecordType{
Fields: fields,
}
}
// jsonSchemaTypeToType converts a JSON Schema type to SeaweedMQ Type
func (jsd *JSONSchemaDecoder) jsonSchemaTypeToType(schemaDoc map[string]interface{}) *schema_pb.Type {
schemaType, _ := schemaDoc["type"].(string)
switch schemaType {
case "boolean":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_BOOL,
},
}
case "integer":
// Check for format hints
format, _ := schemaDoc["format"].(string)
switch format {
case "int32":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_INT32,
},
}
default:
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_INT64,
},
}
}
case "number":
// Check for format hints
format, _ := schemaDoc["format"].(string)
switch format {
case "float":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_FLOAT,
},
}
default:
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_DOUBLE,
},
}
}
case "string":
// Check for format hints
format, _ := schemaDoc["format"].(string)
switch format {
case "date-time":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_TIMESTAMP,
},
}
case "byte", "binary":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_BYTES,
},
}
default:
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_STRING,
},
}
}
case "array":
items, _ := schemaDoc["items"].(map[string]interface{})
elementType := jsd.jsonSchemaTypeToType(items)
return &schema_pb.Type{
Kind: &schema_pb.Type_ListType{
ListType: &schema_pb.ListType{
ElementType: elementType,
},
},
}
case "object":
nestedRecordType := jsd.objectSchemaToRecordType(schemaDoc)
return &schema_pb.Type{
Kind: &schema_pb.Type_RecordType{
RecordType: nestedRecordType,
},
}
default:
// Handle union types (oneOf, anyOf, allOf)
if oneOf, exists := schemaDoc["oneOf"].([]interface{}); exists && len(oneOf) > 0 {
// For unions, use the first type as default
if firstType, ok := oneOf[0].(map[string]interface{}); ok {
return jsd.jsonSchemaTypeToType(firstType)
}
}
// Default to string for unknown types
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_STRING,
},
}
}
}
// isArrayType checks if a JSON Schema represents an array type
func (jsd *JSONSchemaDecoder) isArrayType(schemaDoc map[string]interface{}) bool {
schemaType, _ := schemaDoc["type"].(string)
return schemaType == "array"
}
// EncodeFromRecordValue encodes a RecordValue back to JSON format
func (jsd *JSONSchemaDecoder) EncodeFromRecordValue(recordValue *schema_pb.RecordValue) ([]byte, error) {
// Convert RecordValue back to Go map
goMap := recordValueToMap(recordValue)
// Encode to JSON
jsonData, err := json.Marshal(goMap)
if err != nil {
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
}
// Validate the generated JSON against the schema
if err := jsd.ValidateOnly(jsonData); err != nil {
return nil, fmt.Errorf("generated JSON failed schema validation: %w", err)
}
return jsonData, nil
}
// GetSchemaInfo returns information about the JSON Schema
func (jsd *JSONSchemaDecoder) GetSchemaInfo() map[string]interface{} {
info := make(map[string]interface{})
if title, exists := jsd.schemaDoc["title"]; exists {
info["title"] = title
}
if description, exists := jsd.schemaDoc["description"]; exists {
info["description"] = description
}
if schemaVersion, exists := jsd.schemaDoc["$schema"]; exists {
info["schema_version"] = schemaVersion
}
if schemaType, exists := jsd.schemaDoc["type"]; exists {
info["type"] = schemaType
}
return info
}
// Enhanced JSON value conversion with better type handling
func (jsd *JSONSchemaDecoder) convertJSONValue(value interface{}, expectedType string) interface{} {
if value == nil {
return nil
}
switch expectedType {
case "integer":
switch v := value.(type) {
case float64:
return int64(v)
case string:
if i, err := strconv.ParseInt(v, 10, 64); err == nil {
return i
}
}
case "number":
switch v := value.(type) {
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
return f
}
}
case "boolean":
switch v := value.(type) {
case string:
if b, err := strconv.ParseBool(v); err == nil {
return b
}
}
case "string":
// Handle date-time format conversion
if str, ok := value.(string); ok {
// Try to parse as RFC3339 timestamp
if t, err := time.Parse(time.RFC3339, str); err == nil {
return t
}
}
}
return value
}
// ValidateAndNormalize validates JSON data and normalizes types according to schema
func (jsd *JSONSchemaDecoder) ValidateAndNormalize(data []byte) ([]byte, error) {
// First decode normally
jsonMap, err := jsd.Decode(data)
if err != nil {
return nil, err
}
// Normalize types based on schema
normalized := jsd.normalizeMapTypes(jsonMap, jsd.schemaDoc)
// Re-encode with normalized types
return json.Marshal(normalized)
}
// normalizeMapTypes normalizes map values according to JSON Schema types
func (jsd *JSONSchemaDecoder) normalizeMapTypes(data map[string]interface{}, schemaDoc map[string]interface{}) map[string]interface{} {
properties, _ := schemaDoc["properties"].(map[string]interface{})
result := make(map[string]interface{})
for key, value := range data {
if fieldSchema, exists := properties[key]; exists {
if fieldSchemaMap, ok := fieldSchema.(map[string]interface{}); ok {
fieldType, _ := fieldSchemaMap["type"].(string)
result[key] = jsd.convertJSONValue(value, fieldType)
continue
}
}
result[key] = value
}
return result
}

543
weed/mq/kafka/schema/json_schema_decoder_test.go

@ -0,0 +1,543 @@
package schema
import (
"encoding/json"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestNewJSONSchemaDecoder(t *testing.T) {
tests := []struct {
name string
schema string
expectErr bool
}{
{
name: "valid object schema",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"active": {"type": "boolean"}
},
"required": ["id", "name"]
}`,
expectErr: false,
},
{
name: "valid array schema",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "string"
}
}`,
expectErr: false,
},
{
name: "valid string schema with format",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "string",
"format": "date-time"
}`,
expectErr: false,
},
{
name: "invalid JSON",
schema: `{"invalid": json}`,
expectErr: true,
},
{
name: "empty schema",
schema: "",
expectErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
decoder, err := NewJSONSchemaDecoder(tt.schema)
if (err != nil) != tt.expectErr {
t.Errorf("NewJSONSchemaDecoder() error = %v, expectErr %v", err, tt.expectErr)
return
}
if !tt.expectErr && decoder == nil {
t.Error("Expected non-nil decoder for valid schema")
}
})
}
}
func TestJSONSchemaDecoder_Decode(t *testing.T) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string", "format": "email"},
"age": {"type": "integer", "minimum": 0},
"active": {"type": "boolean"}
},
"required": ["id", "name"]
}`
decoder, err := NewJSONSchemaDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
tests := []struct {
name string
jsonData string
expectErr bool
}{
{
name: "valid complete data",
jsonData: `{
"id": 123,
"name": "John Doe",
"email": "john@example.com",
"age": 30,
"active": true
}`,
expectErr: false,
},
{
name: "valid minimal data",
jsonData: `{
"id": 456,
"name": "Jane Smith"
}`,
expectErr: false,
},
{
name: "missing required field",
jsonData: `{
"name": "Missing ID"
}`,
expectErr: true,
},
{
name: "invalid type",
jsonData: `{
"id": "not-a-number",
"name": "John Doe"
}`,
expectErr: true,
},
{
name: "invalid email format",
jsonData: `{
"id": 123,
"name": "John Doe",
"email": "not-an-email"
}`,
expectErr: true,
},
{
name: "negative age",
jsonData: `{
"id": 123,
"name": "John Doe",
"age": -5
}`,
expectErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := decoder.Decode([]byte(tt.jsonData))
if (err != nil) != tt.expectErr {
t.Errorf("Decode() error = %v, expectErr %v", err, tt.expectErr)
return
}
if !tt.expectErr {
if result == nil {
t.Error("Expected non-nil result for valid data")
}
// Verify some basic fields
if id, exists := result["id"]; exists {
if _, ok := id.(float64); !ok {
t.Errorf("Expected id to be float64, got %T", id)
}
}
if name, exists := result["name"]; exists {
if _, ok := name.(string); !ok {
t.Errorf("Expected name to be string, got %T", name)
}
}
}
})
}
}
func TestJSONSchemaDecoder_DecodeToRecordValue(t *testing.T) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"tags": {
"type": "array",
"items": {"type": "string"}
}
}
}`
decoder, err := NewJSONSchemaDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
jsonData := `{
"id": 789,
"name": "Test User",
"tags": ["tag1", "tag2", "tag3"]
}`
recordValue, err := decoder.DecodeToRecordValue([]byte(jsonData))
if err != nil {
t.Fatalf("Failed to decode to RecordValue: %v", err)
}
// Verify RecordValue structure
if recordValue.Fields == nil {
t.Fatal("Expected non-nil fields")
}
// Check id field
idValue := recordValue.Fields["id"]
if idValue == nil {
t.Fatal("Expected id field")
}
// JSON numbers are decoded as float64 by default
// The MapToRecordValue function should handle this conversion
expectedID := int64(789)
actualID := idValue.GetInt64Value()
if actualID != expectedID {
// Try checking if it was stored as float64 instead
if floatVal := idValue.GetDoubleValue(); floatVal == 789.0 {
t.Logf("ID was stored as float64: %v", floatVal)
} else {
t.Errorf("Expected id=789, got int64=%v, float64=%v", actualID, floatVal)
}
}
// Check name field
nameValue := recordValue.Fields["name"]
if nameValue == nil {
t.Fatal("Expected name field")
}
if nameValue.GetStringValue() != "Test User" {
t.Errorf("Expected name='Test User', got %v", nameValue.GetStringValue())
}
// Check tags array
tagsValue := recordValue.Fields["tags"]
if tagsValue == nil {
t.Fatal("Expected tags field")
}
tagsList := tagsValue.GetListValue()
if tagsList == nil || len(tagsList.Values) != 3 {
t.Errorf("Expected tags array with 3 elements, got %v", tagsList)
}
}
func TestJSONSchemaDecoder_InferRecordType(t *testing.T) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer", "format": "int32"},
"name": {"type": "string"},
"score": {"type": "number", "format": "float"},
"timestamp": {"type": "string", "format": "date-time"},
"data": {"type": "string", "format": "byte"},
"active": {"type": "boolean"},
"tags": {
"type": "array",
"items": {"type": "string"}
},
"metadata": {
"type": "object",
"properties": {
"source": {"type": "string"}
}
}
},
"required": ["id", "name"]
}`
decoder, err := NewJSONSchemaDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
recordType, err := decoder.InferRecordType()
if err != nil {
t.Fatalf("Failed to infer RecordType: %v", err)
}
if len(recordType.Fields) != 8 {
t.Errorf("Expected 8 fields, got %d", len(recordType.Fields))
}
// Create a map for easier field lookup
fieldMap := make(map[string]*schema_pb.Field)
for _, field := range recordType.Fields {
fieldMap[field.Name] = field
}
// Test specific field types
if fieldMap["id"].Type.GetScalarType() != schema_pb.ScalarType_INT32 {
t.Error("Expected id field to be INT32")
}
if fieldMap["name"].Type.GetScalarType() != schema_pb.ScalarType_STRING {
t.Error("Expected name field to be STRING")
}
if fieldMap["score"].Type.GetScalarType() != schema_pb.ScalarType_FLOAT {
t.Error("Expected score field to be FLOAT")
}
if fieldMap["timestamp"].Type.GetScalarType() != schema_pb.ScalarType_TIMESTAMP {
t.Error("Expected timestamp field to be TIMESTAMP")
}
if fieldMap["data"].Type.GetScalarType() != schema_pb.ScalarType_BYTES {
t.Error("Expected data field to be BYTES")
}
if fieldMap["active"].Type.GetScalarType() != schema_pb.ScalarType_BOOL {
t.Error("Expected active field to be BOOL")
}
// Test array field
if fieldMap["tags"].Type.GetListType() == nil {
t.Error("Expected tags field to be LIST")
}
// Test nested object field
if fieldMap["metadata"].Type.GetRecordType() == nil {
t.Error("Expected metadata field to be RECORD")
}
// Test required fields
if !fieldMap["id"].IsRequired {
t.Error("Expected id field to be required")
}
if !fieldMap["name"].IsRequired {
t.Error("Expected name field to be required")
}
if fieldMap["active"].IsRequired {
t.Error("Expected active field to be optional")
}
}
func TestJSONSchemaDecoder_EncodeFromRecordValue(t *testing.T) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"active": {"type": "boolean"}
},
"required": ["id", "name"]
}`
decoder, err := NewJSONSchemaDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
// Create test RecordValue
testMap := map[string]interface{}{
"id": int64(123),
"name": "Test User",
"active": true,
}
recordValue := MapToRecordValue(testMap)
// Encode back to JSON
jsonData, err := decoder.EncodeFromRecordValue(recordValue)
if err != nil {
t.Fatalf("Failed to encode RecordValue: %v", err)
}
// Verify the JSON is valid and contains expected data
var result map[string]interface{}
if err := json.Unmarshal(jsonData, &result); err != nil {
t.Fatalf("Failed to parse generated JSON: %v", err)
}
if result["id"] != float64(123) { // JSON numbers are float64
t.Errorf("Expected id=123, got %v", result["id"])
}
if result["name"] != "Test User" {
t.Errorf("Expected name='Test User', got %v", result["name"])
}
if result["active"] != true {
t.Errorf("Expected active=true, got %v", result["active"])
}
}
func TestJSONSchemaDecoder_ArrayAndPrimitiveSchemas(t *testing.T) {
tests := []struct {
name string
schema string
jsonData string
expectOK bool
}{
{
name: "array schema",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {"type": "string"}
}`,
jsonData: `["item1", "item2", "item3"]`,
expectOK: true,
},
{
name: "string schema",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "string"
}`,
jsonData: `"hello world"`,
expectOK: true,
},
{
name: "number schema",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "number"
}`,
jsonData: `42.5`,
expectOK: true,
},
{
name: "boolean schema",
schema: `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "boolean"
}`,
jsonData: `true`,
expectOK: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
decoder, err := NewJSONSchemaDecoder(tt.schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
result, err := decoder.Decode([]byte(tt.jsonData))
if (err == nil) != tt.expectOK {
t.Errorf("Decode() error = %v, expectOK %v", err, tt.expectOK)
return
}
if tt.expectOK && result == nil {
t.Error("Expected non-nil result for valid data")
}
})
}
}
func TestJSONSchemaDecoder_GetSchemaInfo(t *testing.T) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User Schema",
"description": "A schema for user objects",
"type": "object",
"properties": {
"id": {"type": "integer"}
}
}`
decoder, err := NewJSONSchemaDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
info := decoder.GetSchemaInfo()
if info["title"] != "User Schema" {
t.Errorf("Expected title='User Schema', got %v", info["title"])
}
if info["description"] != "A schema for user objects" {
t.Errorf("Expected description='A schema for user objects', got %v", info["description"])
}
if info["schema_version"] != "http://json-schema.org/draft-07/schema#" {
t.Errorf("Expected schema_version='http://json-schema.org/draft-07/schema#', got %v", info["schema_version"])
}
if info["type"] != "object" {
t.Errorf("Expected type='object', got %v", info["type"])
}
}
// Benchmark tests
func BenchmarkJSONSchemaDecoder_Decode(b *testing.B) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"}
}
}`
decoder, _ := NewJSONSchemaDecoder(schema)
jsonData := []byte(`{"id": 123, "name": "John Doe"}`)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = decoder.Decode(jsonData)
}
}
func BenchmarkJSONSchemaDecoder_DecodeToRecordValue(b *testing.B) {
schema := `{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"}
}
}`
decoder, _ := NewJSONSchemaDecoder(schema)
jsonData := []byte(`{"id": 123, "name": "John Doe"}`)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = decoder.DecodeToRecordValue(jsonData)
}
}

94
weed/mq/kafka/schema/manager.go

@ -18,6 +18,7 @@ type Manager struct {
// Decoder cache // Decoder cache
avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder
protobufDecoders map[uint32]*ProtobufDecoder // schema ID -> decoder protobufDecoders map[uint32]*ProtobufDecoder // schema ID -> decoder
jsonSchemaDecoders map[uint32]*JSONSchemaDecoder // schema ID -> decoder
decoderMu sync.RWMutex decoderMu sync.RWMutex
// Configuration // Configuration
@ -76,6 +77,7 @@ func NewManager(config ManagerConfig) (*Manager, error) {
registryClient: registryClient, registryClient: registryClient,
avroDecoders: make(map[uint32]*AvroDecoder), avroDecoders: make(map[uint32]*AvroDecoder),
protobufDecoders: make(map[uint32]*ProtobufDecoder), protobufDecoders: make(map[uint32]*ProtobufDecoder),
jsonSchemaDecoders: make(map[uint32]*JSONSchemaDecoder),
config: config, config: config,
}, nil }, nil
} }
@ -130,7 +132,10 @@ func (m *Manager) DecodeMessage(messageBytes []byte) (*DecodedMessage, error) {
return nil, fmt.Errorf("failed to decode Protobuf message: %w", err) return nil, fmt.Errorf("failed to decode Protobuf message: %w", err)
} }
case FormatJSONSchema: case FormatJSONSchema:
return nil, fmt.Errorf("JSON Schema decoding not yet implemented (Phase 6)")
recordValue, recordType, err = m.decodeJSONSchemaMessage(envelope, cachedSchema)
if err != nil {
return nil, fmt.Errorf("failed to decode JSON Schema message: %w", err)
}
default: default:
return nil, fmt.Errorf("unsupported schema format: %v", cachedSchema.Format) return nil, fmt.Errorf("unsupported schema format: %v", cachedSchema.Format)
} }
@ -215,6 +220,38 @@ func (m *Manager) decodeProtobufMessage(envelope *ConfluentEnvelope, cachedSchem
return recordValue, recordType, nil return recordValue, recordType, nil
} }
// decodeJSONSchemaMessage decodes a JSON Schema message using cached or new decoder
func (m *Manager) decodeJSONSchemaMessage(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) (*schema_pb.RecordValue, *schema_pb.RecordType, error) {
// Get or create JSON Schema decoder
decoder, err := m.getJSONSchemaDecoder(envelope.SchemaID, cachedSchema.Schema)
if err != nil {
return nil, nil, fmt.Errorf("failed to get JSON Schema decoder: %w", err)
}
// Decode to RecordValue
recordValue, err := decoder.DecodeToRecordValue(envelope.Payload)
if err != nil {
if m.config.ValidationMode == ValidationStrict {
return nil, nil, fmt.Errorf("strict validation failed: %w", err)
}
// In permissive mode, try to decode as much as possible
return nil, nil, fmt.Errorf("permissive decoding failed: %w", err)
}
// Get RecordType from schema
recordType, err := decoder.InferRecordType()
if err != nil {
// Fall back to inferring from the decoded map
if decodedMap, decodeErr := decoder.Decode(envelope.Payload); decodeErr == nil {
recordType = InferRecordTypeFromMap(decodedMap)
} else {
return nil, nil, fmt.Errorf("failed to infer record type: %w", err)
}
}
return recordValue, recordType, nil
}
// getAvroDecoder gets or creates an Avro decoder for the given schema // getAvroDecoder gets or creates an Avro decoder for the given schema
func (m *Manager) getAvroDecoder(schemaID uint32, schemaStr string) (*AvroDecoder, error) { func (m *Manager) getAvroDecoder(schemaID uint32, schemaStr string) (*AvroDecoder, error) {
// Check cache first // Check cache first
@ -267,6 +304,30 @@ func (m *Manager) getProtobufDecoder(schemaID uint32, schemaStr string) (*Protob
return decoder, nil return decoder, nil
} }
// getJSONSchemaDecoder gets or creates a JSON Schema decoder for the given schema
func (m *Manager) getJSONSchemaDecoder(schemaID uint32, schemaStr string) (*JSONSchemaDecoder, error) {
// Check cache first
m.decoderMu.RLock()
if decoder, exists := m.jsonSchemaDecoders[schemaID]; exists {
m.decoderMu.RUnlock()
return decoder, nil
}
m.decoderMu.RUnlock()
// Create new decoder
decoder, err := NewJSONSchemaDecoder(schemaStr)
if err != nil {
return nil, err
}
// Cache the decoder
m.decoderMu.Lock()
m.jsonSchemaDecoders[schemaID] = decoder
m.decoderMu.Unlock()
return decoder, nil
}
// createMetadata creates metadata for storage in SeaweedMQ // createMetadata creates metadata for storage in SeaweedMQ
func (m *Manager) createMetadata(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) map[string]string { func (m *Manager) createMetadata(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) map[string]string {
metadata := envelope.Metadata() metadata := envelope.Metadata()
@ -324,6 +385,7 @@ func (m *Manager) ClearCache() {
m.decoderMu.Lock() m.decoderMu.Lock()
m.avroDecoders = make(map[uint32]*AvroDecoder) m.avroDecoders = make(map[uint32]*AvroDecoder)
m.protobufDecoders = make(map[uint32]*ProtobufDecoder) m.protobufDecoders = make(map[uint32]*ProtobufDecoder)
m.jsonSchemaDecoders = make(map[uint32]*JSONSchemaDecoder)
m.decoderMu.Unlock() m.decoderMu.Unlock()
m.registryClient.ClearCache() m.registryClient.ClearCache()
@ -332,7 +394,7 @@ func (m *Manager) ClearCache() {
// GetCacheStats returns cache statistics // GetCacheStats returns cache statistics
func (m *Manager) GetCacheStats() (decoders, schemas, subjects int) { func (m *Manager) GetCacheStats() (decoders, schemas, subjects int) {
m.decoderMu.RLock() m.decoderMu.RLock()
decoders = len(m.avroDecoders) + len(m.protobufDecoders)
decoders = len(m.avroDecoders) + len(m.protobufDecoders) + len(m.jsonSchemaDecoders)
m.decoderMu.RUnlock() m.decoderMu.RUnlock()
schemas, subjects = m.registryClient.GetCacheStats() schemas, subjects = m.registryClient.GetCacheStats()
@ -347,7 +409,7 @@ func (m *Manager) EncodeMessage(recordValue *schema_pb.RecordValue, schemaID uin
case FormatProtobuf: case FormatProtobuf:
return m.encodeProtobufMessage(recordValue, schemaID) return m.encodeProtobufMessage(recordValue, schemaID)
case FormatJSONSchema: case FormatJSONSchema:
return nil, fmt.Errorf("JSON Schema encoding not yet implemented (Phase 7)")
return m.encodeJSONSchemaMessage(recordValue, schemaID)
default: default:
return nil, fmt.Errorf("unsupported format for encoding: %v", format) return nil, fmt.Errorf("unsupported format for encoding: %v", format)
} }
@ -417,6 +479,32 @@ func (m *Manager) encodeProtobufMessage(recordValue *schema_pb.RecordValue, sche
return envelope, nil return envelope, nil
} }
// encodeJSONSchemaMessage encodes a RecordValue back to JSON Schema format
func (m *Manager) encodeJSONSchemaMessage(recordValue *schema_pb.RecordValue, schemaID uint32) ([]byte, error) {
// Get schema from registry
cachedSchema, err := m.registryClient.GetSchemaByID(schemaID)
if err != nil {
return nil, fmt.Errorf("failed to get schema for encoding: %w", err)
}
// Get decoder (which contains the schema validator)
decoder, err := m.getJSONSchemaDecoder(schemaID, cachedSchema.Schema)
if err != nil {
return nil, fmt.Errorf("failed to get decoder for encoding: %w", err)
}
// Encode using JSON Schema decoder
jsonData, err := decoder.EncodeFromRecordValue(recordValue)
if err != nil {
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
}
// Create Confluent envelope
envelope := CreateConfluentEnvelope(FormatJSONSchema, schemaID, nil, jsonData)
return envelope, nil
}
// populateProtobufMessage populates a Protobuf message from a Go map // populateProtobufMessage populates a Protobuf message from a Go map
func (m *Manager) populateProtobufMessage(msg protoreflect.Message, data map[string]interface{}, desc protoreflect.MessageDescriptor) error { func (m *Manager) populateProtobufMessage(msg protoreflect.Message, data map[string]interface{}, desc protoreflect.MessageDescriptor) error {
for key, value := range data { for key, value := range data {

Loading…
Cancel
Save