You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

92 lines
3.4 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
9 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
  8. switch fieldType.Kind.(type) {
  9. case *schema_pb.Type_ScalarType:
  10. var parquetValue parquet.Value
  11. parquetValue, err = toParquetValue(fieldValue)
  12. if err != nil {
  13. return
  14. }
  15. rowBuilder.Add(levels.startColumnIndex, parquetValue)
  16. // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
  17. case *schema_pb.Type_ListType:
  18. rowBuilder.Next(levels.startColumnIndex)
  19. // fmt.Printf("rowBuilder.Next %d\n", columnIndex)
  20. elementType := fieldType.GetListType().ElementType
  21. for _, value := range fieldValue.GetListValue().Values {
  22. if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil {
  23. return
  24. }
  25. }
  26. }
  27. return
  28. }
  29. func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error {
  30. visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
  31. return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
  32. }
  33. fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
  34. fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
  35. return doVisitValue(fieldType, parquetLevels, fieldValue, visitor)
  36. }
  37. // typeValueVisitor is a function that is called for each value in a schema_pb.Value
  38. // Find the column index.
  39. // intended to be used in RowBuilder.Add(columnIndex, value)
  40. type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error)
  41. // endIndex is exclusive
  42. // same logic as RowBuilder.configure in row_builder.go
  43. func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
  44. switch fieldType.Kind.(type) {
  45. case *schema_pb.Type_ScalarType:
  46. return visitor(fieldType, levels, fieldValue)
  47. case *schema_pb.Type_ListType:
  48. return visitor(fieldType, levels, fieldValue)
  49. case *schema_pb.Type_RecordType:
  50. for _, field := range fieldType.GetRecordType().Fields {
  51. fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
  52. if !found {
  53. // TODO check this if no such field found
  54. continue
  55. }
  56. fieldLevels := levels.levels[field.Name]
  57. err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor)
  58. if err != nil {
  59. return
  60. }
  61. }
  62. return
  63. }
  64. return
  65. }
  66. func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
  67. switch value.Kind.(type) {
  68. case *schema_pb.Value_BoolValue:
  69. return parquet.BooleanValue(value.GetBoolValue()), nil
  70. case *schema_pb.Value_Int32Value:
  71. return parquet.Int32Value(value.GetInt32Value()), nil
  72. case *schema_pb.Value_Int64Value:
  73. return parquet.Int64Value(value.GetInt64Value()), nil
  74. case *schema_pb.Value_FloatValue:
  75. return parquet.FloatValue(value.GetFloatValue()), nil
  76. case *schema_pb.Value_DoubleValue:
  77. return parquet.DoubleValue(value.GetDoubleValue()), nil
  78. case *schema_pb.Value_BytesValue:
  79. return parquet.ByteArrayValue(value.GetBytesValue()), nil
  80. case *schema_pb.Value_StringValue:
  81. return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
  82. default:
  83. return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
  84. }
  85. }