diff --git a/weed/mq/schema/to_parquet_value.go b/weed/mq/schema/to_parquet_value.go index e80b96396..22a93de67 100644 --- a/weed/mq/schema/to_parquet_value.go +++ b/weed/mq/schema/to_parquet_value.go @@ -30,11 +30,7 @@ func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, return } -func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error { - parquetLevels, err := ToParquetLevels(recordType) - if err != nil { - return err - } +func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error { visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) { return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue) } diff --git a/weed/mq/schema/to_schema_value.go b/weed/mq/schema/to_schema_value.go index ac57ca430..18c1c3b5c 100644 --- a/weed/mq/schema/to_schema_value.go +++ b/weed/mq/schema/to_schema_value.go @@ -9,11 +9,7 @@ import ( // ToRecordValue converts a parquet.Row to a schema_pb.RecordValue // This does not work or did not test with nested structures. // Using this may fail to convert the parquet.Row to schema_pb.RecordValue -func ToRecordValue(recordType *schema_pb.RecordType, row parquet.Row) (*schema_pb.RecordValue, error) { - parquetLevels, err := ToParquetLevels(recordType) - if err != nil { - return nil, err - } +func ToRecordValue(recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, row parquet.Row) (*schema_pb.RecordValue, error) { values := []parquet.Value(row) recordValue, _, err := toRecordValue(recordType, parquetLevels, values, 0) if err != nil { diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go index 928dab1d3..ac2d6b154 100644 --- a/weed/mq/schema/write_parquet_test.go +++ b/weed/mq/schema/write_parquet_test.go @@ -52,6 +52,11 @@ func TestWriteReadParquet(t *testing.T) { } func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) { + parquetLevels, err := ToParquetLevels(recordType) + if err != nil { + t.Fatalf("ToParquetLevels failed: %v", err) + } + // create a parquet file file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664) if err != nil { @@ -75,7 +80,7 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch fmt.Sprintf("john_%d@d.com", i), fmt.Sprintf("john_%d@e.com", i))). AddStringValue("Company", fmt.Sprintf("company_%d", i)).Build() - AddRecordValue(rowBuilder, recordType, recordValue) + AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue) if count < 10 { fmt.Printf("RecordValue: %v\n", recordValue) @@ -101,6 +106,11 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch } func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) { + parquetLevels, err := ToParquetLevels(recordType) + if err != nil { + t.Fatalf("ToParquetLevels failed: %v", err) + } + // read the parquet file file, err := os.Open(filename) if err != nil { @@ -120,7 +130,7 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque for i := 0; i < rowCount; i++ { row := rows[i] // convert parquet row to schema_pb.RecordValue - recordValue, err := ToRecordValue(recordType, row) + recordValue, err := ToRecordValue(recordType, parquetLevels, row) if err != nil { t.Fatalf("ToRecordValue failed: %v", err) }