You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

719 lines
20 KiB

package schema
import (
	"encoding/json"
	"fmt"
	"reflect"
	"sort"
	"time"

	"github.com/linkedin/goavro/v2"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// AvroDecoder decodes Avro binary payloads and converts them into SeaweedMQ
// values. Each decoder wraps one compiled goavro codec built from a schema.
type AvroDecoder struct {
	codec *goavro.Codec // compiled schema used for decoding and schema inspection
}
// NewAvroDecoder compiles schemaStr (an Avro schema in JSON form) into a
// goavro codec and returns a decoder backed by it. A schema that goavro
// rejects yields a wrapped error and a nil decoder.
func NewAvroDecoder(schemaStr string) (*AvroDecoder, error) {
	compiled, compileErr := goavro.NewCodec(schemaStr)
	if compileErr != nil {
		return nil, fmt.Errorf("failed to create Avro codec: %w", compileErr)
	}
	decoder := &AvroDecoder{codec: compiled}
	return decoder, nil
}
// Decode decodes one Avro binary datum and returns it as a Go map.
//
// The remaining-bytes result of NativeFromBinary is discarded, so any data
// after the first datum is ignored. An error is returned when decoding fails
// or when the decoded value is not a record (i.e. not a map[string]interface{}).
func (ad *AvroDecoder) Decode(data []byte) (map[string]interface{}, error) {
	native, _, err := ad.codec.NativeFromBinary(data)
	if err != nil {
		return nil, fmt.Errorf("failed to decode Avro data: %w", err)
	}

	switch record := native.(type) {
	case map[string]interface{}:
		return record, nil
	default:
		return nil, fmt.Errorf("expected Avro record, got %T", native)
	}
}
// DecodeToRecordValue decodes an Avro binary datum with Decode and converts
// the resulting map into a SeaweedMQ RecordValue via MapToRecordValue.
// Decode errors are returned unchanged.
func (ad *AvroDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
	record, decodeErr := ad.Decode(data)
	if decodeErr != nil {
		return nil, decodeErr
	}
	recordValue := MapToRecordValue(record)
	return recordValue, nil
}
// InferRecordType derives a SeaweedMQ RecordType from the schema text held by
// this decoder's codec, delegating to avroSchemaToRecordType.
func (ad *AvroDecoder) InferRecordType() (*schema_pb.RecordType, error) {
	return avroSchemaToRecordType(ad.codec.Schema())
}
// MapToRecordValue converts a decoded Go map into a SeaweedMQ RecordValue,
// converting every entry with goValueToSchemaValue. A nil or empty map yields
// a RecordValue with an empty (non-nil) Fields map.
func MapToRecordValue(m map[string]interface{}) *schema_pb.RecordValue {
	record := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value, len(m)),
	}
	for name, raw := range m {
		record.Fields[name] = goValueToSchemaValue(raw)
	}
	return record
}
// goValueToSchemaValue converts a native Go value (as produced by goavro
// decoding) into a SeaweedMQ Value.
//
// Mapping:
//   - nil                    -> StringValue "" (null itself is not preserved)
//   - bool                   -> BoolValue
//   - int32                  -> Int32Value
//   - int64, int             -> Int64Value
//   - float32                -> FloatValue
//   - float64                -> DoubleValue
//   - string                 -> StringValue
//   - []byte                 -> BytesValue
//   - time.Time              -> TimestampValue (microseconds since the Unix epoch, IsUtc=true)
//   - []interface{}          -> ListValue, elements converted recursively
//   - map[string]interface{} -> RecordValue, entries converted recursively
//   - anything else          -> StringValue formatted with %v
//
// goavro represents a non-null union value as a single-entry map keyed by the
// branch type name, e.g. {"int": int32(5)}. The map case turns that into a
// one-field RecordValue keyed by the branch name ({"int": Int32Value(5)}),
// which keeps the branch available for re-encoding. No union-specific code is
// needed for that.
func goValueToSchemaValue(value interface{}) *schema_pb.Value {
	if value == nil {
		// There is no dedicated null kind here; fall back to an empty string.
		return &schema_pb.Value{
			Kind: &schema_pb.Value_StringValue{StringValue: ""},
		}
	}

	switch v := value.(type) {
	case bool:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
	case int32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}}
	case int64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
	case int:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}}
	case float32:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}}
	case float64:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
	case string:
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
	case []byte:
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}}
	case time.Time:
		return &schema_pb.Value{
			Kind: &schema_pb.Value_TimestampValue{
				TimestampValue: &schema_pb.TimestampValue{
					TimestampMicros: v.UnixMicro(),
					IsUtc:           true,
				},
			},
		}
	case []interface{}:
		listValues := make([]*schema_pb.Value, len(v))
		for i, item := range v {
			listValues[i] = goValueToSchemaValue(item)
		}
		return &schema_pb.Value{
			Kind: &schema_pb.Value_ListValue{
				ListValue: &schema_pb.ListValue{Values: listValues},
			},
		}
	case map[string]interface{}:
		// Nested records, Avro maps and single-entry union wrappers all map
		// onto a RecordValue keyed by the original map keys.
		fields := make(map[string]*schema_pb.Value, len(v))
		for key, val := range v {
			fields[key] = goValueToSchemaValue(val)
		}
		return &schema_pb.Value{
			Kind: &schema_pb.Value_RecordValue{
				RecordValue: &schema_pb.RecordValue{Fields: fields},
			},
		}
	default:
		// Unrecognized types are preserved as their textual form.
		return &schema_pb.Value{
			Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)},
		}
	}
}
// avroSchemaToRecordType converts an Avro schema (JSON text) into a SeaweedMQ
// RecordType.
//
// The schema is first compiled with goavro purely for validation (the codec
// is discarded). It is then decoded as a JSON object, and its record fields
// are converted by extractAvroFields. Any failure is returned wrapped with the
// step that failed.
func avroSchemaToRecordType(schemaStr string) (*schema_pb.RecordType, error) {
	if _, codecErr := goavro.NewCodec(schemaStr); codecErr != nil {
		return nil, fmt.Errorf("failed to parse Avro schema: %w", codecErr)
	}

	var avroSchema map[string]interface{}
	if jsonErr := json.Unmarshal([]byte(schemaStr), &avroSchema); jsonErr != nil {
		return nil, fmt.Errorf("failed to parse Avro schema JSON: %w", jsonErr)
	}

	fields, fieldsErr := extractAvroFields(avroSchema)
	if fieldsErr != nil {
		return nil, fmt.Errorf("failed to extract Avro fields: %w", fieldsErr)
	}
	return &schema_pb.RecordType{Fields: fields}, nil
}
// extractAvroFields converts the "fields" array of a parsed Avro record schema
// into SeaweedMQ fields. Each field's FieldIndex is its position in the array.
//
// An error is returned when the schema's "type" is not "record", when
// "fields" is missing or not an array, or when any field fails to convert.
func extractAvroFields(avroSchema map[string]interface{}) ([]*schema_pb.Field, error) {
	if schemaType, _ := avroSchema["type"].(string); schemaType != "record" {
		// Report the raw value so non-string or missing "type" entries are
		// visible in the error (the string-asserted value would be empty).
		return nil, fmt.Errorf("expected record type, got %v", avroSchema["type"])
	}

	fieldsInterface, ok := avroSchema["fields"]
	if !ok {
		return nil, fmt.Errorf("no fields found in Avro record schema")
	}
	fieldsArray, ok := fieldsInterface.([]interface{})
	if !ok {
		return nil, fmt.Errorf("fields must be an array")
	}

	fields := make([]*schema_pb.Field, 0, len(fieldsArray))
	for i, fieldInterface := range fieldsArray {
		fieldMap, ok := fieldInterface.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("field %d is not a valid object", i)
		}
		field, err := convertAvroFieldToSeaweedMQ(fieldMap, int32(i))
		if err != nil {
			return nil, fmt.Errorf("failed to convert field %d: %w", i, err)
		}
		fields = append(fields, field)
	}
	return fields, nil
}
// convertAvroFieldToSeaweedMQ converts one entry of an Avro record's "fields"
// array (already decoded from JSON) into a SeaweedMQ Field.
//
// avroField must contain a string "name". fieldIndex is stored verbatim as
// FieldIndex. IsRepeated is true when the field's type is an Avro array.
// IsRequired is true only when the field has no "default" and its type cannot
// hold null. A bare "null" type, or a union with a "null" branch, makes the
// field optional.
func convertAvroFieldToSeaweedMQ(avroField map[string]interface{}, fieldIndex int32) (*schema_pb.Field, error) {
	name, ok := avroField["name"].(string)
	if !ok {
		return nil, fmt.Errorf("field name is required")
	}

	fieldType, isRepeated, err := convertAvroTypeToSeaweedMQWithRepeated(avroField["type"])
	if err != nil {
		return nil, fmt.Errorf("failed to convert field type for %s: %w", name, err)
	}

	// A nullable field can legitimately decode to nil even without a default,
	// so it must not be reported as required.
	_, hasDefault := avroField["default"]
	isRequired := !hasDefault && !avroTypeAdmitsNull(avroField["type"])

	return &schema_pb.Field{
		Name:       name,
		FieldIndex: fieldIndex,
		Type:       fieldType,
		IsRequired: isRequired,
		IsRepeated: isRepeated,
	}, nil
}

// avroTypeAdmitsNull reports whether an Avro type decoded from schema JSON is
// the "null" type itself or a union that includes a "null" branch.
func avroTypeAdmitsNull(avroType interface{}) bool {
	switch t := avroType.(type) {
	case string:
		return t == "null"
	case []interface{}:
		for _, member := range t {
			if s, ok := member.(string); ok && s == "null" {
				return true
			}
		}
	}
	return false
}
// convertAvroTypeToSeaweedMQ converts an Avro type to a SeaweedMQ Type. It is
// convertAvroTypeToSeaweedMQWithRepeated with the repeated (array) flag dropped.
func convertAvroTypeToSeaweedMQ(avroType interface{}) (converted *schema_pb.Type, err error) {
	converted, _, err = convertAvroTypeToSeaweedMQWithRepeated(avroType)
	return converted, err
}
// convertAvroTypeToSeaweedMQWithRepeated converts an Avro type, as decoded from
// schema JSON, into a SeaweedMQ Type. The boolean result is true when the Avro
// type is an array. Arrays are reported as their item type with repeated=true.
//
// Accepted shapes:
//   - string: a primitive type name ("int", "string", ...)
//   - map[string]interface{}: an object-form type (record, enum, array, map, fixed, ...)
//   - []interface{}: a union; the first non-null branch determines the result
func convertAvroTypeToSeaweedMQWithRepeated(avroType interface{}) (*schema_pb.Type, bool, error) {
	switch t := avroType.(type) {
	case string:
		fieldType, err := convertAvroSimpleType(t)
		return fieldType, false, err
	case map[string]interface{}:
		return convertAvroComplexTypeWithRepeated(t)
	case []interface{}:
		// Resolve the union to its first non-null branch here, recursing with
		// the repeated-aware converter. Otherwise a nullable array such as
		// ["null", {"type": "array", ...}] would lose its repeated flag.
		for _, member := range t {
			if name, ok := member.(string); ok && name == "null" {
				continue
			}
			return convertAvroTypeToSeaweedMQWithRepeated(member)
		}
		// Only null branches (or an empty union): use convertAvroUnionType's fallback.
		fieldType, err := convertAvroUnionType(t)
		return fieldType, false, err
	default:
		return nil, false, fmt.Errorf("unsupported Avro type: %T", avroType)
	}
}
// convertAvroSimpleType maps an Avro primitive type name to a SeaweedMQ scalar
// Type. "null" has no scalar of its own and is represented as BYTES. Any other
// unknown name yields an "unsupported simple Avro type" error.
func convertAvroSimpleType(avroType string) (*schema_pb.Type, error) {
	var scalar schema_pb.ScalarType
	switch avroType {
	case "null", "bytes":
		scalar = schema_pb.ScalarType_BYTES
	case "boolean":
		scalar = schema_pb.ScalarType_BOOL
	case "int":
		scalar = schema_pb.ScalarType_INT32
	case "long":
		scalar = schema_pb.ScalarType_INT64
	case "float":
		scalar = schema_pb.ScalarType_FLOAT
	case "double":
		scalar = schema_pb.ScalarType_DOUBLE
	case "string":
		scalar = schema_pb.ScalarType_STRING
	default:
		return nil, fmt.Errorf("unsupported simple Avro type: %s", avroType)
	}
	return &schema_pb.Type{
		Kind: &schema_pb.Type_ScalarType{ScalarType: scalar},
	}, nil
}
// convertAvroComplexType converts an object-form Avro type to a SeaweedMQ
// Type. It is convertAvroComplexTypeWithRepeated with the repeated flag dropped.
func convertAvroComplexType(avroType map[string]interface{}) (converted *schema_pb.Type, err error) {
	converted, _, err = convertAvroComplexTypeWithRepeated(avroType)
	return converted, err
}
// convertAvroComplexTypeWithRepeated converts an object-form Avro type
// ({"type": ...}) into a SeaweedMQ Type. The boolean result is true for Avro
// arrays, which are reported as their item type with repeated=true.
//
// Mapping:
//   - record -> RecordType with recursively converted fields
//   - enum   -> STRING
//   - array  -> item type, repeated
//   - map    -> RecordType with no declared fields (keys are dynamic)
//   - fixed  -> BYTES (also when a logical type such as decimal or duration annotates it)
//   - a primitive name, e.g. {"type": "string", "avro.java.string": "String"}
//     or {"type": "long", "logicalType": "timestamp-millis"}, is converted
//     like the bare primitive, refined by convertAvroSimpleTypeWithLogical
//     when a logicalType is present
//
// Logical types only refine primitives. On record/enum/array/map/fixed they are
// ignored and the underlying type is used, as the Avro spec requires for
// logical types an implementation does not understand.
func convertAvroComplexTypeWithRepeated(avroType map[string]interface{}) (*schema_pb.Type, bool, error) {
	typeStr, ok := avroType["type"].(string)
	if !ok {
		return nil, false, fmt.Errorf("complex type must have a type field")
	}

	switch typeStr {
	case "record":
		fields, err := extractAvroFields(avroType)
		if err != nil {
			return nil, false, fmt.Errorf("failed to extract nested record fields: %w", err)
		}
		return &schema_pb.Type{
			Kind: &schema_pb.Type_RecordType{
				RecordType: &schema_pb.RecordType{Fields: fields},
			},
		}, false, nil
	case "enum":
		// Enum symbols are carried as their string names.
		return &schema_pb.Type{
			Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING},
		}, false, nil
	case "array":
		itemsType, err := convertAvroTypeToSeaweedMQ(avroType["items"])
		if err != nil {
			return nil, false, fmt.Errorf("failed to convert array items type: %w", err)
		}
		return itemsType, true, nil
	case "map":
		// Map keys are only known at runtime, so no fields are declared.
		return &schema_pb.Type{
			Kind: &schema_pb.Type_RecordType{
				RecordType: &schema_pb.RecordType{Fields: []*schema_pb.Field{}},
			},
		}, false, nil
	case "fixed":
		return &schema_pb.Type{
			Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
		}, false, nil
	}

	// Anything else should be a primitive written in object form.
	if _, hasLogicalType := avroType["logicalType"]; hasLogicalType {
		return convertAvroSimpleTypeWithLogical(typeStr, avroType)
	}
	fieldType, err := convertAvroSimpleType(typeStr)
	if err != nil {
		return nil, false, fmt.Errorf("unsupported complex Avro type: %s", typeStr)
	}
	return fieldType, false, nil
}
// convertAvroSimpleTypeWithLogical maps an Avro logical type to a SeaweedMQ
// scalar Type. The repeated flag is always false.
//
// Recognized logical types:
//   - decimal -> BYTES
//   - uuid -> STRING
//   - date, time-millis (int-based) -> INT32
//   - time-micros, timestamp-millis, timestamp-micros (long-based) -> INT64
//
// Any other (or missing) logicalType falls back to convertAvroSimpleType on
// primitiveType. That fallback may return an error.
func convertAvroSimpleTypeWithLogical(primitiveType string, avroType map[string]interface{}) (*schema_pb.Type, bool, error) {
	logicalType, _ := avroType["logicalType"].(string)

	var scalar schema_pb.ScalarType
	switch logicalType {
	case "decimal":
		// Kept as raw bytes to avoid losing precision.
		scalar = schema_pb.ScalarType_BYTES
	case "uuid":
		scalar = schema_pb.ScalarType_STRING
	case "date", "time-millis":
		scalar = schema_pb.ScalarType_INT32
	case "time-micros", "timestamp-millis", "timestamp-micros":
		scalar = schema_pb.ScalarType_INT64
	default:
		underlying, err := convertAvroSimpleType(primitiveType)
		return underlying, false, err
	}
	return &schema_pb.Type{
		Kind: &schema_pb.Type_ScalarType{ScalarType: scalar},
	}, false, nil
}
// convertAvroUnionType converts an Avro union to a SeaweedMQ Type by
// converting its first branch that is not the "null" type name. This is a
// simplification, since the other branches are ignored. A union with no such
// branch (only "null", or empty) maps to BYTES.
func convertAvroUnionType(unionTypes []interface{}) (*schema_pb.Type, error) {
	chosen := -1
	for i, member := range unionTypes {
		if name, isName := member.(string); !isName || name != "null" {
			chosen = i
			break
		}
	}
	if chosen >= 0 {
		return convertAvroTypeToSeaweedMQ(unionTypes[chosen])
	}
	return &schema_pb.Type{
		Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
	}, nil
}
// InferRecordTypeFromMap infers a RecordType from a decoded map. It is useful
// when the original Avro schema is not available.
//
// Fields are emitted in sorted key order, so FieldIndex values are stable
// across calls with the same keys. Each field's type comes from
// inferTypeFromValue. A field is required when its value is non-nil. It is
// repeated when its value is a slice other than a byte slice; byte slices are
// scalar BYTES values.
//
// NOTE(review): slice values are typed as a ListType here *and* flagged
// IsRepeated, whereas the Avro-schema path reports arrays as the element type
// plus IsRepeated. Confirm which form consumers expect.
func InferRecordTypeFromMap(m map[string]interface{}) *schema_pb.RecordType {
	// Go map iteration order is randomized; sort for deterministic indices.
	keys := make([]string, 0, len(m))
	for key := range m {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	fields := make([]*schema_pb.Field, 0, len(keys))
	for i, key := range keys {
		value := m[key]

		// reflect.TypeOf(nil) is nil, so guard before asking for its Kind.
		isRepeated := false
		if rt := reflect.TypeOf(value); rt != nil {
			isRepeated = rt.Kind() == reflect.Slice && rt.Elem().Kind() != reflect.Uint8
		}

		fields = append(fields, &schema_pb.Field{
			Name:       key,
			FieldIndex: int32(i),
			Type:       inferTypeFromValue(value),
			IsRequired: value != nil, // non-nil values are considered required
			IsRepeated: isRepeated,
		})
	}
	return &schema_pb.RecordType{
		Fields: fields,
	}
}
// inferTypeFromValue infers a SeaweedMQ Type from a native Go value.
//
// Scalars map directly:
//   - bool -> BOOL
//   - int32 -> INT32
//   - int64/int -> INT64
//   - float32 -> FLOAT
//   - float64 -> DOUBLE
//   - string -> STRING
//   - []byte -> BYTES
//   - time.Time -> TIMESTAMP
//
// A []interface{} becomes a ListType whose element type is inferred from its
// first element (STRING when empty). A map[string]interface{} becomes a nested
// RecordType via InferRecordTypeFromMap. nil and any other type default to STRING.
func inferTypeFromValue(value interface{}) *schema_pb.Type {
	scalarOf := func(st schema_pb.ScalarType) *schema_pb.Type {
		return &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: st}}
	}

	switch v := value.(type) {
	case nil:
		return scalarOf(schema_pb.ScalarType_STRING)
	case bool:
		return scalarOf(schema_pb.ScalarType_BOOL)
	case int32:
		return scalarOf(schema_pb.ScalarType_INT32)
	case int64, int:
		return scalarOf(schema_pb.ScalarType_INT64)
	case float32:
		return scalarOf(schema_pb.ScalarType_FLOAT)
	case float64:
		return scalarOf(schema_pb.ScalarType_DOUBLE)
	case string:
		return scalarOf(schema_pb.ScalarType_STRING)
	case []byte:
		return scalarOf(schema_pb.ScalarType_BYTES)
	case time.Time:
		return scalarOf(schema_pb.ScalarType_TIMESTAMP)
	case []interface{}:
		// Only the first element is inspected.
		elementType := scalarOf(schema_pb.ScalarType_STRING)
		if len(v) > 0 {
			elementType = inferTypeFromValue(v[0])
		}
		return &schema_pb.Type{
			Kind: &schema_pb.Type_ListType{
				ListType: &schema_pb.ListType{ElementType: elementType},
			},
		}
	case map[string]interface{}:
		return &schema_pb.Type{
			Kind: &schema_pb.Type_RecordType{RecordType: InferRecordTypeFromMap(v)},
		}
	default:
		return scalarOf(schema_pb.ScalarType_STRING)
	}
}