You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

128 lines
3.5 KiB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
  1. package schema
  import (
  	"fmt"
  	"io"
  	"os"
  	"path/filepath"
  	"testing"

  	"github.com/parquet-go/parquet-go"
  	"github.com/parquet-go/parquet-go/compress/zstd"
  	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  )
  11. func TestWriteParquet(t *testing.T) {
  12. // create a schema_pb.RecordType
  13. recordType := NewRecordTypeBuilder().
  14. AddLongField("ID").
  15. AddLongField("CreatedAt").
  16. AddRecordField("Person", NewRecordTypeBuilder().
  17. AddStringField("zName").
  18. AddListField("emails", TypeString)).
  19. AddStringField("Company").Build()
  20. fmt.Printf("RecordType: %v\n", recordType)
  21. // create a parquet schema
  22. parquetSchema, err := ToParquetSchema("example", recordType)
  23. if err != nil {
  24. t.Fatalf("ToParquetSchema failed: %v", err)
  25. }
  26. fmt.Printf("ParquetSchema: %v\n", parquetSchema)
  27. fmt.Printf("Go Type: %+v\n", parquetSchema.GoType())
  28. filename := "example.parquet"
  29. count := 3
  30. testWritingParquetFile(t, count, filename, parquetSchema, recordType)
  31. total := testReadingParquetFile(t, filename, parquetSchema, recordType)
  32. if total != count {
  33. t.Fatalf("total != 128*1024: %v", total)
  34. }
  35. if err = os.Remove(filename); err != nil {
  36. t.Fatalf("os.Remove failed: %v", err)
  37. }
  38. }
  39. func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) {
  40. // create a parquet file
  41. file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
  42. if err != nil {
  43. t.Fatalf("os.Open failed: %v", err)
  44. }
  45. defer file.Close()
  46. writer := parquet.NewWriter(file, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.SpeedDefault}))
  47. rowBuilder := parquet.NewRowBuilder(parquetSchema)
  48. for i := 0; i < count; i++ {
  49. rowBuilder.Reset()
  50. // generate random data
  51. recordValue := NewRecordValueBuilder().
  52. AddLongValue("ID", 1+int64(i)).
  53. AddLongValue("CreatedAt", 2+2*int64(i)).
  54. AddRecordValue("Person", NewRecordValueBuilder().
  55. AddStringValue("zName", fmt.Sprintf("john_%d", i)).
  56. AddStringListValue("emails",
  57. fmt.Sprintf("john_%d@a.com", i),
  58. fmt.Sprintf("john_%d@b.com", i),
  59. fmt.Sprintf("john_%d@c.com", i),
  60. fmt.Sprintf("john_%d@d.com", i),
  61. fmt.Sprintf("john_%d@e.com", i))).
  62. AddStringValue("Company", fmt.Sprintf("company_%d", i)).Build()
  63. AddRecordValue(rowBuilder, recordType, recordValue)
  64. // fmt.Printf("RecordValue: %v\n", recordValue)
  65. row := rowBuilder.Row()
  66. // fmt.Printf("Row: %+v\n", row)
  67. if err != nil {
  68. t.Fatalf("rowBuilder.Build failed: %v", err)
  69. }
  70. if _, err = writer.WriteRows([]parquet.Row{row}); err != nil {
  71. t.Fatalf("writer.Write failed: %v", err)
  72. }
  73. }
  74. if err = writer.Close(); err != nil {
  75. t.Fatalf("writer.WriteStop failed: %v", err)
  76. }
  77. }
  78. func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) {
  79. // read the parquet file
  80. file, err := os.Open(filename)
  81. if err != nil {
  82. t.Fatalf("os.Open failed: %v", err)
  83. }
  84. defer file.Close()
  85. reader := parquet.NewReader(file, parquetSchema)
  86. rows := make([]parquet.Row, 128)
  87. for {
  88. rowCount, err := reader.ReadRows(rows)
  89. if err != nil {
  90. if err == io.EOF {
  91. break
  92. }
  93. t.Fatalf("reader.Read failed: %v", err)
  94. }
  95. for i := 0; i < rowCount; i++ {
  96. row := rows[i]
  97. // convert parquet row to schema_pb.RecordValue
  98. recordValue, err := ToRecordValue(recordType, row)
  99. if err != nil {
  100. t.Fatalf("ToRecordValue failed: %v", err)
  101. }
  102. if rowCount < 10 {
  103. fmt.Printf("RecordValue: %v\n", recordValue)
  104. }
  105. }
  106. total += rowCount
  107. }
  108. fmt.Printf("total: %v\n", total)
  109. return
  110. }