You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

71 lines
2.2 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
  8. rootNode, err := toParquetFieldTypeRecord(recordType)
  9. if err != nil {
  10. return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err)
  11. }
  12. return parquet.NewSchema(topicName, rootNode), nil
  13. }
  14. func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
  15. switch fieldType.Kind.(type) {
  16. case *schema_pb.Type_ScalarType:
  17. dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
  18. case *schema_pb.Type_RecordType:
  19. dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
  20. case *schema_pb.Type_ListType:
  21. dataType, err = toParquetFieldTypeList(fieldType.GetListType())
  22. default:
  23. return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
  24. }
  25. return dataType, err
  26. }
  27. func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
  28. elementType, err := toParquetFieldType(listType.ElementType)
  29. if err != nil {
  30. return nil, err
  31. }
  32. return parquet.List(elementType), nil
  33. }
  34. func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
  35. switch scalarType {
  36. case schema_pb.ScalarType_BOOLEAN:
  37. return parquet.Leaf(parquet.BooleanType), nil
  38. case schema_pb.ScalarType_INTEGER:
  39. return parquet.Leaf(parquet.Int32Type), nil
  40. case schema_pb.ScalarType_LONG:
  41. return parquet.Leaf(parquet.Int64Type), nil
  42. case schema_pb.ScalarType_FLOAT:
  43. return parquet.Leaf(parquet.FloatType), nil
  44. case schema_pb.ScalarType_DOUBLE:
  45. return parquet.Leaf(parquet.DoubleType), nil
  46. case schema_pb.ScalarType_BYTES:
  47. return parquet.Leaf(parquet.ByteArrayType), nil
  48. case schema_pb.ScalarType_STRING:
  49. return parquet.String(), nil
  50. default:
  51. return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
  52. }
  53. }
  54. func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
  55. recordNode := parquet.Group{}
  56. for _, field := range recordType.Fields {
  57. parquetFieldType, err := toParquetFieldType(field.Type)
  58. if err != nil {
  59. return nil, err
  60. }
  61. recordNode[field.Name] = parquetFieldType
  62. }
  63. return recordNode, nil
  64. }