You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

75 lines
2.4 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
  8. rootNode, err := toParquetFieldTypeRecord(recordType)
  9. if err != nil {
  10. return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err)
  11. }
  12. // Fields are sorted by name, so the value should be sorted also
  13. // the sorting is inside parquet.`func (g Group) Fields() []Field`
  14. return parquet.NewSchema(topicName, rootNode), nil
  15. }
  16. func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
  17. switch fieldType.Kind.(type) {
  18. case *schema_pb.Type_ScalarType:
  19. dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
  20. dataType = parquet.Optional(dataType)
  21. case *schema_pb.Type_RecordType:
  22. dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
  23. case *schema_pb.Type_ListType:
  24. dataType, err = toParquetFieldTypeList(fieldType.GetListType())
  25. default:
  26. return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
  27. }
  28. return dataType, err
  29. }
  30. func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
  31. elementType, err := toParquetFieldType(listType.ElementType)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return parquet.Repeated(elementType), nil
  36. }
  37. func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
  38. switch scalarType {
  39. case schema_pb.ScalarType_BOOLEAN:
  40. return parquet.Leaf(parquet.BooleanType), nil
  41. case schema_pb.ScalarType_INTEGER:
  42. return parquet.Leaf(parquet.Int32Type), nil
  43. case schema_pb.ScalarType_LONG:
  44. return parquet.Leaf(parquet.Int64Type), nil
  45. case schema_pb.ScalarType_FLOAT:
  46. return parquet.Leaf(parquet.FloatType), nil
  47. case schema_pb.ScalarType_DOUBLE:
  48. return parquet.Leaf(parquet.DoubleType), nil
  49. case schema_pb.ScalarType_BYTES:
  50. return parquet.Leaf(parquet.ByteArrayType), nil
  51. case schema_pb.ScalarType_STRING:
  52. return parquet.Leaf(parquet.ByteArrayType), nil
  53. default:
  54. return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
  55. }
  56. }
  57. func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
  58. recordNode := parquet.Group{}
  59. for _, field := range recordType.Fields {
  60. parquetFieldType, err := toParquetFieldType(field.Type)
  61. if err != nil {
  62. return nil, err
  63. }
  64. recordNode[field.Name] = parquetFieldType
  65. }
  66. return recordNode, nil
  67. }