You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

387 lines
10 KiB

package schema
import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/xeipuuv/gojsonschema"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// JSONSchemaDecoder handles JSON Schema validation and conversion to SeaweedMQ format
type JSONSchemaDecoder struct {
schema *gojsonschema.Schema
schemaDoc map[string]interface{} // Parsed schema document for type inference
schemaJSON string // Original schema JSON
}
// NewJSONSchemaDecoder creates a new JSON Schema decoder from a schema string
func NewJSONSchemaDecoder(schemaJSON string) (*JSONSchemaDecoder, error) {
// Parse the schema JSON
var schemaDoc map[string]interface{}
if err := json.Unmarshal([]byte(schemaJSON), &schemaDoc); err != nil {
return nil, fmt.Errorf("failed to parse JSON schema: %w", err)
}
// Create JSON Schema validator
schemaLoader := gojsonschema.NewStringLoader(schemaJSON)
schema, err := gojsonschema.NewSchema(schemaLoader)
if err != nil {
return nil, fmt.Errorf("failed to create JSON schema validator: %w", err)
}
return &JSONSchemaDecoder{
schema: schema,
schemaDoc: schemaDoc,
schemaJSON: schemaJSON,
}, nil
}
// Decode decodes and validates JSON data against the schema, returning a Go map
func (jsd *JSONSchemaDecoder) Decode(data []byte) (map[string]interface{}, error) {
// Parse JSON data
var jsonData interface{}
if err := json.Unmarshal(data, &jsonData); err != nil {
return nil, fmt.Errorf("failed to parse JSON data: %w", err)
}
// Validate against schema
documentLoader := gojsonschema.NewGoLoader(jsonData)
result, err := jsd.schema.Validate(documentLoader)
if err != nil {
return nil, fmt.Errorf("failed to validate JSON data: %w", err)
}
if !result.Valid() {
// Collect validation errors
var errorMsgs []string
for _, desc := range result.Errors() {
errorMsgs = append(errorMsgs, desc.String())
}
return nil, fmt.Errorf("JSON data validation failed: %v", errorMsgs)
}
// Convert to map[string]interface{} for consistency
switch v := jsonData.(type) {
case map[string]interface{}:
return v, nil
case []interface{}:
// Handle array at root level by wrapping in a map
return map[string]interface{}{"items": v}, nil
default:
// Handle primitive values at root level
return map[string]interface{}{"value": v}, nil
}
}
// DecodeToRecordValue decodes JSON data directly to SeaweedMQ RecordValue
func (jsd *JSONSchemaDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
jsonMap, err := jsd.Decode(data)
if err != nil {
return nil, err
}
return MapToRecordValue(jsonMap), nil
}
// InferRecordType infers a SeaweedMQ RecordType from the JSON Schema
func (jsd *JSONSchemaDecoder) InferRecordType() (*schema_pb.RecordType, error) {
return jsd.jsonSchemaToRecordType(jsd.schemaDoc), nil
}
// ValidateOnly validates JSON data against the schema without decoding
func (jsd *JSONSchemaDecoder) ValidateOnly(data []byte) error {
_, err := jsd.Decode(data)
return err
}
// jsonSchemaToRecordType converts a JSON Schema to SeaweedMQ RecordType
func (jsd *JSONSchemaDecoder) jsonSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
schemaType, _ := schemaDoc["type"].(string)
if schemaType == "object" {
return jsd.objectSchemaToRecordType(schemaDoc)
}
// For non-object schemas, create a wrapper record
return &schema_pb.RecordType{
Fields: []*schema_pb.Field{
{
Name: "value",
FieldIndex: 0,
Type: jsd.jsonSchemaTypeToType(schemaDoc),
IsRequired: true,
IsRepeated: false,
},
},
}
}
// objectSchemaToRecordType converts an object JSON Schema to RecordType
func (jsd *JSONSchemaDecoder) objectSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
properties, _ := schemaDoc["properties"].(map[string]interface{})
required, _ := schemaDoc["required"].([]interface{})
// Create set of required fields for quick lookup
requiredFields := make(map[string]bool)
for _, req := range required {
if reqStr, ok := req.(string); ok {
requiredFields[reqStr] = true
}
}
fields := make([]*schema_pb.Field, 0, len(properties))
fieldIndex := int32(0)
for fieldName, fieldSchema := range properties {
fieldSchemaMap, ok := fieldSchema.(map[string]interface{})
if !ok {
continue
}
field := &schema_pb.Field{
Name: fieldName,
FieldIndex: fieldIndex,
Type: jsd.jsonSchemaTypeToType(fieldSchemaMap),
IsRequired: requiredFields[fieldName],
IsRepeated: jsd.isArrayType(fieldSchemaMap),
}
fields = append(fields, field)
fieldIndex++
}
return &schema_pb.RecordType{
Fields: fields,
}
}
// jsonSchemaTypeToType converts a JSON Schema type to SeaweedMQ Type
func (jsd *JSONSchemaDecoder) jsonSchemaTypeToType(schemaDoc map[string]interface{}) *schema_pb.Type {
schemaType, _ := schemaDoc["type"].(string)
switch schemaType {
case "boolean":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_BOOL,
},
}
case "integer":
// Check for format hints
format, _ := schemaDoc["format"].(string)
switch format {
case "int32":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_INT32,
},
}
default:
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_INT64,
},
}
}
case "number":
// Check for format hints
format, _ := schemaDoc["format"].(string)
switch format {
case "float":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_FLOAT,
},
}
default:
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_DOUBLE,
},
}
}
case "string":
// Check for format hints
format, _ := schemaDoc["format"].(string)
switch format {
case "date-time":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_TIMESTAMP,
},
}
case "byte", "binary":
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_BYTES,
},
}
default:
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_STRING,
},
}
}
case "array":
items, _ := schemaDoc["items"].(map[string]interface{})
elementType := jsd.jsonSchemaTypeToType(items)
return &schema_pb.Type{
Kind: &schema_pb.Type_ListType{
ListType: &schema_pb.ListType{
ElementType: elementType,
},
},
}
case "object":
nestedRecordType := jsd.objectSchemaToRecordType(schemaDoc)
return &schema_pb.Type{
Kind: &schema_pb.Type_RecordType{
RecordType: nestedRecordType,
},
}
default:
// Handle union types (oneOf, anyOf, allOf)
if oneOf, exists := schemaDoc["oneOf"].([]interface{}); exists && len(oneOf) > 0 {
// For unions, use the first type as default
if firstType, ok := oneOf[0].(map[string]interface{}); ok {
return jsd.jsonSchemaTypeToType(firstType)
}
}
// Default to string for unknown types
return &schema_pb.Type{
Kind: &schema_pb.Type_ScalarType{
ScalarType: schema_pb.ScalarType_STRING,
},
}
}
}
// isArrayType checks if a JSON Schema represents an array type
func (jsd *JSONSchemaDecoder) isArrayType(schemaDoc map[string]interface{}) bool {
schemaType, _ := schemaDoc["type"].(string)
return schemaType == "array"
}
// EncodeFromRecordValue encodes a RecordValue back to JSON format
func (jsd *JSONSchemaDecoder) EncodeFromRecordValue(recordValue *schema_pb.RecordValue) ([]byte, error) {
// Convert RecordValue back to Go map
goMap := recordValueToMap(recordValue)
// Encode to JSON
jsonData, err := json.Marshal(goMap)
if err != nil {
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
}
// Validate the generated JSON against the schema
if err := jsd.ValidateOnly(jsonData); err != nil {
return nil, fmt.Errorf("generated JSON failed schema validation: %w", err)
}
return jsonData, nil
}
// GetSchemaInfo returns information about the JSON Schema
func (jsd *JSONSchemaDecoder) GetSchemaInfo() map[string]interface{} {
info := make(map[string]interface{})
if title, exists := jsd.schemaDoc["title"]; exists {
info["title"] = title
}
if description, exists := jsd.schemaDoc["description"]; exists {
info["description"] = description
}
if schemaVersion, exists := jsd.schemaDoc["$schema"]; exists {
info["schema_version"] = schemaVersion
}
if schemaType, exists := jsd.schemaDoc["type"]; exists {
info["type"] = schemaType
}
return info
}
// Enhanced JSON value conversion with better type handling
func (jsd *JSONSchemaDecoder) convertJSONValue(value interface{}, expectedType string) interface{} {
if value == nil {
return nil
}
switch expectedType {
case "integer":
switch v := value.(type) {
case float64:
return int64(v)
case string:
if i, err := strconv.ParseInt(v, 10, 64); err == nil {
return i
}
}
case "number":
switch v := value.(type) {
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
return f
}
}
case "boolean":
switch v := value.(type) {
case string:
if b, err := strconv.ParseBool(v); err == nil {
return b
}
}
case "string":
// Handle date-time format conversion
if str, ok := value.(string); ok {
// Try to parse as RFC3339 timestamp
if t, err := time.Parse(time.RFC3339, str); err == nil {
return t
}
}
}
return value
}
// ValidateAndNormalize validates JSON data and normalizes types according to schema
func (jsd *JSONSchemaDecoder) ValidateAndNormalize(data []byte) ([]byte, error) {
// First decode normally
jsonMap, err := jsd.Decode(data)
if err != nil {
return nil, err
}
// Normalize types based on schema
normalized := jsd.normalizeMapTypes(jsonMap, jsd.schemaDoc)
// Re-encode with normalized types
return json.Marshal(normalized)
}
// normalizeMapTypes normalizes map values according to JSON Schema types
func (jsd *JSONSchemaDecoder) normalizeMapTypes(data map[string]interface{}, schemaDoc map[string]interface{}) map[string]interface{} {
properties, _ := schemaDoc["properties"].(map[string]interface{})
result := make(map[string]interface{})
for key, value := range data {
if fieldSchema, exists := properties[key]; exists {
if fieldSchemaMap, ok := fieldSchema.(map[string]interface{}); ok {
fieldType, _ := fieldSchemaMap["type"].(string)
result[key] = jsd.convertJSONValue(value, fieldType)
continue
}
}
result[key] = value
}
return result
}