Browse Source

rename

mq-subscribe
chrislu 8 months ago
parent
commit
7a9b115cc2
  1. 8
      weed/mq/client/cmd/weed_pub_record/publisher_record.go
  2. 6
      weed/mq/schema/schema_builder.go
  3. 4
      weed/mq/schema/struct_to_schema.go
  4. 4
      weed/mq/schema/to_parquet_schema.go
  5. 4
      weed/mq/schema/to_schema_value.go
  6. 8
      weed/mq/schema/value_builder.go
  7. 4
      weed/pb/schema.proto
  8. 50
      weed/pb/schema_pb/schema.pb.go

8
weed/mq/client/cmd/weed_pub_record/publisher_record.go

@ -75,8 +75,8 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
SetString("field2", r.Field2). SetString("field2", r.Field2).
SetInt32("field3", r.Field3). SetInt32("field3", r.Field3).
SetInt64("field4", r.Field4). SetInt64("field4", r.Field4).
SetFloat32("field5", r.Field5).
SetFloat64("field6", r.Field6).
SetFloat("field5", r.Field5).
SetDouble("field6", r.Field6).
SetBool("field7", r.Field7). SetBool("field7", r.Field7).
RecordEnd() RecordEnd()
} }
@ -90,8 +90,8 @@ func main() {
WithField("field2", schema.TypeString). WithField("field2", schema.TypeString).
WithField("field3", schema.TypeInt32). WithField("field3", schema.TypeInt32).
WithField("field4", schema.TypeInt64). WithField("field4", schema.TypeInt64).
WithField("field5", schema.TypeFloat32).
WithField("field6", schema.TypeFloat64).
WithField("field5", schema.TypeFloat).
WithField("field6", schema.TypeDouble).
WithField("field7", schema.TypeBoolean). WithField("field7", schema.TypeBoolean).
RecordTypeEnd() RecordTypeEnd()

6
weed/mq/schema/schema_builder.go

@ -8,9 +8,9 @@ import (
var ( var (
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}} TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}} TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT32}}
TypeFloat64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT64}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}} TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}} TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
) )

4
weed/mq/schema/struct_to_schema.go

@ -23,9 +23,9 @@ func reflectTypeToSchemaType(t reflect.Type) *schema_pb.Type {
case reflect.Int64: case reflect.Int64:
return TypeInt64 return TypeInt64
case reflect.Float32: case reflect.Float32:
return TypeFloat32
return TypeFloat
case reflect.Float64: case reflect.Float64:
return TypeFloat64
return TypeDouble
case reflect.String: case reflect.String:
return TypeString return TypeString
case reflect.Slice: case reflect.Slice:

4
weed/mq/schema/to_parquet_schema.go

@ -51,9 +51,9 @@ func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, er
return parquet.Leaf(parquet.Int32Type), nil return parquet.Leaf(parquet.Int32Type), nil
case schema_pb.ScalarType_INT64: case schema_pb.ScalarType_INT64:
return parquet.Leaf(parquet.Int64Type), nil return parquet.Leaf(parquet.Int64Type), nil
case schema_pb.ScalarType_FLOAT32:
case schema_pb.ScalarType_FLOAT:
return parquet.Leaf(parquet.FloatType), nil return parquet.Leaf(parquet.FloatType), nil
case schema_pb.ScalarType_FLOAT64:
case schema_pb.ScalarType_DOUBLE:
return parquet.Leaf(parquet.DoubleType), nil return parquet.Leaf(parquet.DoubleType), nil
case schema_pb.ScalarType_BYTES: case schema_pb.ScalarType_BYTES:
return parquet.Leaf(parquet.ByteArrayType), nil return parquet.Leaf(parquet.ByteArrayType), nil

4
weed/mq/schema/to_schema_value.go

@ -72,9 +72,9 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil
case schema_pb.ScalarType_INT64: case schema_pb.ScalarType_INT64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil
case schema_pb.ScalarType_FLOAT32:
case schema_pb.ScalarType_FLOAT:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil
case schema_pb.ScalarType_FLOAT64:
case schema_pb.ScalarType_DOUBLE:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil
case schema_pb.ScalarType_BYTES: case schema_pb.ScalarType_BYTES:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil

8
weed/mq/schema/value_builder.go

@ -29,11 +29,11 @@ func (rvb *RecordValueBuilder) SetInt64(key string, value int64) *RecordValueBui
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value}} rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value}}
return rvb return rvb
} }
func (rvb *RecordValueBuilder) SetFloat32(key string, value float32) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetFloat(key string, value float32) *RecordValueBuilder {
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value}} rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value}}
return rvb return rvb
} }
func (rvb *RecordValueBuilder) SetFloat64(key string, value float64) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetDouble(key string, value float64) *RecordValueBuilder {
rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value}} rvb.recordValue.Fields[key] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value}}
return rvb return rvb
} }
@ -76,14 +76,14 @@ func (rvb *RecordValueBuilder) SetInt64List(key string, values ...int64) *Record
} }
return rvb.addListValue(key, listValues) return rvb.addListValue(key, listValues)
} }
func (rvb *RecordValueBuilder) SetFloat32List(key string, values ...float32) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetFloatList(key string, values ...float32) *RecordValueBuilder {
var listValues []*schema_pb.Value var listValues []*schema_pb.Value
for _, v := range values { for _, v := range values {
listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}}) listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}})
} }
return rvb.addListValue(key, listValues) return rvb.addListValue(key, listValues)
} }
func (rvb *RecordValueBuilder) SetFloat64List(key string, values ...float64) *RecordValueBuilder {
func (rvb *RecordValueBuilder) SetDoubleList(key string, values ...float64) *RecordValueBuilder {
var listValues []*schema_pb.Value var listValues []*schema_pb.Value
for _, v := range values { for _, v := range values {
listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}) listValues = append(listValues, &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}})

4
weed/pb/schema.proto

@ -32,8 +32,8 @@ enum ScalarType {
BOOL = 0; BOOL = 0;
INT32 = 1; INT32 = 1;
INT64 = 3; INT64 = 3;
FLOAT32 = 4;
FLOAT64 = 5;
FLOAT = 4;
DOUBLE = 5;
BYTES = 6; BYTES = 6;
STRING = 7; STRING = 7;
} }

50
weed/pb/schema_pb/schema.pb.go

@ -23,13 +23,13 @@ const (
type ScalarType int32 type ScalarType int32
const ( const (
ScalarType_BOOL ScalarType = 0
ScalarType_INT32 ScalarType = 1
ScalarType_INT64 ScalarType = 3
ScalarType_FLOAT32 ScalarType = 4
ScalarType_FLOAT64 ScalarType = 5
ScalarType_BYTES ScalarType = 6
ScalarType_STRING ScalarType = 7
ScalarType_BOOL ScalarType = 0
ScalarType_INT32 ScalarType = 1
ScalarType_INT64 ScalarType = 3
ScalarType_FLOAT ScalarType = 4
ScalarType_DOUBLE ScalarType = 5
ScalarType_BYTES ScalarType = 6
ScalarType_STRING ScalarType = 7
) )
// Enum value maps for ScalarType. // Enum value maps for ScalarType.
@ -38,19 +38,19 @@ var (
0: "BOOL", 0: "BOOL",
1: "INT32", 1: "INT32",
3: "INT64", 3: "INT64",
4: "FLOAT32",
5: "FLOAT64",
4: "FLOAT",
5: "DOUBLE",
6: "BYTES", 6: "BYTES",
7: "STRING", 7: "STRING",
} }
ScalarType_value = map[string]int32{ ScalarType_value = map[string]int32{
"BOOL": 0,
"INT32": 1,
"INT64": 3,
"FLOAT32": 4,
"FLOAT64": 5,
"BYTES": 6,
"STRING": 7,
"BOOL": 0,
"INT32": 1,
"INT64": 3,
"FLOAT": 4,
"DOUBLE": 5,
"BYTES": 6,
"STRING": 7,
} }
) )
@ -695,17 +695,17 @@ var file_schema_proto_rawDesc = []byte{
0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x35, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x56, 0x61, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x35, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x56, 0x61,
0x6c, 0x75, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x6c, 0x75, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e,
0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2a, 0x5d, 0x0a,
0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2a, 0x5a, 0x0a,
0x0a, 0x53, 0x63, 0x61, 0x6c, 0x61, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x42, 0x0a, 0x53, 0x63, 0x61, 0x6c, 0x61, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x42,
0x4f, 0x4f, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x33, 0x32, 0x10, 0x01, 0x4f, 0x4f, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x33, 0x32, 0x10, 0x01,
0x12, 0x09, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x46,
0x4c, 0x4f, 0x41, 0x54, 0x33, 0x32, 0x10, 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x4c, 0x4f, 0x41,
0x54, 0x36, 0x34, 0x10, 0x05, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x06,
0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x32, 0x5a, 0x30,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65,
0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77,
0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x12, 0x09, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x03, 0x12, 0x09, 0x0a, 0x05, 0x46,
0x4c, 0x4f, 0x41, 0x54, 0x10, 0x04, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x4f, 0x55, 0x42, 0x4c, 0x45,
0x10, 0x05, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x06, 0x12, 0x0a, 0x0a,
0x06, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74,
0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66,
0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64,
0x2f, 0x70, 0x62, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

Loading…
Cancel
Save