You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
387 lines
10 KiB
387 lines
10 KiB
package schema
|
|
|
|
import (
	"encoding/json"
	"fmt"
	"sort"
	"strconv"
	"time"

	"github.com/xeipuuv/gojsonschema"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
|
|
|
|
// JSONSchemaDecoder handles JSON Schema validation and conversion to SeaweedMQ format
|
|
type JSONSchemaDecoder struct {
|
|
schema *gojsonschema.Schema
|
|
schemaDoc map[string]interface{} // Parsed schema document for type inference
|
|
schemaJSON string // Original schema JSON
|
|
}
|
|
|
|
// NewJSONSchemaDecoder creates a new JSON Schema decoder from a schema string
|
|
func NewJSONSchemaDecoder(schemaJSON string) (*JSONSchemaDecoder, error) {
|
|
// Parse the schema JSON
|
|
var schemaDoc map[string]interface{}
|
|
if err := json.Unmarshal([]byte(schemaJSON), &schemaDoc); err != nil {
|
|
return nil, fmt.Errorf("failed to parse JSON schema: %w", err)
|
|
}
|
|
|
|
// Create JSON Schema validator
|
|
schemaLoader := gojsonschema.NewStringLoader(schemaJSON)
|
|
schema, err := gojsonschema.NewSchema(schemaLoader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create JSON schema validator: %w", err)
|
|
}
|
|
|
|
return &JSONSchemaDecoder{
|
|
schema: schema,
|
|
schemaDoc: schemaDoc,
|
|
schemaJSON: schemaJSON,
|
|
}, nil
|
|
}
|
|
|
|
// Decode decodes and validates JSON data against the schema, returning a Go map
|
|
func (jsd *JSONSchemaDecoder) Decode(data []byte) (map[string]interface{}, error) {
|
|
// Parse JSON data
|
|
var jsonData interface{}
|
|
if err := json.Unmarshal(data, &jsonData); err != nil {
|
|
return nil, fmt.Errorf("failed to parse JSON data: %w", err)
|
|
}
|
|
|
|
// Validate against schema
|
|
documentLoader := gojsonschema.NewGoLoader(jsonData)
|
|
result, err := jsd.schema.Validate(documentLoader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to validate JSON data: %w", err)
|
|
}
|
|
|
|
if !result.Valid() {
|
|
// Collect validation errors
|
|
var errorMsgs []string
|
|
for _, desc := range result.Errors() {
|
|
errorMsgs = append(errorMsgs, desc.String())
|
|
}
|
|
return nil, fmt.Errorf("JSON data validation failed: %v", errorMsgs)
|
|
}
|
|
|
|
// Convert to map[string]interface{} for consistency
|
|
switch v := jsonData.(type) {
|
|
case map[string]interface{}:
|
|
return v, nil
|
|
case []interface{}:
|
|
// Handle array at root level by wrapping in a map
|
|
return map[string]interface{}{"items": v}, nil
|
|
default:
|
|
// Handle primitive values at root level
|
|
return map[string]interface{}{"value": v}, nil
|
|
}
|
|
}
|
|
|
|
// DecodeToRecordValue decodes JSON data directly to SeaweedMQ RecordValue
|
|
func (jsd *JSONSchemaDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
|
|
jsonMap, err := jsd.Decode(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return MapToRecordValue(jsonMap), nil
|
|
}
|
|
|
|
// InferRecordType infers a SeaweedMQ RecordType from the JSON Schema
|
|
func (jsd *JSONSchemaDecoder) InferRecordType() (*schema_pb.RecordType, error) {
|
|
return jsd.jsonSchemaToRecordType(jsd.schemaDoc), nil
|
|
}
|
|
|
|
// ValidateOnly validates JSON data against the schema without decoding
|
|
func (jsd *JSONSchemaDecoder) ValidateOnly(data []byte) error {
|
|
_, err := jsd.Decode(data)
|
|
return err
|
|
}
|
|
|
|
// jsonSchemaToRecordType converts a JSON Schema to SeaweedMQ RecordType
|
|
func (jsd *JSONSchemaDecoder) jsonSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
|
|
schemaType, _ := schemaDoc["type"].(string)
|
|
|
|
if schemaType == "object" {
|
|
return jsd.objectSchemaToRecordType(schemaDoc)
|
|
}
|
|
|
|
// For non-object schemas, create a wrapper record
|
|
return &schema_pb.RecordType{
|
|
Fields: []*schema_pb.Field{
|
|
{
|
|
Name: "value",
|
|
FieldIndex: 0,
|
|
Type: jsd.jsonSchemaTypeToType(schemaDoc),
|
|
IsRequired: true,
|
|
IsRepeated: false,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// objectSchemaToRecordType converts an object JSON Schema to RecordType
|
|
func (jsd *JSONSchemaDecoder) objectSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
|
|
properties, _ := schemaDoc["properties"].(map[string]interface{})
|
|
required, _ := schemaDoc["required"].([]interface{})
|
|
|
|
// Create set of required fields for quick lookup
|
|
requiredFields := make(map[string]bool)
|
|
for _, req := range required {
|
|
if reqStr, ok := req.(string); ok {
|
|
requiredFields[reqStr] = true
|
|
}
|
|
}
|
|
|
|
fields := make([]*schema_pb.Field, 0, len(properties))
|
|
fieldIndex := int32(0)
|
|
|
|
for fieldName, fieldSchema := range properties {
|
|
fieldSchemaMap, ok := fieldSchema.(map[string]interface{})
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
field := &schema_pb.Field{
|
|
Name: fieldName,
|
|
FieldIndex: fieldIndex,
|
|
Type: jsd.jsonSchemaTypeToType(fieldSchemaMap),
|
|
IsRequired: requiredFields[fieldName],
|
|
IsRepeated: jsd.isArrayType(fieldSchemaMap),
|
|
}
|
|
|
|
fields = append(fields, field)
|
|
fieldIndex++
|
|
}
|
|
|
|
return &schema_pb.RecordType{
|
|
Fields: fields,
|
|
}
|
|
}
|
|
|
|
// jsonSchemaTypeToType converts a JSON Schema type to SeaweedMQ Type
|
|
func (jsd *JSONSchemaDecoder) jsonSchemaTypeToType(schemaDoc map[string]interface{}) *schema_pb.Type {
|
|
schemaType, _ := schemaDoc["type"].(string)
|
|
|
|
switch schemaType {
|
|
case "boolean":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BOOL,
|
|
},
|
|
}
|
|
case "integer":
|
|
// Check for format hints
|
|
format, _ := schemaDoc["format"].(string)
|
|
switch format {
|
|
case "int32":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT32,
|
|
},
|
|
}
|
|
default:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}
|
|
}
|
|
case "number":
|
|
// Check for format hints
|
|
format, _ := schemaDoc["format"].(string)
|
|
switch format {
|
|
case "float":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_FLOAT,
|
|
},
|
|
}
|
|
default:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_DOUBLE,
|
|
},
|
|
}
|
|
}
|
|
case "string":
|
|
// Check for format hints
|
|
format, _ := schemaDoc["format"].(string)
|
|
switch format {
|
|
case "date-time":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_TIMESTAMP,
|
|
},
|
|
}
|
|
case "byte", "binary":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}
|
|
default:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
case "array":
|
|
items, _ := schemaDoc["items"].(map[string]interface{})
|
|
elementType := jsd.jsonSchemaTypeToType(items)
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ListType{
|
|
ListType: &schema_pb.ListType{
|
|
ElementType: elementType,
|
|
},
|
|
},
|
|
}
|
|
case "object":
|
|
nestedRecordType := jsd.objectSchemaToRecordType(schemaDoc)
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_RecordType{
|
|
RecordType: nestedRecordType,
|
|
},
|
|
}
|
|
default:
|
|
// Handle union types (oneOf, anyOf, allOf)
|
|
if oneOf, exists := schemaDoc["oneOf"].([]interface{}); exists && len(oneOf) > 0 {
|
|
// For unions, use the first type as default
|
|
if firstType, ok := oneOf[0].(map[string]interface{}); ok {
|
|
return jsd.jsonSchemaTypeToType(firstType)
|
|
}
|
|
}
|
|
|
|
// Default to string for unknown types
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// isArrayType checks if a JSON Schema represents an array type
|
|
func (jsd *JSONSchemaDecoder) isArrayType(schemaDoc map[string]interface{}) bool {
|
|
schemaType, _ := schemaDoc["type"].(string)
|
|
return schemaType == "array"
|
|
}
|
|
|
|
// EncodeFromRecordValue encodes a RecordValue back to JSON format
|
|
func (jsd *JSONSchemaDecoder) EncodeFromRecordValue(recordValue *schema_pb.RecordValue) ([]byte, error) {
|
|
// Convert RecordValue back to Go map
|
|
goMap := recordValueToMap(recordValue)
|
|
|
|
// Encode to JSON
|
|
jsonData, err := json.Marshal(goMap)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
|
|
}
|
|
|
|
// Validate the generated JSON against the schema
|
|
if err := jsd.ValidateOnly(jsonData); err != nil {
|
|
return nil, fmt.Errorf("generated JSON failed schema validation: %w", err)
|
|
}
|
|
|
|
return jsonData, nil
|
|
}
|
|
|
|
// GetSchemaInfo returns information about the JSON Schema
|
|
func (jsd *JSONSchemaDecoder) GetSchemaInfo() map[string]interface{} {
|
|
info := make(map[string]interface{})
|
|
|
|
if title, exists := jsd.schemaDoc["title"]; exists {
|
|
info["title"] = title
|
|
}
|
|
|
|
if description, exists := jsd.schemaDoc["description"]; exists {
|
|
info["description"] = description
|
|
}
|
|
|
|
if schemaVersion, exists := jsd.schemaDoc["$schema"]; exists {
|
|
info["schema_version"] = schemaVersion
|
|
}
|
|
|
|
if schemaType, exists := jsd.schemaDoc["type"]; exists {
|
|
info["type"] = schemaType
|
|
}
|
|
|
|
return info
|
|
}
|
|
|
|
// Enhanced JSON value conversion with better type handling
|
|
func (jsd *JSONSchemaDecoder) convertJSONValue(value interface{}, expectedType string) interface{} {
|
|
if value == nil {
|
|
return nil
|
|
}
|
|
|
|
switch expectedType {
|
|
case "integer":
|
|
switch v := value.(type) {
|
|
case float64:
|
|
return int64(v)
|
|
case string:
|
|
if i, err := strconv.ParseInt(v, 10, 64); err == nil {
|
|
return i
|
|
}
|
|
}
|
|
case "number":
|
|
switch v := value.(type) {
|
|
case string:
|
|
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
|
return f
|
|
}
|
|
}
|
|
case "boolean":
|
|
switch v := value.(type) {
|
|
case string:
|
|
if b, err := strconv.ParseBool(v); err == nil {
|
|
return b
|
|
}
|
|
}
|
|
case "string":
|
|
// Handle date-time format conversion
|
|
if str, ok := value.(string); ok {
|
|
// Try to parse as RFC3339 timestamp
|
|
if t, err := time.Parse(time.RFC3339, str); err == nil {
|
|
return t
|
|
}
|
|
}
|
|
}
|
|
|
|
return value
|
|
}
|
|
|
|
// ValidateAndNormalize validates JSON data and normalizes types according to schema
|
|
func (jsd *JSONSchemaDecoder) ValidateAndNormalize(data []byte) ([]byte, error) {
|
|
// First decode normally
|
|
jsonMap, err := jsd.Decode(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Normalize types based on schema
|
|
normalized := jsd.normalizeMapTypes(jsonMap, jsd.schemaDoc)
|
|
|
|
// Re-encode with normalized types
|
|
return json.Marshal(normalized)
|
|
}
|
|
|
|
// normalizeMapTypes normalizes map values according to JSON Schema types
|
|
func (jsd *JSONSchemaDecoder) normalizeMapTypes(data map[string]interface{}, schemaDoc map[string]interface{}) map[string]interface{} {
|
|
properties, _ := schemaDoc["properties"].(map[string]interface{})
|
|
result := make(map[string]interface{})
|
|
|
|
for key, value := range data {
|
|
if fieldSchema, exists := properties[key]; exists {
|
|
if fieldSchemaMap, ok := fieldSchema.(map[string]interface{}); ok {
|
|
fieldType, _ := fieldSchemaMap["type"].(string)
|
|
result[key] = jsd.convertJSONValue(value, fieldType)
|
|
continue
|
|
}
|
|
}
|
|
result[key] = value
|
|
}
|
|
|
|
return result
|
|
}
|