You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							75 lines
						
					
					
						
							2.5 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							75 lines
						
					
					
						
							2.5 KiB
						
					
					
				| package schema | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	parquet "github.com/parquet-go/parquet-go" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" | |
| ) | |
| 
 | |
| func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) { | |
| 	rootNode, err := toParquetFieldTypeRecord(recordType) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err) | |
| 	} | |
| 
 | |
| 	// Fields are sorted by name, so the value should be sorted also | |
| 	// the sorting is inside parquet.`func (g Group) Fields() []Field` | |
| 	return parquet.NewSchema(topicName, rootNode), nil | |
| } | |
| 
 | |
| func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) { | |
| 	switch fieldType.Kind.(type) { | |
| 	case *schema_pb.Type_ScalarType: | |
| 		dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType()) | |
| 		dataType = parquet.Optional(dataType) | |
| 	case *schema_pb.Type_RecordType: | |
| 		dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType()) | |
| 		dataType = parquet.Optional(dataType) | |
| 	case *schema_pb.Type_ListType: | |
| 		dataType, err = toParquetFieldTypeList(fieldType.GetListType()) | |
| 	default: | |
| 		return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind) | |
| 	} | |
| 
 | |
| 	return dataType, err | |
| } | |
| 
 | |
| func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) { | |
| 	elementType, err := toParquetFieldType(listType.ElementType) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 	return parquet.Repeated(elementType), nil | |
| } | |
| 
 | |
| func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) { | |
| 	switch scalarType { | |
| 	case schema_pb.ScalarType_BOOL: | |
| 		return parquet.Leaf(parquet.BooleanType), nil | |
| 	case schema_pb.ScalarType_INT32: | |
| 		return parquet.Leaf(parquet.Int32Type), nil | |
| 	case schema_pb.ScalarType_INT64: | |
| 		return parquet.Leaf(parquet.Int64Type), nil | |
| 	case schema_pb.ScalarType_FLOAT: | |
| 		return parquet.Leaf(parquet.FloatType), nil | |
| 	case schema_pb.ScalarType_DOUBLE: | |
| 		return parquet.Leaf(parquet.DoubleType), nil | |
| 	case schema_pb.ScalarType_BYTES: | |
| 		return parquet.Leaf(parquet.ByteArrayType), nil | |
| 	case schema_pb.ScalarType_STRING: | |
| 		return parquet.Leaf(parquet.ByteArrayType), nil | |
| 	default: | |
| 		return nil, fmt.Errorf("unknown scalar type: %v", scalarType) | |
| 	} | |
| } | |
| func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) { | |
| 	recordNode := parquet.Group{} | |
| 	for _, field := range recordType.Fields { | |
| 		parquetFieldType, err := toParquetFieldType(field.Type) | |
| 		if err != nil { | |
| 			return nil, err | |
| 		} | |
| 		recordNode[field.Name] = parquetFieldType | |
| 	} | |
| 	return recordNode, nil | |
| }
 |