You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							117 lines
						
					
					
						
							4.1 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							117 lines
						
					
					
						
							4.1 KiB
						
					
					
				
								package schema
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"fmt"
							 | 
						|
								
							 | 
						|
									parquet "github.com/parquet-go/parquet-go"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
							 | 
						|
									rootNode, err := toParquetFieldTypeRecord(recordType)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to convert record type to parquet schema: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Fields are sorted by name, so the value should be sorted also
							 | 
						|
									// the sorting is inside parquet.`func (g Group) Fields() []Field`
							 | 
						|
									return parquet.NewSchema(topicName, rootNode), nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
							 | 
						|
									// This is the old function - now defaults to Optional for backward compatibility
							 | 
						|
									return toParquetFieldTypeWithRequirement(fieldType, false)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
							 | 
						|
									elementType, err := toParquetFieldType(listType.ElementType)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									return parquet.Repeated(elementType), nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
							 | 
						|
									switch scalarType {
							 | 
						|
									case schema_pb.ScalarType_BOOL:
							 | 
						|
										return parquet.Leaf(parquet.BooleanType), nil
							 | 
						|
									case schema_pb.ScalarType_INT32:
							 | 
						|
										return parquet.Leaf(parquet.Int32Type), nil
							 | 
						|
									case schema_pb.ScalarType_INT64:
							 | 
						|
										return parquet.Leaf(parquet.Int64Type), nil
							 | 
						|
									case schema_pb.ScalarType_FLOAT:
							 | 
						|
										return parquet.Leaf(parquet.FloatType), nil
							 | 
						|
									case schema_pb.ScalarType_DOUBLE:
							 | 
						|
										return parquet.Leaf(parquet.DoubleType), nil
							 | 
						|
									case schema_pb.ScalarType_BYTES:
							 | 
						|
										return parquet.Leaf(parquet.ByteArrayType), nil
							 | 
						|
									case schema_pb.ScalarType_STRING:
							 | 
						|
										return parquet.Leaf(parquet.ByteArrayType), nil
							 | 
						|
									// Parquet logical types - map to their physical storage types
							 | 
						|
									case schema_pb.ScalarType_TIMESTAMP:
							 | 
						|
										// Stored as INT64 (microseconds since Unix epoch)
							 | 
						|
										return parquet.Leaf(parquet.Int64Type), nil
							 | 
						|
									case schema_pb.ScalarType_DATE:
							 | 
						|
										// Stored as INT32 (days since Unix epoch)
							 | 
						|
										return parquet.Leaf(parquet.Int32Type), nil
							 | 
						|
									case schema_pb.ScalarType_DECIMAL:
							 | 
						|
										// Use maximum precision/scale to accommodate any decimal value
							 | 
						|
										// Per Parquet spec: precision ≤9→INT32, ≤18→INT64, >18→FixedLenByteArray
							 | 
						|
										// Using precision=38 (max for most systems), scale=18 for flexibility
							 | 
						|
										// Individual values can have smaller precision/scale, but schema supports maximum
							 | 
						|
										return parquet.Decimal(18, 38, parquet.FixedLenByteArrayType(16)), nil
							 | 
						|
									case schema_pb.ScalarType_TIME:
							 | 
						|
										// Stored as INT64 (microseconds since midnight)
							 | 
						|
										return parquet.Leaf(parquet.Int64Type), nil
							 | 
						|
									default:
							 | 
						|
										return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
							 | 
						|
									recordNode := parquet.Group{}
							 | 
						|
									for _, field := range recordType.Fields {
							 | 
						|
										parquetFieldType, err := toParquetFieldTypeWithRequirement(field.Type, field.IsRequired)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
										recordNode[field.Name] = parquetFieldType
							 | 
						|
									}
							 | 
						|
									return recordNode, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// toParquetFieldTypeWithRequirement creates parquet field type respecting required/optional constraints
							 | 
						|
								func toParquetFieldTypeWithRequirement(fieldType *schema_pb.Type, isRequired bool) (dataType parquet.Node, err error) {
							 | 
						|
									switch fieldType.Kind.(type) {
							 | 
						|
									case *schema_pb.Type_ScalarType:
							 | 
						|
										dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
										if isRequired {
							 | 
						|
											// Required fields are NOT wrapped in Optional
							 | 
						|
											return dataType, nil
							 | 
						|
										} else {
							 | 
						|
											// Optional fields are wrapped in Optional
							 | 
						|
											return parquet.Optional(dataType), nil
							 | 
						|
										}
							 | 
						|
									case *schema_pb.Type_RecordType:
							 | 
						|
										dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
										if isRequired {
							 | 
						|
											return dataType, nil
							 | 
						|
										} else {
							 | 
						|
											return parquet.Optional(dataType), nil
							 | 
						|
										}
							 | 
						|
									case *schema_pb.Type_ListType:
							 | 
						|
										dataType, err = toParquetFieldTypeList(fieldType.GetListType())
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
										// Lists are typically optional by nature
							 | 
						|
										return dataType, nil
							 | 
						|
									default:
							 | 
						|
										return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
							 | 
						|
									}
							 | 
						|
								}
							 |