You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

98 lines
3.6 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) (endIndex int, err error) {
  8. switch fieldType.Kind.(type) {
  9. case *schema_pb.Type_ScalarType:
  10. endIndex = columnIndex+1
  11. var parquetValue parquet.Value
  12. parquetValue, err = toParquetValue(fieldValue)
  13. if err != nil {
  14. return
  15. }
  16. rowBuilder.Add(columnIndex, parquetValue)
  17. // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
  18. case *schema_pb.Type_ListType:
  19. rowBuilder.Next(columnIndex)
  20. // fmt.Printf("rowBuilder.Next %d\n", columnIndex)
  21. elementType := fieldType.GetListType().ElementType
  22. for _, value := range fieldValue.GetListValue().Values {
  23. if endIndex, err = rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil {
  24. return
  25. }
  26. }
  27. }
  28. return
  29. }
  30. func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
  31. visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) {
  32. return rowBuilderVisit(rowBuilder, fieldType, fieldValue, index)
  33. }
  34. fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
  35. fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
  36. return visitValue(fieldType, fieldValue, visitor)
  37. }
  38. // typeValueVisitor is a function that is called for each value in a schema_pb.Value
  39. // Find the column index.
  40. // intended to be used in RowBuilder.Add(columnIndex, value)
  41. type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error)
  42. func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
  43. _, err = doVisitValue(fieldType, fieldValue, 0, visitor)
  44. return
  45. }
  46. // endIndex is exclusive
  47. // same logic as RowBuilder.configure in row_builder.go
  48. func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) {
  49. switch fieldType.Kind.(type) {
  50. case *schema_pb.Type_ScalarType:
  51. return visitor(fieldType, fieldValue, columnIndex)
  52. case *schema_pb.Type_ListType:
  53. return visitor(fieldType, fieldValue, columnIndex)
  54. case *schema_pb.Type_RecordType:
  55. for _, field := range fieldType.GetRecordType().Fields {
  56. fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
  57. if !found {
  58. // TODO check this if no such field found
  59. return columnIndex, nil
  60. }
  61. endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor)
  62. if err != nil {
  63. return
  64. }
  65. columnIndex = endIndex
  66. }
  67. return
  68. }
  69. return
  70. }
  71. func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
  72. switch value.Kind.(type) {
  73. case *schema_pb.Value_BoolValue:
  74. return parquet.BooleanValue(value.GetBoolValue()), nil
  75. case *schema_pb.Value_Int32Value:
  76. return parquet.Int32Value(value.GetInt32Value()), nil
  77. case *schema_pb.Value_Int64Value:
  78. return parquet.Int64Value(value.GetInt64Value()), nil
  79. case *schema_pb.Value_FloatValue:
  80. return parquet.FloatValue(value.GetFloatValue()), nil
  81. case *schema_pb.Value_DoubleValue:
  82. return parquet.DoubleValue(value.GetDoubleValue()), nil
  83. case *schema_pb.Value_BytesValue:
  84. return parquet.ByteArrayValue(value.GetBytesValue()), nil
  85. case *schema_pb.Value_StringValue:
  86. return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
  87. default:
  88. return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
  89. }
  90. }