Browse Source

rename field types

mq-subscribe
chrislu 8 months ago
parent
commit
f2f4630029
  1. 2
      weed/mq/client/cmd/weed_pub_record/publisher_record.go
  2. 10
      weed/mq/schema/schema_builder.go
  3. 2
      weed/mq/schema/schema_test.go
  4. 2
      weed/mq/schema/struct_to_schema.go
  5. 4
      weed/mq/schema/to_parquet_levels_test.go
  6. 10
      weed/mq/schema/to_parquet_schema.go
  7. 10
      weed/mq/schema/to_schema_value.go
  8. 4
      weed/mq/schema/write_parquet_test.go
  9. 10
      weed/pb/schema.proto
  10. 54
      weed/pb/schema_pb/schema.pb.go

2
weed/mq/client/cmd/weed_pub_record/publisher_record.go

@ -89,7 +89,7 @@ func main() {
WithField("field1", schema.TypeBytes). WithField("field1", schema.TypeBytes).
WithField("field2", schema.TypeString). WithField("field2", schema.TypeString).
WithField("field3", schema.TypeInt32). WithField("field3", schema.TypeInt32).
WithField("field4", schema.Type64).
WithField("field4", schema.TypeInt64).
WithField("field5", schema.TypeFloat32). WithField("field5", schema.TypeFloat32).
WithField("field6", schema.TypeFloat64). WithField("field6", schema.TypeFloat64).
WithField("field7", schema.TypeBoolean). WithField("field7", schema.TypeBoolean).

10
weed/mq/schema/schema_builder.go

@ -6,11 +6,11 @@ import (
) )
var ( var (
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOLEAN}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INTEGER}}
Type64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_LONG}}
TypeFloat32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeFloat64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT32}}
TypeFloat64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT64}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}} TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}} TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
) )

2
weed/mq/schema/schema_test.go

@ -14,7 +14,7 @@ func TestEnumScalarType(t *testing.T) {
enum ScalarType enum ScalarType
expected int32 expected int32
}{ }{
{"Boolean", ScalarType_BOOLEAN, 0},
{"Boolean", ScalarType_BOOL, 0},
{"Integer", ScalarType_INTEGER, 1}, {"Integer", ScalarType_INTEGER, 1},
{"Long", ScalarType_LONG, 3}, {"Long", ScalarType_LONG, 3},
{"Float", ScalarType_FLOAT, 4}, {"Float", ScalarType_FLOAT, 4},

2
weed/mq/schema/struct_to_schema.go

@ -21,7 +21,7 @@ func reflectTypeToSchemaType(t reflect.Type) *schema_pb.Type {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32: case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32:
return TypeInt32 return TypeInt32
case reflect.Int64: case reflect.Int64:
return Type64
return TypeInt64
case reflect.Float32: case reflect.Float32:
return TypeFloat32 return TypeFloat32
case reflect.Float64: case reflect.Float64:

4
weed/mq/schema/to_parquet_levels_test.go

@ -19,8 +19,8 @@ func TestToParquetLevels(t *testing.T) {
name: "nested type", name: "nested type",
args: args{ args: args{
RecordTypeBegin(). RecordTypeBegin().
WithField("ID", Type64).
WithField("CreatedAt", Type64).
WithField("ID", TypeInt64).
WithField("CreatedAt", TypeInt64).
WithRecordField("Person", WithRecordField("Person",
RecordTypeBegin(). RecordTypeBegin().
WithField("zName", TypeString). WithField("zName", TypeString).

10
weed/mq/schema/to_parquet_schema.go

@ -45,15 +45,15 @@ func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error)
func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) { func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
switch scalarType { switch scalarType {
case schema_pb.ScalarType_BOOLEAN:
case schema_pb.ScalarType_BOOL:
return parquet.Leaf(parquet.BooleanType), nil return parquet.Leaf(parquet.BooleanType), nil
case schema_pb.ScalarType_INTEGER:
case schema_pb.ScalarType_INT32:
return parquet.Leaf(parquet.Int32Type), nil return parquet.Leaf(parquet.Int32Type), nil
case schema_pb.ScalarType_LONG:
case schema_pb.ScalarType_INT64:
return parquet.Leaf(parquet.Int64Type), nil return parquet.Leaf(parquet.Int64Type), nil
case schema_pb.ScalarType_FLOAT:
case schema_pb.ScalarType_FLOAT32:
return parquet.Leaf(parquet.FloatType), nil return parquet.Leaf(parquet.FloatType), nil
case schema_pb.ScalarType_DOUBLE:
case schema_pb.ScalarType_FLOAT64:
return parquet.Leaf(parquet.DoubleType), nil return parquet.Leaf(parquet.DoubleType), nil
case schema_pb.ScalarType_BYTES: case schema_pb.ScalarType_BYTES:
return parquet.Leaf(parquet.ByteArrayType), nil return parquet.Leaf(parquet.ByteArrayType), nil

10
weed/mq/schema/to_schema_value.go

@ -66,15 +66,15 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value
return nil, valueIndex, nil return nil, valueIndex, nil
} }
switch scalarType { switch scalarType {
case schema_pb.ScalarType_BOOLEAN:
case schema_pb.ScalarType_BOOL:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex+1, nil
case schema_pb.ScalarType_INTEGER:
case schema_pb.ScalarType_INT32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil
case schema_pb.ScalarType_LONG:
case schema_pb.ScalarType_INT64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil
case schema_pb.ScalarType_FLOAT:
case schema_pb.ScalarType_FLOAT32:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil
case schema_pb.ScalarType_DOUBLE:
case schema_pb.ScalarType_FLOAT64:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil
case schema_pb.ScalarType_BYTES: case schema_pb.ScalarType_BYTES:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil

4
weed/mq/schema/write_parquet_test.go

@ -13,8 +13,8 @@ import (
func TestWriteReadParquet(t *testing.T) { func TestWriteReadParquet(t *testing.T) {
// create a schema_pb.RecordType // create a schema_pb.RecordType
recordType := RecordTypeBegin(). recordType := RecordTypeBegin().
WithField("ID", Type64).
WithField("CreatedAt", Type64).
WithField("ID", TypeInt64).
WithField("CreatedAt", TypeInt64).
WithRecordField("Person", WithRecordField("Person",
RecordTypeBegin(). RecordTypeBegin().
WithField("zName", TypeString). WithField("zName", TypeString).

10
weed/pb/schema.proto

@ -29,11 +29,11 @@ message Type {
} }
enum ScalarType { enum ScalarType {
BOOLEAN = 0;
INTEGER = 1;
LONG = 3;
FLOAT = 4;
DOUBLE = 5;
BOOL = 0;
INT32 = 1;
INT64 = 3;
FLOAT32 = 4;
FLOAT64 = 5;
BYTES = 6; BYTES = 6;
STRING = 7; STRING = 7;
} }

54
weed/pb/schema_pb/schema.pb.go

@ -23,11 +23,11 @@ const (
type ScalarType int32 type ScalarType int32
const ( const (
ScalarType_BOOLEAN ScalarType = 0
ScalarType_INTEGER ScalarType = 1
ScalarType_LONG ScalarType = 3
ScalarType_FLOAT ScalarType = 4
ScalarType_DOUBLE ScalarType = 5
ScalarType_BOOL ScalarType = 0
ScalarType_INT32 ScalarType = 1
ScalarType_INT64 ScalarType = 3
ScalarType_FLOAT32 ScalarType = 4
ScalarType_FLOAT64 ScalarType = 5
ScalarType_BYTES ScalarType = 6 ScalarType_BYTES ScalarType = 6
ScalarType_STRING ScalarType = 7 ScalarType_STRING ScalarType = 7
) )
@ -35,20 +35,20 @@ const (
// Enum value maps for ScalarType. // Enum value maps for ScalarType.
var ( var (
ScalarType_name = map[int32]string{ ScalarType_name = map[int32]string{
0: "BOOLEAN",
1: "INTEGER",
3: "LONG",
4: "FLOAT",
5: "DOUBLE",
0: "BOOL",
1: "INT32",
3: "INT64",
4: "FLOAT32",
5: "FLOAT64",
6: "BYTES", 6: "BYTES",
7: "STRING", 7: "STRING",
} }
ScalarType_value = map[string]int32{ ScalarType_value = map[string]int32{
"BOOLEAN": 0,
"INTEGER": 1,
"LONG": 3,
"FLOAT": 4,
"DOUBLE": 5,
"BOOL": 0,
"INT32": 1,
"INT64": 3,
"FLOAT32": 4,
"FLOAT64": 5,
"BYTES": 6, "BYTES": 6,
"STRING": 7, "STRING": 7,
} }
@ -263,7 +263,7 @@ func (x *Type) GetScalarType() ScalarType {
if x, ok := x.GetKind().(*Type_ScalarType); ok { if x, ok := x.GetKind().(*Type_ScalarType); ok {
return x.ScalarType return x.ScalarType
} }
return ScalarType_BOOLEAN
return ScalarType_BOOL
} }
func (x *Type) GetRecordType() *RecordType { func (x *Type) GetRecordType() *RecordType {
@ -695,17 +695,17 @@ var file_schema_proto_rawDesc = []byte{
0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x35, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x56, 0x61, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x35, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x56, 0x61,
0x6c, 0x75, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x6c, 0x75, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e,
0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2a, 0x5e, 0x0a,
0x0a, 0x53, 0x63, 0x61, 0x6c, 0x61, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x42,
0x4f, 0x4f, 0x4c, 0x45, 0x41, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x54, 0x45,
0x47, 0x45, 0x52, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0x03, 0x12,
0x09, 0x0a, 0x05, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x10, 0x04, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x4f,
0x55, 0x42, 0x4c, 0x45, 0x10, 0x05, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10,
0x06, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x32, 0x5a,
0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77,
0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f,
0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70,
0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2a, 0x5d, 0x0a,
0x0a, 0x53, 0x63, 0x61, 0x6c, 0x61, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x42,
0x4f, 0x4f, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x33, 0x32, 0x10, 0x01,
0x12, 0x09, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x46,
0x4c, 0x4f, 0x41, 0x54, 0x33, 0x32, 0x10, 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x4c, 0x4f, 0x41,
0x54, 0x36, 0x34, 0x10, 0x05, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x06,
0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x32, 0x5a, 0x30,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65,
0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77,
0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

Loading…
Cancel
Save