You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

135 lines
3.6 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. "github.com/parquet-go/parquet-go"
  5. "github.com/parquet-go/parquet-go/compress/zstd"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  7. "io"
  8. "os"
  9. "testing"
  10. )
  11. func TestWriteReadParquet(t *testing.T) {
  12. // create a schema_pb.RecordType
  13. recordType := NewRecordTypeBuilder().
  14. AddLongField("ID").
  15. AddLongField("CreatedAt").
  16. AddRecordField("Person", NewRecordTypeBuilder().
  17. AddStringField("zName").
  18. AddListField("emails", TypeString)).
  19. AddStringField("Company").
  20. AddRecordField("Address", NewRecordTypeBuilder().
  21. AddStringField("Street").
  22. AddStringField("City")).Build()
  23. fmt.Printf("RecordType: %v\n", recordType)
  24. // create a parquet schema
  25. parquetSchema, err := ToParquetSchema("example", recordType)
  26. if err != nil {
  27. t.Fatalf("ToParquetSchema failed: %v", err)
  28. }
  29. fmt.Printf("ParquetSchema: %v\n", parquetSchema)
  30. fmt.Printf("Go Type: %+v\n", parquetSchema.GoType())
  31. filename := "example.parquet"
  32. count := 3
  33. testWritingParquetFile(t, count, filename, parquetSchema, recordType)
  34. total := testReadingParquetFile(t, filename, parquetSchema, recordType)
  35. if total != count {
  36. t.Fatalf("total != 128*1024: %v", total)
  37. }
  38. if err = os.Remove(filename); err != nil {
  39. t.Fatalf("os.Remove failed: %v", err)
  40. }
  41. }
  42. func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) {
  43. // create a parquet file
  44. file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
  45. if err != nil {
  46. t.Fatalf("os.Open failed: %v", err)
  47. }
  48. defer file.Close()
  49. writer := parquet.NewWriter(file, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.SpeedDefault}))
  50. rowBuilder := parquet.NewRowBuilder(parquetSchema)
  51. for i := 0; i < count; i++ {
  52. rowBuilder.Reset()
  53. // generate random data
  54. recordValue := NewRecordValueBuilder().
  55. AddLongValue("ID", 1+int64(i)).
  56. AddLongValue("CreatedAt", 2+2*int64(i)).
  57. AddRecordValue("Person", NewRecordValueBuilder().
  58. AddStringValue("zName", fmt.Sprintf("john_%d", i)).
  59. AddStringListValue("emails",
  60. fmt.Sprintf("john_%d@a.com", i),
  61. fmt.Sprintf("john_%d@b.com", i),
  62. fmt.Sprintf("john_%d@c.com", i),
  63. fmt.Sprintf("john_%d@d.com", i),
  64. fmt.Sprintf("john_%d@e.com", i))).
  65. AddStringValue("Company", fmt.Sprintf("company_%d", i)).Build()
  66. AddRecordValue(rowBuilder, recordType, recordValue)
  67. if count < 10 {
  68. fmt.Printf("RecordValue: %v\n", recordValue)
  69. }
  70. row := rowBuilder.Row()
  71. if count < 10 {
  72. fmt.Printf("Row: %+v\n", row)
  73. }
  74. if err != nil {
  75. t.Fatalf("rowBuilder.Build failed: %v", err)
  76. }
  77. if _, err = writer.WriteRows([]parquet.Row{row}); err != nil {
  78. t.Fatalf("writer.Write failed: %v", err)
  79. }
  80. }
  81. if err = writer.Close(); err != nil {
  82. t.Fatalf("writer.WriteStop failed: %v", err)
  83. }
  84. }
  85. func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) {
  86. // read the parquet file
  87. file, err := os.Open(filename)
  88. if err != nil {
  89. t.Fatalf("os.Open failed: %v", err)
  90. }
  91. defer file.Close()
  92. reader := parquet.NewReader(file, parquetSchema)
  93. rows := make([]parquet.Row, 128)
  94. for {
  95. rowCount, err := reader.ReadRows(rows)
  96. if err != nil {
  97. if err == io.EOF {
  98. break
  99. }
  100. t.Fatalf("reader.Read failed: %v", err)
  101. }
  102. for i := 0; i < rowCount; i++ {
  103. row := rows[i]
  104. // convert parquet row to schema_pb.RecordValue
  105. recordValue, err := ToRecordValue(recordType, row)
  106. if err != nil {
  107. t.Fatalf("ToRecordValue failed: %v", err)
  108. }
  109. if rowCount < 10 {
  110. fmt.Printf("RecordValue: %v\n", recordValue)
  111. }
  112. }
  113. total += rowCount
  114. }
  115. fmt.Printf("total: %v\n", total)
  116. return
  117. }