You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

86 lines
2.8 KiB

  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func AddRecordValue(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value) error {
  8. visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) error {
  9. switch fieldType.Kind.(type) {
  10. case *schema_pb.Type_ScalarType:
  11. parquetValue, err := toParquetValue(fieldValue)
  12. if err != nil {
  13. return err
  14. }
  15. rowBuilder.Add(index, parquetValue)
  16. }
  17. return nil
  18. }
  19. return visitValue(fieldType, fieldValue, visitor)
  20. }
// typeValueVisitor is the callback invoked for each value reached while
// walking a schema_pb.Value against its schema_pb.Type (see visitValue).
// fieldType is the type the walk associates with fieldValue, and index is the
// leaf column index of that value, intended to be passed to
// parquet.RowBuilder.Add(columnIndex, value). A non-nil error stops the walk.
type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) error
  25. func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
  26. _, err = doVisitValue(fieldType, fieldValue, 0, visitor)
  27. return
  28. }
  29. // endIndex is exclusive
  30. // same logic as RowBuilder.configure in row_builder.go
  31. func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) {
  32. switch fieldType.Kind.(type) {
  33. case *schema_pb.Type_ScalarType:
  34. return columnIndex+1, visitor(fieldType, fieldValue, columnIndex)
  35. case *schema_pb.Type_ListType:
  36. for _, value := range fieldValue.GetListValue().Values {
  37. err = visitor(fieldType, value, columnIndex)
  38. if err != nil {
  39. return
  40. }
  41. }
  42. return columnIndex+1, nil
  43. case *schema_pb.Type_RecordType:
  44. for _, field := range fieldType.GetRecordType().Fields {
  45. fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
  46. if !found {
  47. // TODO check this if no such field found
  48. return columnIndex, nil
  49. }
  50. endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor)
  51. if err != nil {
  52. return
  53. }
  54. columnIndex = endIndex
  55. }
  56. return
  57. }
  58. return
  59. }
  60. func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
  61. switch value.Kind.(type) {
  62. case *schema_pb.Value_BoolValue:
  63. return parquet.BooleanValue(value.GetBoolValue()), nil
  64. case *schema_pb.Value_Int32Value:
  65. return parquet.Int32Value(value.GetInt32Value()), nil
  66. case *schema_pb.Value_Int64Value:
  67. return parquet.Int64Value(value.GetInt64Value()), nil
  68. case *schema_pb.Value_FloatValue:
  69. return parquet.FloatValue(value.GetFloatValue()), nil
  70. case *schema_pb.Value_DoubleValue:
  71. return parquet.DoubleValue(value.GetDoubleValue()), nil
  72. case *schema_pb.Value_BytesValue:
  73. return parquet.ByteArrayValue(value.GetBytesValue()), nil
  74. case *schema_pb.Value_StringValue:
  75. return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
  76. default:
  77. return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
  78. }
  79. }