You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

75 lines
2.5 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
9 months ago
9 months ago
10 months ago
10 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
  8. rootNode, err := toParquetFieldTypeRecord(recordType)
  9. if err != nil {
  10. return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err)
  11. }
  12. // Fields are sorted by name, so the value should be sorted also
  13. // the sorting is inside parquet.`func (g Group) Fields() []Field`
  14. return parquet.NewSchema(topicName, rootNode), nil
  15. }
  16. func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
  17. switch fieldType.Kind.(type) {
  18. case *schema_pb.Type_ScalarType:
  19. dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
  20. dataType = parquet.Optional(dataType)
  21. case *schema_pb.Type_RecordType:
  22. dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
  23. dataType = parquet.Optional(dataType)
  24. case *schema_pb.Type_ListType:
  25. dataType, err = toParquetFieldTypeList(fieldType.GetListType())
  26. default:
  27. return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
  28. }
  29. return dataType, err
  30. }
  31. func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
  32. elementType, err := toParquetFieldType(listType.ElementType)
  33. if err != nil {
  34. return nil, err
  35. }
  36. return parquet.Repeated(elementType), nil
  37. }
  38. func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
  39. switch scalarType {
  40. case schema_pb.ScalarType_BOOL:
  41. return parquet.Leaf(parquet.BooleanType), nil
  42. case schema_pb.ScalarType_INT32:
  43. return parquet.Leaf(parquet.Int32Type), nil
  44. case schema_pb.ScalarType_INT64:
  45. return parquet.Leaf(parquet.Int64Type), nil
  46. case schema_pb.ScalarType_FLOAT:
  47. return parquet.Leaf(parquet.FloatType), nil
  48. case schema_pb.ScalarType_DOUBLE:
  49. return parquet.Leaf(parquet.DoubleType), nil
  50. case schema_pb.ScalarType_BYTES:
  51. return parquet.Leaf(parquet.ByteArrayType), nil
  52. case schema_pb.ScalarType_STRING:
  53. return parquet.Leaf(parquet.ByteArrayType), nil
  54. default:
  55. return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
  56. }
  57. }
  58. func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
  59. recordNode := parquet.Group{}
  60. for _, field := range recordType.Fields {
  61. parquetFieldType, err := toParquetFieldType(field.Type)
  62. if err != nil {
  63. return nil, err
  64. }
  65. recordNode[field.Name] = parquetFieldType
  66. }
  67. return recordNode, nil
  68. }