You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

98 lines
3.6 KiB

9 months ago
9 months ago
9 months ago
9 months ago
9 months ago
9 months ago
9 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) (endIndex int, err error) {
  8. switch fieldType.Kind.(type) {
  9. case *schema_pb.Type_ScalarType:
  10. endIndex = columnIndex+1
  11. var parquetValue parquet.Value
  12. parquetValue, err = toParquetValue(fieldValue)
  13. if err != nil {
  14. return
  15. }
  16. rowBuilder.Add(columnIndex, parquetValue)
  17. // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
  18. case *schema_pb.Type_ListType:
  19. rowBuilder.Next(columnIndex)
  20. // fmt.Printf("rowBuilder.Next %d\n", columnIndex)
  21. elementType := fieldType.GetListType().ElementType
  22. for _, value := range fieldValue.GetListValue().Values {
  23. if endIndex, err = rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil {
  24. return
  25. }
  26. }
  27. }
  28. return
  29. }
  30. func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
  31. visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) {
  32. return rowBuilderVisit(rowBuilder, fieldType, fieldValue, index)
  33. }
  34. fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
  35. fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
  36. return visitValue(fieldType, fieldValue, visitor)
  37. }
  38. // typeValueVisitor is a function that is called for each value in a schema_pb.Value
  39. // Find the column index.
  40. // intended to be used in RowBuilder.Add(columnIndex, value)
  41. type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error)
  42. func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
  43. _, err = doVisitValue(fieldType, fieldValue, 0, visitor)
  44. return
  45. }
  46. // endIndex is exclusive
  47. // same logic as RowBuilder.configure in row_builder.go
  48. func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) {
  49. switch fieldType.Kind.(type) {
  50. case *schema_pb.Type_ScalarType:
  51. return visitor(fieldType, fieldValue, columnIndex)
  52. case *schema_pb.Type_ListType:
  53. return visitor(fieldType, fieldValue, columnIndex)
  54. case *schema_pb.Type_RecordType:
  55. for _, field := range fieldType.GetRecordType().Fields {
  56. fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
  57. if !found {
  58. // TODO check this if no such field found
  59. return columnIndex, nil
  60. }
  61. endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor)
  62. if err != nil {
  63. return
  64. }
  65. columnIndex = endIndex
  66. }
  67. return
  68. }
  69. return
  70. }
  71. func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
  72. switch value.Kind.(type) {
  73. case *schema_pb.Value_BoolValue:
  74. return parquet.BooleanValue(value.GetBoolValue()), nil
  75. case *schema_pb.Value_Int32Value:
  76. return parquet.Int32Value(value.GetInt32Value()), nil
  77. case *schema_pb.Value_Int64Value:
  78. return parquet.Int64Value(value.GetInt64Value()), nil
  79. case *schema_pb.Value_FloatValue:
  80. return parquet.FloatValue(value.GetFloatValue()), nil
  81. case *schema_pb.Value_DoubleValue:
  82. return parquet.DoubleValue(value.GetDoubleValue()), nil
  83. case *schema_pb.Value_BytesValue:
  84. return parquet.ByteArrayValue(value.GetBytesValue()), nil
  85. case *schema_pb.Value_StringValue:
  86. return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
  87. default:
  88. return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
  89. }
  90. }