diff --git a/weed/pb/mq_schema.proto b/weed/pb/mq_schema.proto index e2196c5fc..2deeadb55 100644 --- a/weed/pb/mq_schema.proto +++ b/weed/pb/mq_schema.proto @@ -69,6 +69,11 @@ enum ScalarType { DOUBLE = 5; BYTES = 6; STRING = 7; + // Parquet logical types for analytics + TIMESTAMP = 8; // UTC timestamp (microseconds since epoch) + DATE = 9; // Date (days since epoch) + DECIMAL = 10; // Arbitrary precision decimal + TIME = 11; // Time of day (microseconds) } message ListType { @@ -90,10 +95,36 @@ message Value { double double_value = 5; bytes bytes_value = 6; string string_value = 7; + // Parquet logical type values + TimestampValue timestamp_value = 8; + DateValue date_value = 9; + DecimalValue decimal_value = 10; + TimeValue time_value = 11; + // Complex types ListValue list_value = 14; RecordValue record_value = 15; } } +// Parquet logical type value messages +message TimestampValue { + int64 timestamp_micros = 1; // Microseconds since Unix epoch (UTC) + bool is_utc = 2; // True if UTC, false if local time +} + +message DateValue { + int32 days_since_epoch = 1; // Days since Unix epoch (1970-01-01) +} + +message DecimalValue { + bytes value = 1; // Arbitrary precision decimal as bytes + int32 precision = 2; // Total number of digits + int32 scale = 3; // Number of digits after decimal point +} + +message TimeValue { + int64 time_micros = 1; // Microseconds since midnight +} + message ListValue { repeated Value values = 1; } diff --git a/weed/pb/schema_pb/mq_schema.pb.go b/weed/pb/schema_pb/mq_schema.pb.go index 08ce2ba6c..2cd2118bf 100644 --- a/weed/pb/schema_pb/mq_schema.pb.go +++ b/weed/pb/schema_pb/mq_schema.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.36.6 // protoc v5.29.3 -// source: mq_schema.proto +// source: weed/pb/mq_schema.proto package schema_pb @@ -60,11 +60,11 @@ func (x OffsetType) String() string { } func (OffsetType) Descriptor() protoreflect.EnumDescriptor { - return file_mq_schema_proto_enumTypes[0].Descriptor() + return file_weed_pb_mq_schema_proto_enumTypes[0].Descriptor() } func (OffsetType) Type() protoreflect.EnumType { - return &file_mq_schema_proto_enumTypes[0] + return &file_weed_pb_mq_schema_proto_enumTypes[0] } func (x OffsetType) Number() protoreflect.EnumNumber { @@ -73,7 +73,7 @@ func (x OffsetType) Number() protoreflect.EnumNumber { // Deprecated: Use OffsetType.Descriptor instead. func (OffsetType) EnumDescriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{0} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{0} } type ScalarType int32 @@ -86,27 +86,40 @@ const ( ScalarType_DOUBLE ScalarType = 5 ScalarType_BYTES ScalarType = 6 ScalarType_STRING ScalarType = 7 + // Parquet logical types for analytics + ScalarType_TIMESTAMP ScalarType = 8 // UTC timestamp (microseconds since epoch) + ScalarType_DATE ScalarType = 9 // Date (days since epoch) + ScalarType_DECIMAL ScalarType = 10 // Arbitrary precision decimal + ScalarType_TIME ScalarType = 11 // Time of day (microseconds) ) // Enum value maps for ScalarType. var ( ScalarType_name = map[int32]string{ - 0: "BOOL", - 1: "INT32", - 3: "INT64", - 4: "FLOAT", - 5: "DOUBLE", - 6: "BYTES", - 7: "STRING", + 0: "BOOL", + 1: "INT32", + 3: "INT64", + 4: "FLOAT", + 5: "DOUBLE", + 6: "BYTES", + 7: "STRING", + 8: "TIMESTAMP", + 9: "DATE", + 10: "DECIMAL", + 11: "TIME", } ScalarType_value = map[string]int32{ - "BOOL": 0, - "INT32": 1, - "INT64": 3, - "FLOAT": 4, - "DOUBLE": 5, - "BYTES": 6, - "STRING": 7, + "BOOL": 0, + "INT32": 1, + "INT64": 3, + "FLOAT": 4, + "DOUBLE": 5, + "BYTES": 6, + "STRING": 7, + "TIMESTAMP": 8, + "DATE": 9, + "DECIMAL": 10, + "TIME": 11, } ) @@ -121,11 +134,11 @@ func (x ScalarType) String() string { } func (ScalarType) Descriptor() protoreflect.EnumDescriptor { - return file_mq_schema_proto_enumTypes[1].Descriptor() + return file_weed_pb_mq_schema_proto_enumTypes[1].Descriptor() } func (ScalarType) Type() protoreflect.EnumType { - return &file_mq_schema_proto_enumTypes[1] + return &file_weed_pb_mq_schema_proto_enumTypes[1] } func (x ScalarType) Number() protoreflect.EnumNumber { @@ -134,7 +147,7 @@ func (x ScalarType) Number() protoreflect.EnumNumber { // Deprecated: Use ScalarType.Descriptor instead. func (ScalarType) EnumDescriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{1} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{1} } type Topic struct { @@ -147,7 +160,7 @@ type Topic struct { func (x *Topic) Reset() { *x = Topic{} - mi := &file_mq_schema_proto_msgTypes[0] + mi := &file_weed_pb_mq_schema_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -159,7 +172,7 @@ func (x *Topic) String() string { func (*Topic) ProtoMessage() {} func (x *Topic) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[0] + mi := &file_weed_pb_mq_schema_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -172,7 +185,7 @@ func (x *Topic) ProtoReflect() protoreflect.Message { // Deprecated: Use Topic.ProtoReflect.Descriptor instead. func (*Topic) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{0} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{0} } func (x *Topic) GetNamespace() string { @@ -201,7 +214,7 @@ type Partition struct { func (x *Partition) Reset() { *x = Partition{} - mi := &file_mq_schema_proto_msgTypes[1] + mi := &file_weed_pb_mq_schema_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -213,7 +226,7 @@ func (x *Partition) String() string { func (*Partition) ProtoMessage() {} func (x *Partition) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[1] + mi := &file_weed_pb_mq_schema_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -226,7 +239,7 @@ func (x *Partition) ProtoReflect() protoreflect.Message { // Deprecated: Use Partition.ProtoReflect.Descriptor instead. func (*Partition) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{1} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{1} } func (x *Partition) GetRingSize() int32 { @@ -267,7 +280,7 @@ type Offset struct { func (x *Offset) Reset() { *x = Offset{} - mi := &file_mq_schema_proto_msgTypes[2] + mi := &file_weed_pb_mq_schema_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -279,7 +292,7 @@ func (x *Offset) String() string { func (*Offset) ProtoMessage() {} func (x *Offset) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[2] + mi := &file_weed_pb_mq_schema_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -292,7 +305,7 @@ func (x *Offset) ProtoReflect() protoreflect.Message { // Deprecated: Use Offset.ProtoReflect.Descriptor instead. func (*Offset) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{2} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{2} } func (x *Offset) GetTopic() *Topic { @@ -319,7 +332,7 @@ type PartitionOffset struct { func (x *PartitionOffset) Reset() { *x = PartitionOffset{} - mi := &file_mq_schema_proto_msgTypes[3] + mi := &file_weed_pb_mq_schema_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -331,7 +344,7 @@ func (x *PartitionOffset) String() string { func (*PartitionOffset) ProtoMessage() {} func (x *PartitionOffset) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[3] + mi := &file_weed_pb_mq_schema_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -344,7 +357,7 @@ func (x *PartitionOffset) ProtoReflect() protoreflect.Message { // Deprecated: Use PartitionOffset.ProtoReflect.Descriptor instead. func (*PartitionOffset) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{3} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{3} } func (x *PartitionOffset) GetPartition() *Partition { @@ -370,7 +383,7 @@ type RecordType struct { func (x *RecordType) Reset() { *x = RecordType{} - mi := &file_mq_schema_proto_msgTypes[4] + mi := &file_weed_pb_mq_schema_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -382,7 +395,7 @@ func (x *RecordType) String() string { func (*RecordType) ProtoMessage() {} func (x *RecordType) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[4] + mi := &file_weed_pb_mq_schema_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -395,7 +408,7 @@ func (x *RecordType) ProtoReflect() protoreflect.Message { // Deprecated: Use RecordType.ProtoReflect.Descriptor instead. func (*RecordType) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{4} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{4} } func (x *RecordType) GetFields() []*Field { @@ -418,7 +431,7 @@ type Field struct { func (x *Field) Reset() { *x = Field{} - mi := &file_mq_schema_proto_msgTypes[5] + mi := &file_weed_pb_mq_schema_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -430,7 +443,7 @@ func (x *Field) String() string { func (*Field) ProtoMessage() {} func (x *Field) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[5] + mi := &file_weed_pb_mq_schema_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -443,7 +456,7 @@ func (x *Field) ProtoReflect() protoreflect.Message { // Deprecated: Use Field.ProtoReflect.Descriptor instead. func (*Field) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{5} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{5} } func (x *Field) GetName() string { @@ -495,7 +508,7 @@ type Type struct { func (x *Type) Reset() { *x = Type{} - mi := &file_mq_schema_proto_msgTypes[6] + mi := &file_weed_pb_mq_schema_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -507,7 +520,7 @@ func (x *Type) String() string { func (*Type) ProtoMessage() {} func (x *Type) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[6] + mi := &file_weed_pb_mq_schema_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -520,7 +533,7 @@ func (x *Type) ProtoReflect() protoreflect.Message { // Deprecated: Use Type.ProtoReflect.Descriptor instead. func (*Type) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{6} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{6} } func (x *Type) GetKind() isType_Kind { @@ -588,7 +601,7 @@ type ListType struct { func (x *ListType) Reset() { *x = ListType{} - mi := &file_mq_schema_proto_msgTypes[7] + mi := &file_weed_pb_mq_schema_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -600,7 +613,7 @@ func (x *ListType) String() string { func (*ListType) ProtoMessage() {} func (x *ListType) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[7] + mi := &file_weed_pb_mq_schema_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -613,7 +626,7 @@ func (x *ListType) ProtoReflect() protoreflect.Message { // Deprecated: Use ListType.ProtoReflect.Descriptor instead. func (*ListType) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{7} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{7} } func (x *ListType) GetElementType() *Type { @@ -635,7 +648,7 @@ type RecordValue struct { func (x *RecordValue) Reset() { *x = RecordValue{} - mi := &file_mq_schema_proto_msgTypes[8] + mi := &file_weed_pb_mq_schema_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -647,7 +660,7 @@ func (x *RecordValue) String() string { func (*RecordValue) ProtoMessage() {} func (x *RecordValue) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[8] + mi := &file_weed_pb_mq_schema_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -660,7 +673,7 @@ func (x *RecordValue) ProtoReflect() protoreflect.Message { // Deprecated: Use RecordValue.ProtoReflect.Descriptor instead. func (*RecordValue) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{8} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{8} } func (x *RecordValue) GetFields() map[string]*Value { @@ -681,6 +694,10 @@ type Value struct { // *Value_DoubleValue // *Value_BytesValue // *Value_StringValue + // *Value_TimestampValue + // *Value_DateValue + // *Value_DecimalValue + // *Value_TimeValue // *Value_ListValue // *Value_RecordValue Kind isValue_Kind `protobuf_oneof:"kind"` @@ -690,7 +707,7 @@ type Value struct { func (x *Value) Reset() { *x = Value{} - mi := &file_mq_schema_proto_msgTypes[9] + mi := &file_weed_pb_mq_schema_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -702,7 +719,7 @@ func (x *Value) String() string { func (*Value) ProtoMessage() {} func (x *Value) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[9] + mi := &file_weed_pb_mq_schema_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -715,7 +732,7 @@ func (x *Value) ProtoReflect() protoreflect.Message { // Deprecated: Use Value.ProtoReflect.Descriptor instead. func (*Value) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{9} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{9} } func (x *Value) GetKind() isValue_Kind { @@ -788,6 +805,42 @@ func (x *Value) GetStringValue() string { return "" } +func (x *Value) GetTimestampValue() *TimestampValue { + if x != nil { + if x, ok := x.Kind.(*Value_TimestampValue); ok { + return x.TimestampValue + } + } + return nil +} + +func (x *Value) GetDateValue() *DateValue { + if x != nil { + if x, ok := x.Kind.(*Value_DateValue); ok { + return x.DateValue + } + } + return nil +} + +func (x *Value) GetDecimalValue() *DecimalValue { + if x != nil { + if x, ok := x.Kind.(*Value_DecimalValue); ok { + return x.DecimalValue + } + } + return nil +} + +func (x *Value) GetTimeValue() *TimeValue { + if x != nil { + if x, ok := x.Kind.(*Value_TimeValue); ok { + return x.TimeValue + } + } + return nil +} + func (x *Value) GetListValue() *ListValue { if x != nil { if x, ok := x.Kind.(*Value_ListValue); ok { @@ -838,7 +891,25 @@ type Value_StringValue struct { StringValue string `protobuf:"bytes,7,opt,name=string_value,json=stringValue,proto3,oneof"` } +type Value_TimestampValue struct { + // Parquet logical type values + TimestampValue *TimestampValue `protobuf:"bytes,8,opt,name=timestamp_value,json=timestampValue,proto3,oneof"` +} + +type Value_DateValue struct { + DateValue *DateValue `protobuf:"bytes,9,opt,name=date_value,json=dateValue,proto3,oneof"` +} + +type Value_DecimalValue struct { + DecimalValue *DecimalValue `protobuf:"bytes,10,opt,name=decimal_value,json=decimalValue,proto3,oneof"` +} + +type Value_TimeValue struct { + TimeValue *TimeValue `protobuf:"bytes,11,opt,name=time_value,json=timeValue,proto3,oneof"` +} + type Value_ListValue struct { + // Complex types ListValue *ListValue `protobuf:"bytes,14,opt,name=list_value,json=listValue,proto3,oneof"` } @@ -860,10 +931,219 @@ func (*Value_BytesValue) isValue_Kind() {} func (*Value_StringValue) isValue_Kind() {} +func (*Value_TimestampValue) isValue_Kind() {} + +func (*Value_DateValue) isValue_Kind() {} + +func (*Value_DecimalValue) isValue_Kind() {} + +func (*Value_TimeValue) isValue_Kind() {} + func (*Value_ListValue) isValue_Kind() {} func (*Value_RecordValue) isValue_Kind() {} +// Parquet logical type value messages +type TimestampValue struct { + state protoimpl.MessageState `protogen:"open.v1"` + TimestampMicros int64 `protobuf:"varint,1,opt,name=timestamp_micros,json=timestampMicros,proto3" json:"timestamp_micros,omitempty"` // Microseconds since Unix epoch (UTC) + IsUtc bool `protobuf:"varint,2,opt,name=is_utc,json=isUtc,proto3" json:"is_utc,omitempty"` // True if UTC, false if local time + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TimestampValue) Reset() { + *x = TimestampValue{} + mi := &file_weed_pb_mq_schema_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TimestampValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TimestampValue) ProtoMessage() {} + +func (x *TimestampValue) ProtoReflect() protoreflect.Message { + mi := &file_weed_pb_mq_schema_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TimestampValue.ProtoReflect.Descriptor instead. +func (*TimestampValue) Descriptor() ([]byte, []int) { + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{10} +} + +func (x *TimestampValue) GetTimestampMicros() int64 { + if x != nil { + return x.TimestampMicros + } + return 0 +} + +func (x *TimestampValue) GetIsUtc() bool { + if x != nil { + return x.IsUtc + } + return false +} + +type DateValue struct { + state protoimpl.MessageState `protogen:"open.v1"` + DaysSinceEpoch int32 `protobuf:"varint,1,opt,name=days_since_epoch,json=daysSinceEpoch,proto3" json:"days_since_epoch,omitempty"` // Days since Unix epoch (1970-01-01) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DateValue) Reset() { + *x = DateValue{} + mi := &file_weed_pb_mq_schema_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DateValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DateValue) ProtoMessage() {} + +func (x *DateValue) ProtoReflect() protoreflect.Message { + mi := &file_weed_pb_mq_schema_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DateValue.ProtoReflect.Descriptor instead. +func (*DateValue) Descriptor() ([]byte, []int) { + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{11} +} + +func (x *DateValue) GetDaysSinceEpoch() int32 { + if x != nil { + return x.DaysSinceEpoch + } + return 0 +} + +type DecimalValue struct { + state protoimpl.MessageState `protogen:"open.v1"` + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` // Arbitrary precision decimal as bytes + Precision int32 `protobuf:"varint,2,opt,name=precision,proto3" json:"precision,omitempty"` // Total number of digits + Scale int32 `protobuf:"varint,3,opt,name=scale,proto3" json:"scale,omitempty"` // Number of digits after decimal point + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DecimalValue) Reset() { + *x = DecimalValue{} + mi := &file_weed_pb_mq_schema_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DecimalValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DecimalValue) ProtoMessage() {} + +func (x *DecimalValue) ProtoReflect() protoreflect.Message { + mi := &file_weed_pb_mq_schema_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DecimalValue.ProtoReflect.Descriptor instead. +func (*DecimalValue) Descriptor() ([]byte, []int) { + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{12} +} + +func (x *DecimalValue) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *DecimalValue) GetPrecision() int32 { + if x != nil { + return x.Precision + } + return 0 +} + +func (x *DecimalValue) GetScale() int32 { + if x != nil { + return x.Scale + } + return 0 +} + +type TimeValue struct { + state protoimpl.MessageState `protogen:"open.v1"` + TimeMicros int64 `protobuf:"varint,1,opt,name=time_micros,json=timeMicros,proto3" json:"time_micros,omitempty"` // Microseconds since midnight + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TimeValue) Reset() { + *x = TimeValue{} + mi := &file_weed_pb_mq_schema_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TimeValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TimeValue) ProtoMessage() {} + +func (x *TimeValue) ProtoReflect() protoreflect.Message { + mi := &file_weed_pb_mq_schema_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TimeValue.ProtoReflect.Descriptor instead. +func (*TimeValue) Descriptor() ([]byte, []int) { + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{13} +} + +func (x *TimeValue) GetTimeMicros() int64 { + if x != nil { + return x.TimeMicros + } + return 0 +} + type ListValue struct { state protoimpl.MessageState `protogen:"open.v1"` Values []*Value `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` @@ -873,7 +1153,7 @@ type ListValue struct { func (x *ListValue) Reset() { *x = ListValue{} - mi := &file_mq_schema_proto_msgTypes[10] + mi := &file_weed_pb_mq_schema_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -885,7 +1165,7 @@ func (x *ListValue) String() string { func (*ListValue) ProtoMessage() {} func (x *ListValue) ProtoReflect() protoreflect.Message { - mi := &file_mq_schema_proto_msgTypes[10] + mi := &file_weed_pb_mq_schema_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -898,7 +1178,7 @@ func (x *ListValue) ProtoReflect() protoreflect.Message { // Deprecated: Use ListValue.ProtoReflect.Descriptor instead. func (*ListValue) Descriptor() ([]byte, []int) { - return file_mq_schema_proto_rawDescGZIP(), []int{10} + return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{14} } func (x *ListValue) GetValues() []*Value { @@ -908,11 +1188,11 @@ func (x *ListValue) GetValues() []*Value { return nil } -var File_mq_schema_proto protoreflect.FileDescriptor +var File_weed_pb_mq_schema_proto protoreflect.FileDescriptor -const file_mq_schema_proto_rawDesc = "" + +const file_weed_pb_mq_schema_proto_rawDesc = "" + "\n" + - "\x0fmq_schema.proto\x12\tschema_pb\"9\n" + + "\x17weed/pb/mq_schema.proto\x12\tschema_pb\"9\n" + "\x05Topic\x12\x1c\n" + "\tnamespace\x18\x01 \x01(\tR\tnamespace\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\"\x8a\x01\n" + @@ -955,7 +1235,7 @@ const file_mq_schema_proto_rawDesc = "" + "\x06fields\x18\x01 \x03(\v2\".schema_pb.RecordValue.FieldsEntryR\x06fields\x1aK\n" + "\vFieldsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12&\n" + - "\x05value\x18\x02 \x01(\v2\x10.schema_pb.ValueR\x05value:\x028\x01\"\xfa\x02\n" + + "\x05value\x18\x02 \x01(\v2\x10.schema_pb.ValueR\x05value:\x028\x01\"\xee\x04\n" + "\x05Value\x12\x1f\n" + "\n" + "bool_value\x18\x01 \x01(\bH\x00R\tboolValue\x12!\n" + @@ -968,11 +1248,30 @@ const file_mq_schema_proto_rawDesc = "" + "\fdouble_value\x18\x05 \x01(\x01H\x00R\vdoubleValue\x12!\n" + "\vbytes_value\x18\x06 \x01(\fH\x00R\n" + "bytesValue\x12#\n" + - "\fstring_value\x18\a \x01(\tH\x00R\vstringValue\x125\n" + + "\fstring_value\x18\a \x01(\tH\x00R\vstringValue\x12D\n" + + "\x0ftimestamp_value\x18\b \x01(\v2\x19.schema_pb.TimestampValueH\x00R\x0etimestampValue\x125\n" + + "\n" + + "date_value\x18\t \x01(\v2\x14.schema_pb.DateValueH\x00R\tdateValue\x12>\n" + + "\rdecimal_value\x18\n" + + " \x01(\v2\x17.schema_pb.DecimalValueH\x00R\fdecimalValue\x125\n" + + "\n" + + "time_value\x18\v \x01(\v2\x14.schema_pb.TimeValueH\x00R\ttimeValue\x125\n" + "\n" + "list_value\x18\x0e \x01(\v2\x14.schema_pb.ListValueH\x00R\tlistValue\x12;\n" + "\frecord_value\x18\x0f \x01(\v2\x16.schema_pb.RecordValueH\x00R\vrecordValueB\x06\n" + - "\x04kind\"5\n" + + "\x04kind\"R\n" + + "\x0eTimestampValue\x12)\n" + + "\x10timestamp_micros\x18\x01 \x01(\x03R\x0ftimestampMicros\x12\x15\n" + + "\x06is_utc\x18\x02 \x01(\bR\x05isUtc\"5\n" + + "\tDateValue\x12(\n" + + "\x10days_since_epoch\x18\x01 \x01(\x05R\x0edaysSinceEpoch\"X\n" + + "\fDecimalValue\x12\x14\n" + + "\x05value\x18\x01 \x01(\fR\x05value\x12\x1c\n" + + "\tprecision\x18\x02 \x01(\x05R\tprecision\x12\x14\n" + + "\x05scale\x18\x03 \x01(\x05R\x05scale\",\n" + + "\tTimeValue\x12\x1f\n" + + "\vtime_micros\x18\x01 \x01(\x03R\n" + + "timeMicros\"5\n" + "\tListValue\x12(\n" + "\x06values\x18\x01 \x03(\v2\x10.schema_pb.ValueR\x06values*w\n" + "\n" + @@ -982,7 +1281,7 @@ const file_mq_schema_proto_rawDesc = "" + "\vEXACT_TS_NS\x10\n" + "\x12\x13\n" + "\x0fRESET_TO_LATEST\x10\x0f\x12\x14\n" + - "\x10RESUME_OR_LATEST\x10\x14*Z\n" + + "\x10RESUME_OR_LATEST\x10\x14*\x8a\x01\n" + "\n" + "ScalarType\x12\b\n" + "\x04BOOL\x10\x00\x12\t\n" + @@ -993,23 +1292,28 @@ const file_mq_schema_proto_rawDesc = "" + "\x06DOUBLE\x10\x05\x12\t\n" + "\x05BYTES\x10\x06\x12\n" + "\n" + - "\x06STRING\x10\aB2Z0github.com/seaweedfs/seaweedfs/weed/pb/schema_pbb\x06proto3" + "\x06STRING\x10\a\x12\r\n" + + "\tTIMESTAMP\x10\b\x12\b\n" + + "\x04DATE\x10\t\x12\v\n" + + "\aDECIMAL\x10\n" + + "\x12\b\n" + + "\x04TIME\x10\vB2Z0github.com/seaweedfs/seaweedfs/weed/pb/schema_pbb\x06proto3" var ( - file_mq_schema_proto_rawDescOnce sync.Once - file_mq_schema_proto_rawDescData []byte + file_weed_pb_mq_schema_proto_rawDescOnce sync.Once + file_weed_pb_mq_schema_proto_rawDescData []byte ) -func file_mq_schema_proto_rawDescGZIP() []byte { - file_mq_schema_proto_rawDescOnce.Do(func() { - file_mq_schema_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mq_schema_proto_rawDesc), len(file_mq_schema_proto_rawDesc))) +func file_weed_pb_mq_schema_proto_rawDescGZIP() []byte { + file_weed_pb_mq_schema_proto_rawDescOnce.Do(func() { + file_weed_pb_mq_schema_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_weed_pb_mq_schema_proto_rawDesc), len(file_weed_pb_mq_schema_proto_rawDesc))) }) - return file_mq_schema_proto_rawDescData + return file_weed_pb_mq_schema_proto_rawDescData } -var file_mq_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_mq_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 12) -var file_mq_schema_proto_goTypes = []any{ +var file_weed_pb_mq_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_weed_pb_mq_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_weed_pb_mq_schema_proto_goTypes = []any{ (OffsetType)(0), // 0: schema_pb.OffsetType (ScalarType)(0), // 1: schema_pb.ScalarType (*Topic)(nil), // 2: schema_pb.Topic @@ -1022,10 +1326,14 @@ var file_mq_schema_proto_goTypes = []any{ (*ListType)(nil), // 9: schema_pb.ListType (*RecordValue)(nil), // 10: schema_pb.RecordValue (*Value)(nil), // 11: schema_pb.Value - (*ListValue)(nil), // 12: schema_pb.ListValue - nil, // 13: schema_pb.RecordValue.FieldsEntry -} -var file_mq_schema_proto_depIdxs = []int32{ + (*TimestampValue)(nil), // 12: schema_pb.TimestampValue + (*DateValue)(nil), // 13: schema_pb.DateValue + (*DecimalValue)(nil), // 14: schema_pb.DecimalValue + (*TimeValue)(nil), // 15: schema_pb.TimeValue + (*ListValue)(nil), // 16: schema_pb.ListValue + nil, // 17: schema_pb.RecordValue.FieldsEntry +} +var file_weed_pb_mq_schema_proto_depIdxs = []int32{ 2, // 0: schema_pb.Offset.topic:type_name -> schema_pb.Topic 5, // 1: schema_pb.Offset.partition_offsets:type_name -> schema_pb.PartitionOffset 3, // 2: schema_pb.PartitionOffset.partition:type_name -> schema_pb.Partition @@ -1035,29 +1343,33 @@ var file_mq_schema_proto_depIdxs = []int32{ 6, // 6: schema_pb.Type.record_type:type_name -> schema_pb.RecordType 9, // 7: schema_pb.Type.list_type:type_name -> schema_pb.ListType 8, // 8: schema_pb.ListType.element_type:type_name -> schema_pb.Type - 13, // 9: schema_pb.RecordValue.fields:type_name -> schema_pb.RecordValue.FieldsEntry - 12, // 10: schema_pb.Value.list_value:type_name -> schema_pb.ListValue - 10, // 11: schema_pb.Value.record_value:type_name -> schema_pb.RecordValue - 11, // 12: schema_pb.ListValue.values:type_name -> schema_pb.Value - 11, // 13: schema_pb.RecordValue.FieldsEntry.value:type_name -> schema_pb.Value - 14, // [14:14] is the sub-list for method output_type - 14, // [14:14] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name -} - -func init() { file_mq_schema_proto_init() } -func file_mq_schema_proto_init() { - if File_mq_schema_proto != nil { + 17, // 9: schema_pb.RecordValue.fields:type_name -> schema_pb.RecordValue.FieldsEntry + 12, // 10: schema_pb.Value.timestamp_value:type_name -> schema_pb.TimestampValue + 13, // 11: schema_pb.Value.date_value:type_name -> schema_pb.DateValue + 14, // 12: schema_pb.Value.decimal_value:type_name -> schema_pb.DecimalValue + 15, // 13: schema_pb.Value.time_value:type_name -> schema_pb.TimeValue + 16, // 14: schema_pb.Value.list_value:type_name -> schema_pb.ListValue + 10, // 15: schema_pb.Value.record_value:type_name -> schema_pb.RecordValue + 11, // 16: schema_pb.ListValue.values:type_name -> schema_pb.Value + 11, // 17: schema_pb.RecordValue.FieldsEntry.value:type_name -> schema_pb.Value + 18, // [18:18] is the sub-list for method output_type + 18, // [18:18] is the sub-list for method input_type + 18, // [18:18] is the sub-list for extension type_name + 18, // [18:18] is the sub-list for extension extendee + 0, // [0:18] is the sub-list for field type_name +} + +func init() { file_weed_pb_mq_schema_proto_init() } +func file_weed_pb_mq_schema_proto_init() { + if File_weed_pb_mq_schema_proto != nil { return } - file_mq_schema_proto_msgTypes[6].OneofWrappers = []any{ + file_weed_pb_mq_schema_proto_msgTypes[6].OneofWrappers = []any{ (*Type_ScalarType)(nil), (*Type_RecordType)(nil), (*Type_ListType)(nil), } - file_mq_schema_proto_msgTypes[9].OneofWrappers = []any{ + file_weed_pb_mq_schema_proto_msgTypes[9].OneofWrappers = []any{ (*Value_BoolValue)(nil), (*Value_Int32Value)(nil), (*Value_Int64Value)(nil), @@ -1065,6 +1377,10 @@ func file_mq_schema_proto_init() { (*Value_DoubleValue)(nil), (*Value_BytesValue)(nil), (*Value_StringValue)(nil), + (*Value_TimestampValue)(nil), + (*Value_DateValue)(nil), + (*Value_DecimalValue)(nil), + (*Value_TimeValue)(nil), (*Value_ListValue)(nil), (*Value_RecordValue)(nil), } @@ -1072,18 +1388,18 @@ func file_mq_schema_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_schema_proto_rawDesc), len(file_mq_schema_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_weed_pb_mq_schema_proto_rawDesc), len(file_weed_pb_mq_schema_proto_rawDesc)), NumEnums: 2, - NumMessages: 12, + NumMessages: 16, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_mq_schema_proto_goTypes, - DependencyIndexes: file_mq_schema_proto_depIdxs, - EnumInfos: file_mq_schema_proto_enumTypes, - MessageInfos: file_mq_schema_proto_msgTypes, + GoTypes: file_weed_pb_mq_schema_proto_goTypes, + DependencyIndexes: file_weed_pb_mq_schema_proto_depIdxs, + EnumInfos: file_weed_pb_mq_schema_proto_enumTypes, + MessageInfos: file_weed_pb_mq_schema_proto_msgTypes, }.Build() - File_mq_schema_proto = out.File - file_mq_schema_proto_goTypes = nil - file_mq_schema_proto_depIdxs = nil + File_weed_pb_mq_schema_proto = out.File + file_weed_pb_mq_schema_proto_goTypes = nil + file_weed_pb_mq_schema_proto_depIdxs = nil } diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 53aadbcaf..1609dac87 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -255,13 +255,13 @@ func parseSelectStatement(sql string) (*SelectStatement, error) { } sqlUpper := strings.ToUpper(sql) - + // Find SELECT clause selectIdx := strings.Index(sqlUpper, "SELECT") if selectIdx == -1 { return nil, fmt.Errorf("SELECT keyword not found") } - + // Find FROM clause fromIdx := strings.Index(sqlUpper, "FROM") var selectClause string @@ -270,7 +270,7 @@ func parseSelectStatement(sql string) (*SelectStatement, error) { } else { selectClause = sql[selectIdx+6:] // No FROM clause } - + // Parse SELECT expressions selectClause = strings.TrimSpace(selectClause) if selectClause == "*" { @@ -302,7 +302,7 @@ func parseSelectStatement(sql string) (*SelectStatement, error) { // Parse FROM clause if fromIdx != -1 { remaining := sql[fromIdx+4:] // Skip "FROM" - + // Find WHERE clause whereIdx := strings.Index(strings.ToUpper(remaining), "WHERE") var fromClause string @@ -317,7 +317,7 @@ func parseSelectStatement(sql string) (*SelectStatement, error) { fromClause = remaining } } - + fromClause = strings.TrimSpace(fromClause) tableName := TableName{ Name: stringValue(fromClause), @@ -328,13 +328,13 @@ func parseSelectStatement(sql string) (*SelectStatement, error) { // Parse WHERE clause if whereIdx != -1 { whereClause := remaining[whereIdx+5:] // Skip "WHERE" - + // Find LIMIT clause limitIdx := strings.Index(strings.ToUpper(whereClause), "LIMIT") if limitIdx != -1 { whereClause = whereClause[:limitIdx] } - + whereClause = strings.TrimSpace(whereClause) if whereClause != "" { whereExpr, err := parseSimpleWhereExpression(whereClause) @@ -344,13 +344,13 @@ func parseSelectStatement(sql string) (*SelectStatement, error) { s.Where = &WhereClause{Expr: whereExpr} } } - + // Parse LIMIT clause limitIdx := strings.Index(strings.ToUpper(remaining), "LIMIT") if limitIdx != -1 { limitClause := remaining[limitIdx+5:] // Skip "LIMIT" limitClause = strings.TrimSpace(limitClause) - + if _, err := strconv.Atoi(limitClause); err == nil { s.Limit = &LimitClause{ Rowcount: &SQLVal{ @@ -377,7 +377,7 @@ func extractFunctionName(expr string) string { // parseSimpleWhereExpression parses a simple WHERE expression func parseSimpleWhereExpression(whereClause string) (ExprNode, error) { whereClause = strings.TrimSpace(whereClause) - + // Handle AND/OR expressions first (higher precedence) if strings.Contains(strings.ToUpper(whereClause), " AND ") { // Use original case for parsing but ToUpper for detection @@ -397,7 +397,7 @@ func parseSimpleWhereExpression(whereClause string) (ExprNode, error) { return &AndExpr{Left: left, Right: right}, nil } } - + if strings.Contains(strings.ToUpper(whereClause), " OR ") { // Use original case for parsing but ToUpper for detection originalParts := strings.SplitN(whereClause, " OR ", 2) @@ -416,18 +416,18 @@ func parseSimpleWhereExpression(whereClause string) (ExprNode, error) { return &OrExpr{Left: left, Right: right}, nil } } - + // Handle simple comparison operations operators := []string{">=", "<=", "!=", "<>", "=", ">", "<"} - + for _, op := range operators { if idx := strings.Index(whereClause, op); idx != -1 { left := strings.TrimSpace(whereClause[:idx]) right := strings.TrimSpace(whereClause[idx+len(op):]) - + // Parse left side (should be a column name) leftExpr := &ColName{Name: stringValue(left)} - + // Parse right side (should be a value) var rightExpr ExprNode if strings.HasPrefix(right, "'") && strings.HasSuffix(right, "'") { @@ -452,7 +452,7 @@ func parseSimpleWhereExpression(whereClause string) (ExprNode, error) { // Assume it's a column name rightExpr = &ColName{Name: stringValue(right)} } - + // Convert operator to internal representation var operator string switch op { @@ -471,7 +471,7 @@ func parseSimpleWhereExpression(whereClause string) (ExprNode, error) { default: operator = op } - + return &ComparisonExpr{ Left: leftExpr, Right: rightExpr, @@ -479,7 +479,7 @@ func parseSimpleWhereExpression(whereClause string) (ExprNode, error) { }, nil } } - + return nil, fmt.Errorf("unsupported WHERE expression: %s", whereClause) }