Browse Source

consistent with protobuf data types

mq-subscribe
chrislu 8 months ago
parent
commit
b8af997e41
  1. 16
      weed/mq/client/cmd/weed_pub_record/publisher_record.go
  2. 8
      weed/mq/schema/schema_builder.go
  3. 8
      weed/mq/schema/struct_to_schema.go
  4. 12
      weed/mq/schema/struct_to_schema_test.go
  5. 4
      weed/mq/schema/to_parquet_levels_test.go
  6. 16
      weed/mq/schema/value_builder.go
  7. 8
      weed/mq/schema/write_parquet_test.go

16
weed/mq/client/cmd/weed_pub_record/publisher_record.go

@ -73,10 +73,10 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
SetBytes("key", r.Key).
SetBytes("field1", r.Field1).
SetString("field2", r.Field2).
SetInt("field3", int32(r.Field3)).
SetLong("field4", r.Field4).
SetFloat("field5", r.Field5).
SetDouble("field6", r.Field6).
SetInt32("field3", int32(r.Field3)).
SetInt64("field4", r.Field4).
SetFloat32("field5", r.Field5).
SetFloat64("field6", r.Field6).
SetBool("field7", r.Field7).
RecordEnd()
}
@ -88,10 +88,10 @@ func main() {
WithField("key", schema.TypeBytes).
WithField("field1", schema.TypeBytes).
WithField("field2", schema.TypeString).
WithField("field3", schema.TypeInteger).
WithField("field4", schema.TypeLong).
WithField("field5", schema.TypeFloat).
WithField("field6", schema.TypeDouble).
WithField("field3", schema.TypeInt32).
WithField("field4", schema.Type64).
WithField("field5", schema.TypeFloat32).
WithField("field6", schema.TypeFloat64).
WithField("field7", schema.TypeBoolean).
RecordTypeEnd()

8
weed/mq/schema/schema_builder.go

@ -7,10 +7,10 @@ import (
var (
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOLEAN}}
TypeInteger = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INTEGER}}
TypeLong = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_LONG}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INTEGER}}
Type64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_LONG}}
TypeFloat32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeFloat64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
)

8
weed/mq/schema/struct_to_schema.go

@ -19,13 +19,13 @@ func reflectTypeToSchemaType(t reflect.Type) *schema_pb.Type {
case reflect.Bool:
return TypeBoolean
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32:
return TypeInteger
return TypeInt32
case reflect.Int64:
return TypeLong
return Type64
case reflect.Float32:
return TypeFloat
return TypeFloat32
case reflect.Float64:
return TypeDouble
return TypeFloat64
case reflect.String:
return TypeString
case reflect.Slice:

12
weed/mq/schema/struct_to_schema_test.go

@ -31,7 +31,7 @@ func TestStructToSchema(t *testing.T) {
}{},
},
want: RecordTypeBegin().
WithField("Field1", TypeInteger).
WithField("Field1", TypeInt32).
WithField("Field2", TypeString).
RecordTypeEnd(),
},
@ -44,7 +44,7 @@ func TestStructToSchema(t *testing.T) {
}{},
},
want: RecordTypeBegin().
WithField("Field1", ListOf(TypeInteger)).
WithField("Field1", ListOf(TypeInt32)).
WithField("Field2", TypeString).
RecordTypeEnd(),
},
@ -71,11 +71,11 @@ func TestStructToSchema(t *testing.T) {
}{},
},
want: RecordTypeBegin().
WithField("Field1", TypeInteger).
WithField("Field1", TypeInt32).
WithRecordField("Field2",
RecordTypeBegin().
WithField("Field3", TypeString).
WithField("Field4", TypeInteger).
WithField("Field4", TypeInt32).
RecordTypeEnd(),
).
RecordTypeEnd(),
@ -96,10 +96,10 @@ func TestStructToSchema(t *testing.T) {
}{},
},
want: RecordTypeBegin().
WithField("Field1", TypeInteger).
WithField("Field1", TypeInt32).
WithRecordField("Field2", RecordTypeBegin().
WithField("Field3", TypeString).
WithField("Field4", ListOf(TypeInteger)).
WithField("Field4", ListOf(TypeInt32)).
WithRecordField("Field5",
RecordTypeBegin().
WithField("Field6", TypeString).

4
weed/mq/schema/to_parquet_levels_test.go

@ -19,8 +19,8 @@ func TestToParquetLevels(t *testing.T) {
name: "nested type",
args: args{
RecordTypeBegin().
WithField("ID", TypeLong).
WithField("CreatedAt", TypeLong).
WithField("ID", Type64).
WithField("CreatedAt", Type64).
WithRecordField("Person",
RecordTypeBegin().
WithField("zName", TypeString).

16
weed/mq/schema/value_builder.go

@ -21,19 +21,19 @@ func (rvb *RecordValueBuilder) SetBool(key string, value bool) *RecordValueBuild
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value}}
return rvb
}
func (rvb *RecordValueBuilder) SetInt(key string, value int32) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetInt32(key string, value int32) *RecordValueBuilder {
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value}}
return rvb
}
func (rvb *RecordValueBuilder) SetLong(key string, value int64) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetInt64(key string, value int64) *RecordValueBuilder {
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value}}
return rvb
}
func (rvb *RecordValueBuilder) SetFloat(key string, value float32) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetFloat32(key string, value float32) *RecordValueBuilder {
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value}}
return rvb
}
func (rvb *RecordValueBuilder) SetDouble(key string, value float64) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetFloat64(key string, value float64) *RecordValueBuilder {
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value}}
return rvb
}
@ -62,28 +62,28 @@ func (rvb *RecordValueBuilder) SetBoolList(key string, values ...bool) *RecordVa
}
return rvb.addListValue(key, listValues)
}
func (rvb *RecordValueBuilder) SetIntList(key string, values ...int32) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetInt32List(key string, values ...int32) *RecordValueBuilder {
var listValues []*schema_pb.Value
for _, v := range values {
listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}})
}
return rvb.addListValue(key, listValues)
}
func (rvb *RecordValueBuilder) SetLongList(key string, values ...int64) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetInt64List(key string, values ...int64) *RecordValueBuilder {
var listValues []*schema_pb.Value
for _, v := range values {
listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}})
}
return rvb.addListValue(key, listValues)
}
func (rvb *RecordValueBuilder) SetFloatList(key string, values ...float32) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetFloat32List(key string, values ...float32) *RecordValueBuilder {
var listValues []*schema_pb.Value
for _, v := range values {
listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}})
}
return rvb.addListValue(key, listValues)
}
func (rvb *RecordValueBuilder) SetDoubleList(key string, values ...float64) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetFloat64List(key string, values ...float64) *RecordValueBuilder {
var listValues []*schema_pb.Value
for _, v := range values {
listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}})

8
weed/mq/schema/write_parquet_test.go

@ -13,8 +13,8 @@ import (
func TestWriteReadParquet(t *testing.T) {
// create a schema_pb.RecordType
recordType := RecordTypeBegin().
WithField("ID", TypeLong).
WithField("CreatedAt", TypeLong).
WithField("ID", Type64).
WithField("CreatedAt", Type64).
WithRecordField("Person",
RecordTypeBegin().
WithField("zName", TypeString).
@ -74,8 +74,8 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
rowBuilder.Reset()
// generate random data
recordValue := RecordBegin().
SetLong("ID", 1+int64(i)).
SetLong("CreatedAt", 2+2*int64(i)).
SetInt64("ID", 1+int64(i)).
SetInt64("CreatedAt", 2+2*int64(i)).
SetRecord("Person",
RecordBegin().
SetString("zName", fmt.Sprintf("john_%d", i)).

Loading…
Cancel
Save