You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

145 lines
3.9 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package schema
  2. import (
  3. "fmt"
  4. "github.com/parquet-go/parquet-go"
  5. "github.com/parquet-go/parquet-go/compress/zstd"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  7. "io"
  8. "os"
  9. "testing"
  10. )
  11. func TestWriteReadParquet(t *testing.T) {
  12. // create a schema_pb.RecordType
  13. recordType := NewRecordTypeBuilder().
  14. SetLongField("ID").
  15. SetLongField("CreatedAt").
  16. SetRecordField("Person", NewRecordTypeBuilder().
  17. SetStringField("zName").
  18. SetListField("emails", TypeString)).
  19. SetStringField("Company").
  20. SetRecordField("Address", NewRecordTypeBuilder().
  21. SetStringField("Street").
  22. SetStringField("City")).Build()
  23. fmt.Printf("RecordType: %v\n", recordType)
  24. // create a parquet schema
  25. parquetSchema, err := ToParquetSchema("example", recordType)
  26. if err != nil {
  27. t.Fatalf("ToParquetSchema failed: %v", err)
  28. }
  29. fmt.Printf("ParquetSchema: %v\n", parquetSchema)
  30. fmt.Printf("Go Type: %+v\n", parquetSchema.GoType())
  31. filename := "example.parquet"
  32. count := 3
  33. testWritingParquetFile(t, count, filename, parquetSchema, recordType)
  34. total := testReadingParquetFile(t, filename, parquetSchema, recordType)
  35. if total != count {
  36. t.Fatalf("total != 128*1024: %v", total)
  37. }
  38. if err = os.Remove(filename); err != nil {
  39. t.Fatalf("os.Remove failed: %v", err)
  40. }
  41. }
  42. func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) {
  43. parquetLevels, err := ToParquetLevels(recordType)
  44. if err != nil {
  45. t.Fatalf("ToParquetLevels failed: %v", err)
  46. }
  47. // create a parquet file
  48. file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
  49. if err != nil {
  50. t.Fatalf("os.Open failed: %v", err)
  51. }
  52. defer file.Close()
  53. writer := parquet.NewWriter(file, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
  54. rowBuilder := parquet.NewRowBuilder(parquetSchema)
  55. for i := 0; i < count; i++ {
  56. rowBuilder.Reset()
  57. // generate random data
  58. recordValue := NewRecordValueBuilder().
  59. SetLongValue("ID", 1+int64(i)).
  60. SetLongValue("CreatedAt", 2+2*int64(i)).
  61. SetRecordValue("Person", NewRecordValueBuilder().
  62. SetStringValue("zName", fmt.Sprintf("john_%d", i)).
  63. SetStringListValue("emails",
  64. fmt.Sprintf("john_%d@a.com", i),
  65. fmt.Sprintf("john_%d@b.com", i),
  66. fmt.Sprintf("john_%d@c.com", i),
  67. fmt.Sprintf("john_%d@d.com", i),
  68. fmt.Sprintf("john_%d@e.com", i))).
  69. SetStringValue("Company", fmt.Sprintf("company_%d", i)).Build()
  70. AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue)
  71. if count < 10 {
  72. fmt.Printf("RecordValue: %v\n", recordValue)
  73. }
  74. row := rowBuilder.Row()
  75. if count < 10 {
  76. fmt.Printf("Row: %+v\n", row)
  77. }
  78. if err != nil {
  79. t.Fatalf("rowBuilder.Build failed: %v", err)
  80. }
  81. if _, err = writer.WriteRows([]parquet.Row{row}); err != nil {
  82. t.Fatalf("writer.Write failed: %v", err)
  83. }
  84. }
  85. if err = writer.Close(); err != nil {
  86. t.Fatalf("writer.WriteStop failed: %v", err)
  87. }
  88. }
  89. func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) {
  90. parquetLevels, err := ToParquetLevels(recordType)
  91. if err != nil {
  92. t.Fatalf("ToParquetLevels failed: %v", err)
  93. }
  94. // read the parquet file
  95. file, err := os.Open(filename)
  96. if err != nil {
  97. t.Fatalf("os.Open failed: %v", err)
  98. }
  99. defer file.Close()
  100. reader := parquet.NewReader(file, parquetSchema)
  101. rows := make([]parquet.Row, 128)
  102. for {
  103. rowCount, err := reader.ReadRows(rows)
  104. if err != nil {
  105. if err == io.EOF {
  106. break
  107. }
  108. t.Fatalf("reader.Read failed: %v", err)
  109. }
  110. for i := 0; i < rowCount; i++ {
  111. row := rows[i]
  112. // convert parquet row to schema_pb.RecordValue
  113. recordValue, err := ToRecordValue(recordType, parquetLevels, row)
  114. if err != nil {
  115. t.Fatalf("ToRecordValue failed: %v", err)
  116. }
  117. if rowCount < 10 {
  118. fmt.Printf("RecordValue: %v\n", recordValue)
  119. }
  120. }
  121. total += rowCount
  122. }
  123. fmt.Printf("total: %v\n", total)
  124. return
  125. }