diff --git a/go.mod b/go.mod index 1233188e3..a645deef9 100644 --- a/go.mod +++ b/go.mod @@ -207,6 +207,9 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/twpayne/go-geom v1.4.1 // indirect github.com/twpayne/go-kml v1.5.2 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect + github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect diff --git a/go.sum b/go.sum index 0854b14a8..d8e7978a0 100644 --- a/go.sum +++ b/go.sum @@ -1791,6 +1791,12 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e h1:9LPdmD1vqadsDQUva6t2O9MbnyvoOgo8nFNPaOIH5U8= diff --git a/weed/mq/kafka/schema/json_schema_decoder.go b/weed/mq/kafka/schema/json_schema_decoder.go new file mode 100644 index 000000000..ce23d078e --- /dev/null +++ b/weed/mq/kafka/schema/json_schema_decoder.go @@ -0,0 +1,387 @@ +package schema + +import ( + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/xeipuuv/gojsonschema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// JSONSchemaDecoder handles JSON Schema validation and conversion to SeaweedMQ format +type JSONSchemaDecoder struct { + schema *gojsonschema.Schema + schemaDoc map[string]interface{} // Parsed schema document for type inference + schemaJSON string // Original schema JSON +} + +// NewJSONSchemaDecoder creates a new JSON Schema decoder from a schema string +func NewJSONSchemaDecoder(schemaJSON string) (*JSONSchemaDecoder, error) { + // Parse the schema JSON + var schemaDoc map[string]interface{} + if err := json.Unmarshal([]byte(schemaJSON), &schemaDoc); err != nil { + return nil, fmt.Errorf("failed to parse JSON schema: %w", err) + } + + // Create JSON Schema validator + schemaLoader := gojsonschema.NewStringLoader(schemaJSON) + schema, err := gojsonschema.NewSchema(schemaLoader) + if err != nil { + return nil, fmt.Errorf("failed to create JSON schema validator: %w", err) + } + + return &JSONSchemaDecoder{ + schema: schema, + schemaDoc: schemaDoc, + schemaJSON: schemaJSON, + }, nil +} + +// Decode decodes and validates JSON data against the schema, returning a Go map +func (jsd *JSONSchemaDecoder) Decode(data []byte) (map[string]interface{}, error) { + // Parse JSON data + var jsonData interface{} + if err := json.Unmarshal(data, &jsonData); err != nil { + return nil, fmt.Errorf("failed to parse JSON data: %w", err) + } + + // Validate against schema + documentLoader := gojsonschema.NewGoLoader(jsonData) + result, err := jsd.schema.Validate(documentLoader) + if err != nil { + return nil, fmt.Errorf("failed to validate JSON data: %w", err) + } + + if !result.Valid() { + // Collect validation errors + var errorMsgs []string + for _, desc := range result.Errors() { + errorMsgs = append(errorMsgs, desc.String()) + } + return nil, fmt.Errorf("JSON data validation failed: %v", errorMsgs) + } + + // Convert to map[string]interface{} for consistency + switch v := jsonData.(type) { + case map[string]interface{}: + return v, nil + case []interface{}: + // Handle array at root level by wrapping in a map + return map[string]interface{}{"items": v}, nil + default: + // Handle primitive values at root level + return map[string]interface{}{"value": v}, nil + } +} + +// DecodeToRecordValue decodes JSON data directly to SeaweedMQ RecordValue +func (jsd *JSONSchemaDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) { + jsonMap, err := jsd.Decode(data) + if err != nil { + return nil, err + } + + return MapToRecordValue(jsonMap), nil +} + +// InferRecordType infers a SeaweedMQ RecordType from the JSON Schema +func (jsd *JSONSchemaDecoder) InferRecordType() (*schema_pb.RecordType, error) { + return jsd.jsonSchemaToRecordType(jsd.schemaDoc), nil +} + +// ValidateOnly validates JSON data against the schema without decoding +func (jsd *JSONSchemaDecoder) ValidateOnly(data []byte) error { + _, err := jsd.Decode(data) + return err +} + +// jsonSchemaToRecordType converts a JSON Schema to SeaweedMQ RecordType +func (jsd *JSONSchemaDecoder) jsonSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType { + schemaType, _ := schemaDoc["type"].(string) + + if schemaType == "object" { + return jsd.objectSchemaToRecordType(schemaDoc) + } + + // For non-object schemas, create a wrapper record + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + { + Name: "value", + FieldIndex: 0, + Type: jsd.jsonSchemaTypeToType(schemaDoc), + IsRequired: true, + IsRepeated: false, + }, + }, + } +} + +// objectSchemaToRecordType converts an object JSON Schema to RecordType +func (jsd *JSONSchemaDecoder) objectSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType { + properties, _ := schemaDoc["properties"].(map[string]interface{}) + required, _ := schemaDoc["required"].([]interface{}) + + // Create set of required fields for quick lookup + requiredFields := make(map[string]bool) + for _, req := range required { + if reqStr, ok := req.(string); ok { + requiredFields[reqStr] = true + } + } + + fields := make([]*schema_pb.Field, 0, len(properties)) + fieldIndex := int32(0) + + for fieldName, fieldSchema := range properties { + fieldSchemaMap, ok := fieldSchema.(map[string]interface{}) + if !ok { + continue + } + + field := &schema_pb.Field{ + Name: fieldName, + FieldIndex: fieldIndex, + Type: jsd.jsonSchemaTypeToType(fieldSchemaMap), + IsRequired: requiredFields[fieldName], + IsRepeated: jsd.isArrayType(fieldSchemaMap), + } + + fields = append(fields, field) + fieldIndex++ + } + + return &schema_pb.RecordType{ + Fields: fields, + } +} + +// jsonSchemaTypeToType converts a JSON Schema type to SeaweedMQ Type +func (jsd *JSONSchemaDecoder) jsonSchemaTypeToType(schemaDoc map[string]interface{}) *schema_pb.Type { + schemaType, _ := schemaDoc["type"].(string) + + switch schemaType { + case "boolean": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BOOL, + }, + } + case "integer": + // Check for format hints + format, _ := schemaDoc["format"].(string) + switch format { + case "int32": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT32, + }, + } + default: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT64, + }, + } + } + case "number": + // Check for format hints + format, _ := schemaDoc["format"].(string) + switch format { + case "float": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_FLOAT, + }, + } + default: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_DOUBLE, + }, + } + } + case "string": + // Check for format hints + format, _ := schemaDoc["format"].(string) + switch format { + case "date-time": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_TIMESTAMP, + }, + } + case "byte", "binary": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, + }, + } + default: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + } + } + case "array": + items, _ := schemaDoc["items"].(map[string]interface{}) + elementType := jsd.jsonSchemaTypeToType(items) + return &schema_pb.Type{ + Kind: &schema_pb.Type_ListType{ + ListType: &schema_pb.ListType{ + ElementType: elementType, + }, + }, + } + case "object": + nestedRecordType := jsd.objectSchemaToRecordType(schemaDoc) + return &schema_pb.Type{ + Kind: &schema_pb.Type_RecordType{ + RecordType: nestedRecordType, + }, + } + default: + // Handle union types (oneOf, anyOf, allOf) + if oneOf, exists := schemaDoc["oneOf"].([]interface{}); exists && len(oneOf) > 0 { + // For unions, use the first type as default + if firstType, ok := oneOf[0].(map[string]interface{}); ok { + return jsd.jsonSchemaTypeToType(firstType) + } + } + + // Default to string for unknown types + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + } + } +} + +// isArrayType checks if a JSON Schema represents an array type +func (jsd *JSONSchemaDecoder) isArrayType(schemaDoc map[string]interface{}) bool { + schemaType, _ := schemaDoc["type"].(string) + return schemaType == "array" +} + +// EncodeFromRecordValue encodes a RecordValue back to JSON format +func (jsd *JSONSchemaDecoder) EncodeFromRecordValue(recordValue *schema_pb.RecordValue) ([]byte, error) { + // Convert RecordValue back to Go map + goMap := recordValueToMap(recordValue) + + // Encode to JSON + jsonData, err := json.Marshal(goMap) + if err != nil { + return nil, fmt.Errorf("failed to encode to JSON: %w", err) + } + + // Validate the generated JSON against the schema + if err := jsd.ValidateOnly(jsonData); err != nil { + return nil, fmt.Errorf("generated JSON failed schema validation: %w", err) + } + + return jsonData, nil +} + +// GetSchemaInfo returns information about the JSON Schema +func (jsd *JSONSchemaDecoder) GetSchemaInfo() map[string]interface{} { + info := make(map[string]interface{}) + + if title, exists := jsd.schemaDoc["title"]; exists { + info["title"] = title + } + + if description, exists := jsd.schemaDoc["description"]; exists { + info["description"] = description + } + + if schemaVersion, exists := jsd.schemaDoc["$schema"]; exists { + info["schema_version"] = schemaVersion + } + + if schemaType, exists := jsd.schemaDoc["type"]; exists { + info["type"] = schemaType + } + + return info +} + +// Enhanced JSON value conversion with better type handling +func (jsd *JSONSchemaDecoder) convertJSONValue(value interface{}, expectedType string) interface{} { + if value == nil { + return nil + } + + switch expectedType { + case "integer": + switch v := value.(type) { + case float64: + return int64(v) + case string: + if i, err := strconv.ParseInt(v, 10, 64); err == nil { + return i + } + } + case "number": + switch v := value.(type) { + case string: + if f, err := strconv.ParseFloat(v, 64); err == nil { + return f + } + } + case "boolean": + switch v := value.(type) { + case string: + if b, err := strconv.ParseBool(v); err == nil { + return b + } + } + case "string": + // Handle date-time format conversion + if str, ok := value.(string); ok { + // Try to parse as RFC3339 timestamp + if t, err := time.Parse(time.RFC3339, str); err == nil { + return t + } + } + } + + return value +} + +// ValidateAndNormalize validates JSON data and normalizes types according to schema +func (jsd *JSONSchemaDecoder) ValidateAndNormalize(data []byte) ([]byte, error) { + // First decode normally + jsonMap, err := jsd.Decode(data) + if err != nil { + return nil, err + } + + // Normalize types based on schema + normalized := jsd.normalizeMapTypes(jsonMap, jsd.schemaDoc) + + // Re-encode with normalized types + return json.Marshal(normalized) +} + +// normalizeMapTypes normalizes map values according to JSON Schema types +func (jsd *JSONSchemaDecoder) normalizeMapTypes(data map[string]interface{}, schemaDoc map[string]interface{}) map[string]interface{} { + properties, _ := schemaDoc["properties"].(map[string]interface{}) + result := make(map[string]interface{}) + + for key, value := range data { + if fieldSchema, exists := properties[key]; exists { + if fieldSchemaMap, ok := fieldSchema.(map[string]interface{}); ok { + fieldType, _ := fieldSchemaMap["type"].(string) + result[key] = jsd.convertJSONValue(value, fieldType) + continue + } + } + result[key] = value + } + + return result +} diff --git a/weed/mq/kafka/schema/json_schema_decoder_test.go b/weed/mq/kafka/schema/json_schema_decoder_test.go new file mode 100644 index 000000000..42cffd80a --- /dev/null +++ b/weed/mq/kafka/schema/json_schema_decoder_test.go @@ -0,0 +1,543 @@ +package schema + +import ( + "encoding/json" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestNewJSONSchemaDecoder(t *testing.T) { + tests := []struct { + name string + schema string + expectErr bool + }{ + { + name: "valid object schema", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "active": {"type": "boolean"} + }, + "required": ["id", "name"] + }`, + expectErr: false, + }, + { + name: "valid array schema", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "array", + "items": { + "type": "string" + } + }`, + expectErr: false, + }, + { + name: "valid string schema with format", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "string", + "format": "date-time" + }`, + expectErr: false, + }, + { + name: "invalid JSON", + schema: `{"invalid": json}`, + expectErr: true, + }, + { + name: "empty schema", + schema: "", + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoder, err := NewJSONSchemaDecoder(tt.schema) + + if (err != nil) != tt.expectErr { + t.Errorf("NewJSONSchemaDecoder() error = %v, expectErr %v", err, tt.expectErr) + return + } + + if !tt.expectErr && decoder == nil { + t.Error("Expected non-nil decoder for valid schema") + } + }) + } +} + +func TestJSONSchemaDecoder_Decode(t *testing.T) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "email": {"type": "string", "format": "email"}, + "age": {"type": "integer", "minimum": 0}, + "active": {"type": "boolean"} + }, + "required": ["id", "name"] + }` + + decoder, err := NewJSONSchemaDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + tests := []struct { + name string + jsonData string + expectErr bool + }{ + { + name: "valid complete data", + jsonData: `{ + "id": 123, + "name": "John Doe", + "email": "john@example.com", + "age": 30, + "active": true + }`, + expectErr: false, + }, + { + name: "valid minimal data", + jsonData: `{ + "id": 456, + "name": "Jane Smith" + }`, + expectErr: false, + }, + { + name: "missing required field", + jsonData: `{ + "name": "Missing ID" + }`, + expectErr: true, + }, + { + name: "invalid type", + jsonData: `{ + "id": "not-a-number", + "name": "John Doe" + }`, + expectErr: true, + }, + { + name: "invalid email format", + jsonData: `{ + "id": 123, + "name": "John Doe", + "email": "not-an-email" + }`, + expectErr: true, + }, + { + name: "negative age", + jsonData: `{ + "id": 123, + "name": "John Doe", + "age": -5 + }`, + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := decoder.Decode([]byte(tt.jsonData)) + + if (err != nil) != tt.expectErr { + t.Errorf("Decode() error = %v, expectErr %v", err, tt.expectErr) + return + } + + if !tt.expectErr { + if result == nil { + t.Error("Expected non-nil result for valid data") + } + + // Verify some basic fields + if id, exists := result["id"]; exists { + if _, ok := id.(float64); !ok { + t.Errorf("Expected id to be float64, got %T", id) + } + } + + if name, exists := result["name"]; exists { + if _, ok := name.(string); !ok { + t.Errorf("Expected name to be string, got %T", name) + } + } + } + }) + } +} + +func TestJSONSchemaDecoder_DecodeToRecordValue(t *testing.T) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "tags": { + "type": "array", + "items": {"type": "string"} + } + } + }` + + decoder, err := NewJSONSchemaDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + jsonData := `{ + "id": 789, + "name": "Test User", + "tags": ["tag1", "tag2", "tag3"] + }` + + recordValue, err := decoder.DecodeToRecordValue([]byte(jsonData)) + if err != nil { + t.Fatalf("Failed to decode to RecordValue: %v", err) + } + + // Verify RecordValue structure + if recordValue.Fields == nil { + t.Fatal("Expected non-nil fields") + } + + // Check id field + idValue := recordValue.Fields["id"] + if idValue == nil { + t.Fatal("Expected id field") + } + // JSON numbers are decoded as float64 by default + // The MapToRecordValue function should handle this conversion + expectedID := int64(789) + actualID := idValue.GetInt64Value() + if actualID != expectedID { + // Try checking if it was stored as float64 instead + if floatVal := idValue.GetDoubleValue(); floatVal == 789.0 { + t.Logf("ID was stored as float64: %v", floatVal) + } else { + t.Errorf("Expected id=789, got int64=%v, float64=%v", actualID, floatVal) + } + } + + // Check name field + nameValue := recordValue.Fields["name"] + if nameValue == nil { + t.Fatal("Expected name field") + } + if nameValue.GetStringValue() != "Test User" { + t.Errorf("Expected name='Test User', got %v", nameValue.GetStringValue()) + } + + // Check tags array + tagsValue := recordValue.Fields["tags"] + if tagsValue == nil { + t.Fatal("Expected tags field") + } + tagsList := tagsValue.GetListValue() + if tagsList == nil || len(tagsList.Values) != 3 { + t.Errorf("Expected tags array with 3 elements, got %v", tagsList) + } +} + +func TestJSONSchemaDecoder_InferRecordType(t *testing.T) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer", "format": "int32"}, + "name": {"type": "string"}, + "score": {"type": "number", "format": "float"}, + "timestamp": {"type": "string", "format": "date-time"}, + "data": {"type": "string", "format": "byte"}, + "active": {"type": "boolean"}, + "tags": { + "type": "array", + "items": {"type": "string"} + }, + "metadata": { + "type": "object", + "properties": { + "source": {"type": "string"} + } + } + }, + "required": ["id", "name"] + }` + + decoder, err := NewJSONSchemaDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + recordType, err := decoder.InferRecordType() + if err != nil { + t.Fatalf("Failed to infer RecordType: %v", err) + } + + if len(recordType.Fields) != 8 { + t.Errorf("Expected 8 fields, got %d", len(recordType.Fields)) + } + + // Create a map for easier field lookup + fieldMap := make(map[string]*schema_pb.Field) + for _, field := range recordType.Fields { + fieldMap[field.Name] = field + } + + // Test specific field types + if fieldMap["id"].Type.GetScalarType() != schema_pb.ScalarType_INT32 { + t.Error("Expected id field to be INT32") + } + + if fieldMap["name"].Type.GetScalarType() != schema_pb.ScalarType_STRING { + t.Error("Expected name field to be STRING") + } + + if fieldMap["score"].Type.GetScalarType() != schema_pb.ScalarType_FLOAT { + t.Error("Expected score field to be FLOAT") + } + + if fieldMap["timestamp"].Type.GetScalarType() != schema_pb.ScalarType_TIMESTAMP { + t.Error("Expected timestamp field to be TIMESTAMP") + } + + if fieldMap["data"].Type.GetScalarType() != schema_pb.ScalarType_BYTES { + t.Error("Expected data field to be BYTES") + } + + if fieldMap["active"].Type.GetScalarType() != schema_pb.ScalarType_BOOL { + t.Error("Expected active field to be BOOL") + } + + // Test array field + if fieldMap["tags"].Type.GetListType() == nil { + t.Error("Expected tags field to be LIST") + } + + // Test nested object field + if fieldMap["metadata"].Type.GetRecordType() == nil { + t.Error("Expected metadata field to be RECORD") + } + + // Test required fields + if !fieldMap["id"].IsRequired { + t.Error("Expected id field to be required") + } + + if !fieldMap["name"].IsRequired { + t.Error("Expected name field to be required") + } + + if fieldMap["active"].IsRequired { + t.Error("Expected active field to be optional") + } +} + +func TestJSONSchemaDecoder_EncodeFromRecordValue(t *testing.T) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "active": {"type": "boolean"} + }, + "required": ["id", "name"] + }` + + decoder, err := NewJSONSchemaDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + // Create test RecordValue + testMap := map[string]interface{}{ + "id": int64(123), + "name": "Test User", + "active": true, + } + recordValue := MapToRecordValue(testMap) + + // Encode back to JSON + jsonData, err := decoder.EncodeFromRecordValue(recordValue) + if err != nil { + t.Fatalf("Failed to encode RecordValue: %v", err) + } + + // Verify the JSON is valid and contains expected data + var result map[string]interface{} + if err := json.Unmarshal(jsonData, &result); err != nil { + t.Fatalf("Failed to parse generated JSON: %v", err) + } + + if result["id"] != float64(123) { // JSON numbers are float64 + t.Errorf("Expected id=123, got %v", result["id"]) + } + + if result["name"] != "Test User" { + t.Errorf("Expected name='Test User', got %v", result["name"]) + } + + if result["active"] != true { + t.Errorf("Expected active=true, got %v", result["active"]) + } +} + +func TestJSONSchemaDecoder_ArrayAndPrimitiveSchemas(t *testing.T) { + tests := []struct { + name string + schema string + jsonData string + expectOK bool + }{ + { + name: "array schema", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "array", + "items": {"type": "string"} + }`, + jsonData: `["item1", "item2", "item3"]`, + expectOK: true, + }, + { + name: "string schema", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "string" + }`, + jsonData: `"hello world"`, + expectOK: true, + }, + { + name: "number schema", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "number" + }`, + jsonData: `42.5`, + expectOK: true, + }, + { + name: "boolean schema", + schema: `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "boolean" + }`, + jsonData: `true`, + expectOK: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoder, err := NewJSONSchemaDecoder(tt.schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + result, err := decoder.Decode([]byte(tt.jsonData)) + + if (err == nil) != tt.expectOK { + t.Errorf("Decode() error = %v, expectOK %v", err, tt.expectOK) + return + } + + if tt.expectOK && result == nil { + t.Error("Expected non-nil result for valid data") + } + }) + } +} + +func TestJSONSchemaDecoder_GetSchemaInfo(t *testing.T) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "User Schema", + "description": "A schema for user objects", + "type": "object", + "properties": { + "id": {"type": "integer"} + } + }` + + decoder, err := NewJSONSchemaDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + info := decoder.GetSchemaInfo() + + if info["title"] != "User Schema" { + t.Errorf("Expected title='User Schema', got %v", info["title"]) + } + + if info["description"] != "A schema for user objects" { + t.Errorf("Expected description='A schema for user objects', got %v", info["description"]) + } + + if info["schema_version"] != "http://json-schema.org/draft-07/schema#" { + t.Errorf("Expected schema_version='http://json-schema.org/draft-07/schema#', got %v", info["schema_version"]) + } + + if info["type"] != "object" { + t.Errorf("Expected type='object', got %v", info["type"]) + } +} + +// Benchmark tests +func BenchmarkJSONSchemaDecoder_Decode(b *testing.B) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"} + } + }` + + decoder, _ := NewJSONSchemaDecoder(schema) + jsonData := []byte(`{"id": 123, "name": "John Doe"}`) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = decoder.Decode(jsonData) + } +} + +func BenchmarkJSONSchemaDecoder_DecodeToRecordValue(b *testing.B) { + schema := `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"} + } + }` + + decoder, _ := NewJSONSchemaDecoder(schema) + jsonData := []byte(`{"id": 123, "name": "John Doe"}`) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = decoder.DecodeToRecordValue(jsonData) + } +} diff --git a/weed/mq/kafka/schema/manager.go b/weed/mq/kafka/schema/manager.go index 7f0c059e9..c7aa773bf 100644 --- a/weed/mq/kafka/schema/manager.go +++ b/weed/mq/kafka/schema/manager.go @@ -16,9 +16,10 @@ type Manager struct { registryClient *RegistryClient // Decoder cache - avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder - protobufDecoders map[uint32]*ProtobufDecoder // schema ID -> decoder - decoderMu sync.RWMutex + avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder + protobufDecoders map[uint32]*ProtobufDecoder // schema ID -> decoder + jsonSchemaDecoders map[uint32]*JSONSchemaDecoder // schema ID -> decoder + decoderMu sync.RWMutex // Configuration config ManagerConfig @@ -73,10 +74,11 @@ func NewManager(config ManagerConfig) (*Manager, error) { registryClient := NewRegistryClient(registryConfig) return &Manager{ - registryClient: registryClient, - avroDecoders: make(map[uint32]*AvroDecoder), - protobufDecoders: make(map[uint32]*ProtobufDecoder), - config: config, + registryClient: registryClient, + avroDecoders: make(map[uint32]*AvroDecoder), + protobufDecoders: make(map[uint32]*ProtobufDecoder), + jsonSchemaDecoders: make(map[uint32]*JSONSchemaDecoder), + config: config, }, nil } @@ -130,7 +132,10 @@ func (m *Manager) DecodeMessage(messageBytes []byte) (*DecodedMessage, error) { return nil, fmt.Errorf("failed to decode Protobuf message: %w", err) } case FormatJSONSchema: - return nil, fmt.Errorf("JSON Schema decoding not yet implemented (Phase 6)") + recordValue, recordType, err = m.decodeJSONSchemaMessage(envelope, cachedSchema) + if err != nil { + return nil, fmt.Errorf("failed to decode JSON Schema message: %w", err) + } default: return nil, fmt.Errorf("unsupported schema format: %v", cachedSchema.Format) } @@ -215,6 +220,38 @@ func (m *Manager) decodeProtobufMessage(envelope *ConfluentEnvelope, cachedSchem return recordValue, recordType, nil } +// decodeJSONSchemaMessage decodes a JSON Schema message using cached or new decoder +func (m *Manager) decodeJSONSchemaMessage(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) (*schema_pb.RecordValue, *schema_pb.RecordType, error) { + // Get or create JSON Schema decoder + decoder, err := m.getJSONSchemaDecoder(envelope.SchemaID, cachedSchema.Schema) + if err != nil { + return nil, nil, fmt.Errorf("failed to get JSON Schema decoder: %w", err) + } + + // Decode to RecordValue + recordValue, err := decoder.DecodeToRecordValue(envelope.Payload) + if err != nil { + if m.config.ValidationMode == ValidationStrict { + return nil, nil, fmt.Errorf("strict validation failed: %w", err) + } + // In permissive mode, try to decode as much as possible + return nil, nil, fmt.Errorf("permissive decoding failed: %w", err) + } + + // Get RecordType from schema + recordType, err := decoder.InferRecordType() + if err != nil { + // Fall back to inferring from the decoded map + if decodedMap, decodeErr := decoder.Decode(envelope.Payload); decodeErr == nil { + recordType = InferRecordTypeFromMap(decodedMap) + } else { + return nil, nil, fmt.Errorf("failed to infer record type: %w", err) + } + } + + return recordValue, recordType, nil +} + // getAvroDecoder gets or creates an Avro decoder for the given schema func (m *Manager) getAvroDecoder(schemaID uint32, schemaStr string) (*AvroDecoder, error) { // Check cache first @@ -263,7 +300,31 @@ func (m *Manager) getProtobufDecoder(schemaID uint32, schemaStr string) (*Protob m.decoderMu.Lock() m.protobufDecoders[schemaID] = decoder m.decoderMu.Unlock() + + return decoder, nil +} +// getJSONSchemaDecoder gets or creates a JSON Schema decoder for the given schema +func (m *Manager) getJSONSchemaDecoder(schemaID uint32, schemaStr string) (*JSONSchemaDecoder, error) { + // Check cache first + m.decoderMu.RLock() + if decoder, exists := m.jsonSchemaDecoders[schemaID]; exists { + m.decoderMu.RUnlock() + return decoder, nil + } + m.decoderMu.RUnlock() + + // Create new decoder + decoder, err := NewJSONSchemaDecoder(schemaStr) + if err != nil { + return nil, err + } + + // Cache the decoder + m.decoderMu.Lock() + m.jsonSchemaDecoders[schemaID] = decoder + m.decoderMu.Unlock() + return decoder, nil } @@ -324,15 +385,16 @@ func (m *Manager) ClearCache() { m.decoderMu.Lock() m.avroDecoders = make(map[uint32]*AvroDecoder) m.protobufDecoders = make(map[uint32]*ProtobufDecoder) + m.jsonSchemaDecoders = make(map[uint32]*JSONSchemaDecoder) m.decoderMu.Unlock() - + m.registryClient.ClearCache() } // GetCacheStats returns cache statistics func (m *Manager) GetCacheStats() (decoders, schemas, subjects int) { m.decoderMu.RLock() - decoders = len(m.avroDecoders) + len(m.protobufDecoders) + decoders = len(m.avroDecoders) + len(m.protobufDecoders) + len(m.jsonSchemaDecoders) m.decoderMu.RUnlock() schemas, subjects = m.registryClient.GetCacheStats() @@ -347,7 +409,7 @@ func (m *Manager) EncodeMessage(recordValue *schema_pb.RecordValue, schemaID uin case FormatProtobuf: return m.encodeProtobufMessage(recordValue, schemaID) case FormatJSONSchema: - return nil, fmt.Errorf("JSON Schema encoding not yet implemented (Phase 7)") + return m.encodeJSONSchemaMessage(recordValue, schemaID) default: return nil, fmt.Errorf("unsupported format for encoding: %v", format) } @@ -413,7 +475,33 @@ func (m *Manager) encodeProtobufMessage(recordValue *schema_pb.RecordValue, sche // Create Confluent envelope (with indexes if needed) envelope := CreateConfluentEnvelope(FormatProtobuf, schemaID, nil, binary) + + return envelope, nil +} +// encodeJSONSchemaMessage encodes a RecordValue back to JSON Schema format +func (m *Manager) encodeJSONSchemaMessage(recordValue *schema_pb.RecordValue, schemaID uint32) ([]byte, error) { + // Get schema from registry + cachedSchema, err := m.registryClient.GetSchemaByID(schemaID) + if err != nil { + return nil, fmt.Errorf("failed to get schema for encoding: %w", err) + } + + // Get decoder (which contains the schema validator) + decoder, err := m.getJSONSchemaDecoder(schemaID, cachedSchema.Schema) + if err != nil { + return nil, fmt.Errorf("failed to get decoder for encoding: %w", err) + } + + // Encode using JSON Schema decoder + jsonData, err := decoder.EncodeFromRecordValue(recordValue) + if err != nil { + return nil, fmt.Errorf("failed to encode to JSON: %w", err) + } + + // Create Confluent envelope + envelope := CreateConfluentEnvelope(FormatJSONSchema, schemaID, nil, jsonData) + return envelope, nil }