Browse Source

record type begin and end

mq-subscribe
chrislu 8 months ago
parent
commit
189a7fc90e
  1. 8
      weed/mq/schema/schema_builder.go
  2. 36
      weed/mq/schema/struct_to_schema_test.go
  3. 15
      weed/mq/schema/to_parquet_levels_test.go
  4. 15
      weed/mq/schema/write_parquet_test.go

8
weed/mq/schema/schema_builder.go

@ -19,11 +19,11 @@ type RecordTypeBuilder struct {
recordType *schema_pb.RecordType recordType *schema_pb.RecordType
} }
func NewRecordTypeBuilder() *RecordTypeBuilder {
func RecordTypeBegin() *RecordTypeBuilder {
return &RecordTypeBuilder{recordType: &schema_pb.RecordType{}} return &RecordTypeBuilder{recordType: &schema_pb.RecordType{}}
} }
func (rtb *RecordTypeBuilder) Build() *schema_pb.RecordType {
func (rtb *RecordTypeBuilder) RecordTypeEnd() *schema_pb.RecordType {
// be consistent with parquet.node.go `func (g Group) Fields() []Field` // be consistent with parquet.node.go `func (g Group) Fields() []Field`
sort.Slice(rtb.recordType.Fields, func(i, j int) bool { sort.Slice(rtb.recordType.Fields, func(i, j int) bool {
return rtb.recordType.Fields[i].Name < rtb.recordType.Fields[j].Name return rtb.recordType.Fields[i].Name < rtb.recordType.Fields[j].Name
@ -39,10 +39,10 @@ func (rtb *RecordTypeBuilder) SetField(name string, scalarType *schema_pb.Type)
return rtb return rtb
} }
func (rtb *RecordTypeBuilder) SetRecordField(name string, recordTypeBuilder *RecordTypeBuilder) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetRecordField(name string, recordType *schema_pb.RecordType) *RecordTypeBuilder {
rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{ rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{
Name: name, Name: name,
Type: &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordTypeBuilder.Build()}},
Type: &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}},
}) })
return rtb return rtb
} }

36
weed/mq/schema/struct_to_schema_test.go

@ -30,10 +30,10 @@ func TestStructToSchema(t *testing.T) {
Field2 string Field2 string
}{}, }{},
}, },
want: NewRecordTypeBuilder().
want: RecordTypeBegin().
SetField("Field1", TypeInteger). SetField("Field1", TypeInteger).
SetField("Field2", TypeString). SetField("Field2", TypeString).
Build(),
RecordTypeEnd(),
}, },
{ {
name: "simple list", name: "simple list",
@ -43,10 +43,10 @@ func TestStructToSchema(t *testing.T) {
Field2 string Field2 string
}{}, }{},
}, },
want: NewRecordTypeBuilder().
want: RecordTypeBegin().
SetField("Field1", ListOf(TypeInteger)). SetField("Field1", ListOf(TypeInteger)).
SetField("Field2", TypeString). SetField("Field2", TypeString).
Build(),
RecordTypeEnd(),
}, },
{ {
name: "simple []byte", name: "simple []byte",
@ -55,9 +55,9 @@ func TestStructToSchema(t *testing.T) {
Field2 []byte Field2 []byte
}{}, }{},
}, },
want: NewRecordTypeBuilder().
want: RecordTypeBegin().
SetField("Field2", TypeBytes). SetField("Field2", TypeBytes).
Build(),
RecordTypeEnd(),
}, },
{ {
name: "nested simpe structs", name: "nested simpe structs",
@ -70,13 +70,15 @@ func TestStructToSchema(t *testing.T) {
} }
}{}, }{},
}, },
want: NewRecordTypeBuilder().
want: RecordTypeBegin().
SetField("Field1", TypeInteger). SetField("Field1", TypeInteger).
SetRecordField("Field2", NewRecordTypeBuilder().
SetRecordField("Field2",
RecordTypeBegin().
SetField("Field3", TypeString). SetField("Field3", TypeString).
SetField("Field4", TypeInteger),
SetField("Field4", TypeInteger).
RecordTypeEnd(),
). ).
Build(),
RecordTypeEnd(),
}, },
{ {
name: "nested struct type", name: "nested struct type",
@ -93,17 +95,19 @@ func TestStructToSchema(t *testing.T) {
} }
}{}, }{},
}, },
want: NewRecordTypeBuilder().
want: RecordTypeBegin().
SetField("Field1", TypeInteger). SetField("Field1", TypeInteger).
SetRecordField("Field2", NewRecordTypeBuilder().
SetRecordField("Field2", RecordTypeBegin().
SetField("Field3", TypeString). SetField("Field3", TypeString).
SetField("Field4", ListOf(TypeInteger)). SetField("Field4", ListOf(TypeInteger)).
SetRecordField("Field5", NewRecordTypeBuilder().
SetRecordField("Field5",
RecordTypeBegin().
SetField("Field6", TypeString). SetField("Field6", TypeString).
SetField("Field7", TypeBytes),
),
SetField("Field7", TypeBytes).
RecordTypeEnd(),
).RecordTypeEnd(),
). ).
Build(),
RecordTypeEnd(),
}, },
} }

15
weed/mq/schema/to_parquet_levels_test.go

@ -18,16 +18,21 @@ func TestToParquetLevels(t *testing.T) {
{ {
name: "nested type", name: "nested type",
args: args{ args: args{
NewRecordTypeBuilder().
RecordTypeBegin().
SetField("ID", TypeLong). SetField("ID", TypeLong).
SetField("CreatedAt", TypeLong). SetField("CreatedAt", TypeLong).
SetRecordField("Person", NewRecordTypeBuilder().
SetRecordField("Person",
RecordTypeBegin().
SetField("zName", TypeString). SetField("zName", TypeString).
SetField("emails", ListOf(TypeString))).
SetField("emails", ListOf(TypeString)).
RecordTypeEnd()).
SetField("Company", TypeString). SetField("Company", TypeString).
SetRecordField("Address", NewRecordTypeBuilder().
SetRecordField("Address",
RecordTypeBegin().
SetField("Street", TypeString). SetField("Street", TypeString).
SetField("City", TypeString)).Build(),
SetField("City", TypeString).
RecordTypeEnd()).
RecordTypeEnd(),
}, },
want: &ParquetLevels{ want: &ParquetLevels{
startColumnIndex: 0, startColumnIndex: 0,

15
weed/mq/schema/write_parquet_test.go

@ -12,16 +12,21 @@ import (
func TestWriteReadParquet(t *testing.T) { func TestWriteReadParquet(t *testing.T) {
// create a schema_pb.RecordType // create a schema_pb.RecordType
recordType := NewRecordTypeBuilder().
recordType := RecordTypeBegin().
SetField("ID", TypeLong). SetField("ID", TypeLong).
SetField("CreatedAt", TypeLong). SetField("CreatedAt", TypeLong).
SetRecordField("Person", NewRecordTypeBuilder().
SetRecordField("Person",
RecordTypeBegin().
SetField("zName", TypeString). SetField("zName", TypeString).
SetField("emails", ListOf(TypeString))).
SetField("emails", ListOf(TypeString)).
RecordTypeEnd()).
SetField("Company", TypeString). SetField("Company", TypeString).
SetRecordField("Address", NewRecordTypeBuilder().
SetRecordField("Address",
RecordTypeBegin().
SetField("Street", TypeString). SetField("Street", TypeString).
SetField("City", TypeString)).Build()
SetField("City", TypeString).
RecordTypeEnd()).
RecordTypeEnd()
fmt.Printf("RecordType: %v\n", recordType) fmt.Printf("RecordType: %v\n", recordType)
// create a parquet schema // create a parquet schema

Loading…
Cancel
Save