Browse Source

rename

mq-subscribe
chrislu 9 months ago
parent
commit
a2a872ca03
  1. 18
      weed/mq/schema/schema_builder.go
  2. 8
      weed/mq/schema/schema_builder_test.go
  3. 18
      weed/mq/schema/to_parquet_levels_test.go
  4. 18
      weed/mq/schema/write_parquet_test.go

18
weed/mq/schema/schema_builder.go

@ -39,29 +39,29 @@ func (rtb *RecordTypeBuilder) addField(name string, scalarType *schema_pb.Type)
return rtb return rtb
} }
func (rtb *RecordTypeBuilder) AddBoolField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetBoolField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeBoolean) return rtb.addField(name, TypeBoolean)
} }
func (rtb *RecordTypeBuilder) AddIntegerField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetIntegerField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeInteger) return rtb.addField(name, TypeInteger)
} }
func (rtb *RecordTypeBuilder) AddLongField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetLongField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeLong) return rtb.addField(name, TypeLong)
} }
func (rtb *RecordTypeBuilder) AddFloatField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetFloatField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeFloat) return rtb.addField(name, TypeFloat)
} }
func (rtb *RecordTypeBuilder) AddDoubleField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetDoubleField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeDouble) return rtb.addField(name, TypeDouble)
} }
func (rtb *RecordTypeBuilder) AddBytesField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetBytesField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeBytes) return rtb.addField(name, TypeBytes)
} }
func (rtb *RecordTypeBuilder) AddStringField(name string) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetStringField(name string) *RecordTypeBuilder {
return rtb.addField(name, TypeString) return rtb.addField(name, TypeString)
} }
func (rtb *RecordTypeBuilder) AddRecordField(name string, recordTypeBuilder *RecordTypeBuilder) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetRecordField(name string, recordTypeBuilder *RecordTypeBuilder) *RecordTypeBuilder {
rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{ rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{
Name: name, Name: name,
Type: &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordTypeBuilder.Build()}}, Type: &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordTypeBuilder.Build()}},
@ -69,7 +69,7 @@ func (rtb *RecordTypeBuilder) AddRecordField(name string, recordTypeBuilder *Rec
return rtb return rtb
} }
func (rtb *RecordTypeBuilder) AddListField(name string, elementType *schema_pb.Type) *RecordTypeBuilder {
func (rtb *RecordTypeBuilder) SetListField(name string, elementType *schema_pb.Type) *RecordTypeBuilder {
rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{ rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{
Name: name, Name: name,
Type: &schema_pb.Type{Kind: &schema_pb.Type_ListType{ListType: &schema_pb.ListType{ElementType: elementType}}}, Type: &schema_pb.Type{Kind: &schema_pb.Type_ListType{ListType: &schema_pb.ListType{ElementType: elementType}}},

8
weed/mq/schema/schema_builder_test.go

@ -7,10 +7,10 @@ import (
func TestSchemaBuilder(t *testing.T) { func TestSchemaBuilder(t *testing.T) {
rtb := NewRecordTypeBuilder() rtb := NewRecordTypeBuilder()
rtb.AddStringField("ID").
AddLongField("CreatedAt").
AddLongField("ModifiedAt").
AddStringField("User")
rtb.SetStringField("ID").
SetLongField("CreatedAt").
SetLongField("ModifiedAt").
SetStringField("User")
recordType := rtb.Build() recordType := rtb.Build()
fmt.Printf("RecordType: %v\n", recordType) fmt.Printf("RecordType: %v\n", recordType)

18
weed/mq/schema/to_parquet_levels_test.go

@ -19,15 +19,15 @@ func TestToParquetLevels(t *testing.T) {
name: "nested type", name: "nested type",
args: args{ args: args{
NewRecordTypeBuilder(). NewRecordTypeBuilder().
AddLongField("ID").
AddLongField("CreatedAt").
AddRecordField("Person", NewRecordTypeBuilder().
AddStringField("zName").
AddListField("emails", TypeString)).
AddStringField("Company").
AddRecordField("Address", NewRecordTypeBuilder().
AddStringField("Street").
AddStringField("City")).Build(),
SetLongField("ID").
SetLongField("CreatedAt").
SetRecordField("Person", NewRecordTypeBuilder().
SetStringField("zName").
SetListField("emails", TypeString)).
SetStringField("Company").
SetRecordField("Address", NewRecordTypeBuilder().
SetStringField("Street").
SetStringField("City")).Build(),
}, },
want: &ParquetLevels{ want: &ParquetLevels{
startColumnIndex: 0, startColumnIndex: 0,

18
weed/mq/schema/write_parquet_test.go

@ -13,15 +13,15 @@ import (
func TestWriteReadParquet(t *testing.T) { func TestWriteReadParquet(t *testing.T) {
// create a schema_pb.RecordType // create a schema_pb.RecordType
recordType := NewRecordTypeBuilder(). recordType := NewRecordTypeBuilder().
AddLongField("ID").
AddLongField("CreatedAt").
AddRecordField("Person", NewRecordTypeBuilder().
AddStringField("zName").
AddListField("emails", TypeString)).
AddStringField("Company").
AddRecordField("Address", NewRecordTypeBuilder().
AddStringField("Street").
AddStringField("City")).Build()
SetLongField("ID").
SetLongField("CreatedAt").
SetRecordField("Person", NewRecordTypeBuilder().
SetStringField("zName").
SetListField("emails", TypeString)).
SetStringField("Company").
SetRecordField("Address", NewRecordTypeBuilder().
SetStringField("Street").
SetStringField("City")).Build()
fmt.Printf("RecordType: %v\n", recordType) fmt.Printf("RecordType: %v\n", recordType)
// create a parquet schema // create a parquet schema

Loading…
Cancel
Save