diff --git a/SMQ_OFFSET_DEVELOPMENT_PLAN.md b/SMQ_OFFSET_DEVELOPMENT_PLAN.md new file mode 100644 index 000000000..2ff817521 --- /dev/null +++ b/SMQ_OFFSET_DEVELOPMENT_PLAN.md @@ -0,0 +1,287 @@ +# SMQ Native Offset Development Plan + +## Overview + +Add native per-partition sequential offsets to SeaweedMQ to eliminate the need for external offset mapping and provide better interoperability with message queue protocols. + +## Architecture Changes + +### Data Model +- Add `offset` field (int64) to each record alongside existing `ts_ns` +- Offset domain: per `schema_pb.Partition` (ring range) +- Offsets are strictly monotonic within a partition +- Leader assigns offsets; followers replicate + +### Storage +- Use `_index` as hidden SQL table column for offset storage +- Maintain per-partition offset counters in broker state +- Checkpoint offset state periodically for recovery + +## Development Phases + +### Phase 1: Proto and Data Model Changes + +**Scope**: Update protobuf definitions and core data structures + +**Tasks**: +1. Update `mq_schema.proto`: + - Add `offset` field to record storage format + - Add offset-based `OffsetType` enums + - Add `offset_value` field to subscription requests +2. Update `mq_agent.proto`: + - Add `base_offset` and `last_offset` to `PublishRecordResponse` + - Add `offset` field to `SubscribeRecordResponse` +3. Regenerate protobuf Go code +4. Update core data structures in broker code +5. Add offset field to SQL schema with `_index` column + +**Tests**: +- Proto compilation tests +- Data structure serialization tests +- SQL schema migration tests + +**Deliverables**: +- Updated proto files +- Generated Go code +- Updated SQL schema +- Basic unit tests + +### Phase 2: Offset Assignment Logic + +**Scope**: Implement offset assignment in broker + +**Tasks**: +1. Add `PartitionOffsetManager` component: + - Track `next_offset` per partition + - Assign sequential offsets to records + - Handle offset recovery on startup +2. Integrate with existing record publishing flow: + - Assign offsets before storage + - Update `PublishRecordResponse` with offset info +3. Add offset persistence to storage layer: + - Store offset alongside record data + - Index by offset for efficient lookups +4. Implement offset recovery: + - Load highest offset on partition leadership + - Handle clean and unclean restarts + +**Tests**: +- Offset assignment unit tests +- Offset persistence tests +- Recovery scenario tests +- Concurrent assignment tests + +**Deliverables**: +- `PartitionOffsetManager` implementation +- Integrated publishing with offsets +- Offset recovery logic +- Comprehensive test suite + +### Phase 3: Subscription by Offset + +**Scope**: Enable consumers to subscribe using offsets + +**Tasks**: +1. Extend subscription logic: + - Support `EXACT_OFFSET` and `RESET_TO_OFFSET` modes + - Add offset-based seeking + - Maintain backward compatibility with timestamp-based seeks +2. Update `SubscribeRecordResponse`: + - Include offset in response messages + - Ensure offset ordering in delivery +3. Add offset validation: + - Validate requested offsets are within valid range + - Handle out-of-range offset requests gracefully +4. Implement offset-based filtering and pagination + +**Tests**: +- Offset-based subscription tests +- Seek functionality tests +- Out-of-range offset handling tests +- Mixed timestamp/offset subscription tests + +**Deliverables**: +- Offset-based subscription implementation +- Updated subscription APIs +- Validation and error handling +- Integration tests + +### Phase 4: High Water Mark and Lag Calculation + +**Scope**: Implement native offset-based metrics + +**Tasks**: +1. Add high water mark tracking: + - Track highest committed offset per partition + - Expose via broker APIs + - Update on successful replication +2. Implement lag calculation: + - Consumer lag = high_water_mark - consumer_offset + - Partition lag metrics + - Consumer group lag aggregation +3. Add offset-based monitoring: + - Partition offset metrics + - Consumer position tracking + - Lag alerting capabilities +4. Update existing monitoring integration + +**Tests**: +- High water mark calculation tests +- Lag computation tests +- Monitoring integration tests +- Metrics accuracy tests + +**Deliverables**: +- High water mark implementation +- Lag calculation logic +- Monitoring integration +- Metrics and alerting + +### Phase 5: Kafka Gateway Integration + +**Scope**: Update Kafka gateway to use native SMQ offsets + +**Tasks**: +1. Remove offset mapping layer: + - Delete `kafka-system/offset-mappings` topic usage + - Remove `PersistentLedger` and `SeaweedMQStorage` + - Simplify offset translation logic +2. Update Kafka protocol handlers: + - Use native SMQ offsets in Produce responses + - Map SMQ offsets directly to Kafka offsets + - Update ListOffsets and Fetch handlers +3. Simplify consumer group offset management: + - Store Kafka consumer offsets as SMQ offsets + - Remove timestamp-based offset translation +4. Update integration tests: + - Test Kafka client compatibility + - Verify offset consistency + - Test long-term disconnection scenarios + +**Tests**: +- Kafka protocol compatibility tests +- End-to-end integration tests +- Performance comparison tests +- Migration scenario tests + +**Deliverables**: +- Simplified Kafka gateway +- Removed offset mapping complexity +- Updated integration tests +- Performance improvements + +### Phase 6: Performance Optimization and Production Readiness + +**Scope**: Optimize performance and prepare for production + +**Tasks**: +1. Optimize offset assignment performance: + - Batch offset assignment + - Reduce lock contention + - Optimize recovery performance +2. Add offset compaction and cleanup: + - Implement offset-based log compaction + - Add retention policies based on offsets + - Cleanup old offset checkpoints +3. Enhance monitoring and observability: + - Detailed offset metrics + - Performance dashboards + - Alerting on offset anomalies +4. Load testing and benchmarking: + - Compare performance with timestamp-only approach + - Test under high load scenarios + - Validate memory usage patterns + +**Tests**: +- Performance benchmarks +- Load testing scenarios +- Memory usage tests +- Stress testing under failures + +**Deliverables**: +- Optimized offset implementation +- Production monitoring +- Performance benchmarks +- Production deployment guide + +## Implementation Guidelines + +### Code Organization +``` +weed/mq/ +├── offset/ +│ ├── manager.go # PartitionOffsetManager +│ ├── recovery.go # Offset recovery logic +│ └── checkpoint.go # Offset checkpointing +├── broker/ +│ ├── partition_leader.go # Updated with offset assignment +│ └── subscriber.go # Updated with offset support +└── storage/ + └── offset_store.go # Offset persistence layer +``` + +### Testing Strategy +- Unit tests for each component +- Integration tests for cross-component interactions +- Performance tests for offset assignment and recovery +- Compatibility tests with existing SMQ features +- End-to-end tests with Kafka gateway + +### Commit Strategy +- One commit per completed task within a phase +- All tests must pass before commit +- No binary files in commits +- Clear commit messages describing changes + +### Rollout Plan +1. Deploy to development environment after Phase 2 +2. Integration testing after Phase 3 +3. Performance testing after Phase 4 +4. Kafka gateway migration after Phase 5 +5. Production rollout after Phase 6 + +## Success Criteria + +### Phase Completion Criteria +- All tests pass +- Code review completed +- Documentation updated +- Performance benchmarks meet targets + +### Overall Success Metrics +- Eliminate external offset mapping complexity +- Maintain or improve performance +- Full Kafka protocol compatibility +- Native SMQ offset support for all protocols +- Simplified consumer group offset management + +## Risk Mitigation + +### Technical Risks +- **Offset assignment bottlenecks**: Implement batching and optimize locking +- **Recovery performance**: Use checkpointing and incremental recovery +- **Storage overhead**: Optimize offset storage and indexing + +### Operational Risks +- **Migration complexity**: Implement gradual rollout with rollback capability +- **Data consistency**: Extensive testing of offset assignment and recovery +- **Performance regression**: Continuous benchmarking and monitoring + +## Timeline Estimate + +- Phase 1: 1-2 weeks +- Phase 2: 2-3 weeks +- Phase 3: 2-3 weeks +- Phase 4: 1-2 weeks +- Phase 5: 2-3 weeks +- Phase 6: 2-3 weeks + +**Total: 10-16 weeks** + +## Next Steps + +1. Review and approve development plan +2. Set up development branch +3. Begin Phase 1 implementation +4. Establish testing and CI pipeline +5. Regular progress reviews and adjustments diff --git a/weed/pb/mq_agent.proto b/weed/pb/mq_agent.proto index 91f5a4cfc..6457cbcd8 100644 --- a/weed/pb/mq_agent.proto +++ b/weed/pb/mq_agent.proto @@ -53,6 +53,8 @@ message PublishRecordRequest { message PublishRecordResponse { int64 ack_sequence = 1; string error = 2; + int64 base_offset = 3; // First offset assigned to this batch + int64 last_offset = 4; // Last offset assigned to this batch } ////////////////////////////////////////////////// message SubscribeRecordRequest { @@ -78,5 +80,6 @@ message SubscribeRecordResponse { string error = 5; bool is_end_of_stream = 6; bool is_end_of_topic = 7; + int64 offset = 8; // Sequential offset within partition } ////////////////////////////////////////////////// diff --git a/weed/pb/mq_agent_pb/mq_agent.pb.go b/weed/pb/mq_agent_pb/mq_agent.pb.go index 11f1ac551..bc321e957 100644 --- a/weed/pb/mq_agent_pb/mq_agent.pb.go +++ b/weed/pb/mq_agent_pb/mq_agent.pb.go @@ -296,6 +296,8 @@ type PublishRecordResponse struct { state protoimpl.MessageState `protogen:"open.v1"` AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + BaseOffset int64 `protobuf:"varint,3,opt,name=base_offset,json=baseOffset,proto3" json:"base_offset,omitempty"` // First offset assigned to this batch + LastOffset int64 `protobuf:"varint,4,opt,name=last_offset,json=lastOffset,proto3" json:"last_offset,omitempty"` // Last offset assigned to this batch unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -344,6 +346,20 @@ func (x *PublishRecordResponse) GetError() string { return "" } +func (x *PublishRecordResponse) GetBaseOffset() int64 { + if x != nil { + return x.BaseOffset + } + return 0 +} + +func (x *PublishRecordResponse) GetLastOffset() int64 { + if x != nil { + return x.LastOffset + } + return 0 +} + // //////////////////////////////////////////////// type SubscribeRecordRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -413,6 +429,7 @@ type SubscribeRecordResponse struct { Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` IsEndOfStream bool `protobuf:"varint,6,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"` IsEndOfTopic bool `protobuf:"varint,7,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"` + Offset int64 `protobuf:"varint,8,opt,name=offset,proto3" json:"offset,omitempty"` // Sequential offset within partition unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -489,6 +506,13 @@ func (x *SubscribeRecordResponse) GetIsEndOfTopic() bool { return false } +func (x *SubscribeRecordResponse) GetOffset() int64 { + if x != nil { + return x.Offset + } + return 0 +} + type SubscribeRecordRequest_InitSubscribeRecordRequest struct { state protoimpl.MessageState `protogen:"open.v1"` ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` @@ -621,10 +645,14 @@ const file_mq_agent_proto_rawDesc = "" + "\n" + "session_id\x18\x01 \x01(\x03R\tsessionId\x12\x10\n" + "\x03key\x18\x02 \x01(\fR\x03key\x12,\n" + - "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\"P\n" + + "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\"\x92\x01\n" + "\x15PublishRecordResponse\x12!\n" + "\fack_sequence\x18\x01 \x01(\x03R\vackSequence\x12\x14\n" + - "\x05error\x18\x02 \x01(\tR\x05error\"\xfb\x04\n" + + "\x05error\x18\x02 \x01(\tR\x05error\x12\x1f\n" + + "\vbase_offset\x18\x03 \x01(\x03R\n" + + "baseOffset\x12\x1f\n" + + "\vlast_offset\x18\x04 \x01(\x03R\n" + + "lastOffset\"\xfb\x04\n" + "\x16SubscribeRecordRequest\x12S\n" + "\x04init\x18\x01 \x01(\v2?.messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequestR\x04init\x12!\n" + "\fack_sequence\x18\x02 \x01(\x03R\vackSequence\x12\x17\n" + @@ -641,14 +669,15 @@ const file_mq_agent_proto_rawDesc = "" + "\x06filter\x18\n" + " \x01(\tR\x06filter\x12:\n" + "\x19max_subscribed_partitions\x18\v \x01(\x05R\x17maxSubscribedPartitions\x12.\n" + - "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\"\xd4\x01\n" + + "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\"\xec\x01\n" + "\x17SubscribeRecordResponse\x12\x10\n" + "\x03key\x18\x02 \x01(\fR\x03key\x12,\n" + "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\x12\x13\n" + "\x05ts_ns\x18\x04 \x01(\x03R\x04tsNs\x12\x14\n" + "\x05error\x18\x05 \x01(\tR\x05error\x12'\n" + "\x10is_end_of_stream\x18\x06 \x01(\bR\risEndOfStream\x12%\n" + - "\x0fis_end_of_topic\x18\a \x01(\bR\fisEndOfTopic2\xb9\x03\n" + + "\x0fis_end_of_topic\x18\a \x01(\bR\fisEndOfTopic\x12\x16\n" + + "\x06offset\x18\b \x01(\x03R\x06offset2\xb9\x03\n" + "\x15SeaweedMessagingAgent\x12l\n" + "\x13StartPublishSession\x12(.messaging_pb.StartPublishSessionRequest\x1a).messaging_pb.StartPublishSessionResponse\"\x00\x12l\n" + "\x13ClosePublishSession\x12(.messaging_pb.ClosePublishSessionRequest\x1a).messaging_pb.ClosePublishSessionResponse\"\x00\x12^\n" + diff --git a/weed/pb/mq_agent_pb/publish_response_test.go b/weed/pb/mq_agent_pb/publish_response_test.go new file mode 100644 index 000000000..1f2e767e4 --- /dev/null +++ b/weed/pb/mq_agent_pb/publish_response_test.go @@ -0,0 +1,102 @@ +package mq_agent_pb + +import ( + "testing" + "google.golang.org/protobuf/proto" +) + +func TestPublishRecordResponseSerialization(t *testing.T) { + // Test that PublishRecordResponse can serialize/deserialize with new offset fields + original := &PublishRecordResponse{ + AckSequence: 123, + Error: "", + BaseOffset: 1000, // New field + LastOffset: 1005, // New field + } + + // Test proto marshaling/unmarshaling + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PublishRecordResponse: %v", err) + } + + restored := &PublishRecordResponse{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PublishRecordResponse: %v", err) + } + + // Verify all fields are preserved + if restored.AckSequence != original.AckSequence { + t.Errorf("AckSequence = %d, want %d", restored.AckSequence, original.AckSequence) + } + if restored.BaseOffset != original.BaseOffset { + t.Errorf("BaseOffset = %d, want %d", restored.BaseOffset, original.BaseOffset) + } + if restored.LastOffset != original.LastOffset { + t.Errorf("LastOffset = %d, want %d", restored.LastOffset, original.LastOffset) + } +} + +func TestSubscribeRecordResponseSerialization(t *testing.T) { + // Test that SubscribeRecordResponse can serialize/deserialize with new offset field + original := &SubscribeRecordResponse{ + Key: []byte("test-key"), + TsNs: 1234567890, + Error: "", + IsEndOfStream: false, + IsEndOfTopic: false, + Offset: 42, // New field + } + + // Test proto marshaling/unmarshaling + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal SubscribeRecordResponse: %v", err) + } + + restored := &SubscribeRecordResponse{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal SubscribeRecordResponse: %v", err) + } + + // Verify all fields are preserved + if restored.TsNs != original.TsNs { + t.Errorf("TsNs = %d, want %d", restored.TsNs, original.TsNs) + } + if restored.Offset != original.Offset { + t.Errorf("Offset = %d, want %d", restored.Offset, original.Offset) + } + if string(restored.Key) != string(original.Key) { + t.Errorf("Key = %s, want %s", string(restored.Key), string(original.Key)) + } +} + +func TestPublishRecordResponseBackwardCompatibility(t *testing.T) { + // Test that PublishRecordResponse without offset fields still works + original := &PublishRecordResponse{ + AckSequence: 123, + Error: "", + // BaseOffset and LastOffset not set (defaults to 0) + } + + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PublishRecordResponse: %v", err) + } + + restored := &PublishRecordResponse{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PublishRecordResponse: %v", err) + } + + // Offset fields should default to 0 + if restored.BaseOffset != 0 { + t.Errorf("BaseOffset = %d, want 0", restored.BaseOffset) + } + if restored.LastOffset != 0 { + t.Errorf("LastOffset = %d, want 0", restored.LastOffset) + } +} diff --git a/weed/pb/mq_schema.proto b/weed/pb/mq_schema.proto index 2deeadb55..81b523bcd 100644 --- a/weed/pb/mq_schema.proto +++ b/weed/pb/mq_schema.proto @@ -30,11 +30,15 @@ enum OffsetType { EXACT_TS_NS = 10; RESET_TO_LATEST = 15; RESUME_OR_LATEST = 20; + // Offset-based positioning + EXACT_OFFSET = 25; + RESET_TO_OFFSET = 30; } message PartitionOffset { Partition partition = 1; int64 start_ts_ns = 2; + int64 start_offset = 3; // For offset-based positioning } /////////////////////////// diff --git a/weed/pb/schema_pb/mq_schema.pb.go b/weed/pb/schema_pb/mq_schema.pb.go index 2cd2118bf..7fbf4a4e6 100644 --- a/weed/pb/schema_pb/mq_schema.pb.go +++ b/weed/pb/schema_pb/mq_schema.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.36.6 // protoc v5.29.3 -// source: weed/pb/mq_schema.proto +// source: mq_schema.proto package schema_pb @@ -29,6 +29,9 @@ const ( OffsetType_EXACT_TS_NS OffsetType = 10 OffsetType_RESET_TO_LATEST OffsetType = 15 OffsetType_RESUME_OR_LATEST OffsetType = 20 + // Offset-based positioning + OffsetType_EXACT_OFFSET OffsetType = 25 + OffsetType_RESET_TO_OFFSET OffsetType = 30 ) // Enum value maps for OffsetType. @@ -39,6 +42,8 @@ var ( 10: "EXACT_TS_NS", 15: "RESET_TO_LATEST", 20: "RESUME_OR_LATEST", + 25: "EXACT_OFFSET", + 30: "RESET_TO_OFFSET", } OffsetType_value = map[string]int32{ "RESUME_OR_EARLIEST": 0, @@ -46,6 +51,8 @@ var ( "EXACT_TS_NS": 10, "RESET_TO_LATEST": 15, "RESUME_OR_LATEST": 20, + "EXACT_OFFSET": 25, + "RESET_TO_OFFSET": 30, } ) @@ -60,11 +67,11 @@ func (x OffsetType) String() string { } func (OffsetType) Descriptor() protoreflect.EnumDescriptor { - return file_weed_pb_mq_schema_proto_enumTypes[0].Descriptor() + return file_mq_schema_proto_enumTypes[0].Descriptor() } func (OffsetType) Type() protoreflect.EnumType { - return &file_weed_pb_mq_schema_proto_enumTypes[0] + return &file_mq_schema_proto_enumTypes[0] } func (x OffsetType) Number() protoreflect.EnumNumber { @@ -73,7 +80,7 @@ func (x OffsetType) Number() protoreflect.EnumNumber { // Deprecated: Use OffsetType.Descriptor instead. func (OffsetType) EnumDescriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{0} + return file_mq_schema_proto_rawDescGZIP(), []int{0} } type ScalarType int32 @@ -134,11 +141,11 @@ func (x ScalarType) String() string { } func (ScalarType) Descriptor() protoreflect.EnumDescriptor { - return file_weed_pb_mq_schema_proto_enumTypes[1].Descriptor() + return file_mq_schema_proto_enumTypes[1].Descriptor() } func (ScalarType) Type() protoreflect.EnumType { - return &file_weed_pb_mq_schema_proto_enumTypes[1] + return &file_mq_schema_proto_enumTypes[1] } func (x ScalarType) Number() protoreflect.EnumNumber { @@ -147,7 +154,7 @@ func (x ScalarType) Number() protoreflect.EnumNumber { // Deprecated: Use ScalarType.Descriptor instead. func (ScalarType) EnumDescriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{1} + return file_mq_schema_proto_rawDescGZIP(), []int{1} } type Topic struct { @@ -160,7 +167,7 @@ type Topic struct { func (x *Topic) Reset() { *x = Topic{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[0] + mi := &file_mq_schema_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -172,7 +179,7 @@ func (x *Topic) String() string { func (*Topic) ProtoMessage() {} func (x *Topic) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[0] + mi := &file_mq_schema_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -185,7 +192,7 @@ func (x *Topic) ProtoReflect() protoreflect.Message { // Deprecated: Use Topic.ProtoReflect.Descriptor instead. func (*Topic) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{0} + return file_mq_schema_proto_rawDescGZIP(), []int{0} } func (x *Topic) GetNamespace() string { @@ -214,7 +221,7 @@ type Partition struct { func (x *Partition) Reset() { *x = Partition{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[1] + mi := &file_mq_schema_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -226,7 +233,7 @@ func (x *Partition) String() string { func (*Partition) ProtoMessage() {} func (x *Partition) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[1] + mi := &file_mq_schema_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -239,7 +246,7 @@ func (x *Partition) ProtoReflect() protoreflect.Message { // Deprecated: Use Partition.ProtoReflect.Descriptor instead. func (*Partition) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{1} + return file_mq_schema_proto_rawDescGZIP(), []int{1} } func (x *Partition) GetRingSize() int32 { @@ -280,7 +287,7 @@ type Offset struct { func (x *Offset) Reset() { *x = Offset{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[2] + mi := &file_mq_schema_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -292,7 +299,7 @@ func (x *Offset) String() string { func (*Offset) ProtoMessage() {} func (x *Offset) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[2] + mi := &file_mq_schema_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -305,7 +312,7 @@ func (x *Offset) ProtoReflect() protoreflect.Message { // Deprecated: Use Offset.ProtoReflect.Descriptor instead. func (*Offset) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{2} + return file_mq_schema_proto_rawDescGZIP(), []int{2} } func (x *Offset) GetTopic() *Topic { @@ -326,13 +333,14 @@ type PartitionOffset struct { state protoimpl.MessageState `protogen:"open.v1"` Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` StartTsNs int64 `protobuf:"varint,2,opt,name=start_ts_ns,json=startTsNs,proto3" json:"start_ts_ns,omitempty"` + StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3" json:"start_offset,omitempty"` // For offset-based positioning unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *PartitionOffset) Reset() { *x = PartitionOffset{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[3] + mi := &file_mq_schema_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -344,7 +352,7 @@ func (x *PartitionOffset) String() string { func (*PartitionOffset) ProtoMessage() {} func (x *PartitionOffset) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[3] + mi := &file_mq_schema_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -357,7 +365,7 @@ func (x *PartitionOffset) ProtoReflect() protoreflect.Message { // Deprecated: Use PartitionOffset.ProtoReflect.Descriptor instead. func (*PartitionOffset) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{3} + return file_mq_schema_proto_rawDescGZIP(), []int{3} } func (x *PartitionOffset) GetPartition() *Partition { @@ -374,6 +382,13 @@ func (x *PartitionOffset) GetStartTsNs() int64 { return 0 } +func (x *PartitionOffset) GetStartOffset() int64 { + if x != nil { + return x.StartOffset + } + return 0 +} + type RecordType struct { state protoimpl.MessageState `protogen:"open.v1"` Fields []*Field `protobuf:"bytes,1,rep,name=fields,proto3" json:"fields,omitempty"` @@ -383,7 +398,7 @@ type RecordType struct { func (x *RecordType) Reset() { *x = RecordType{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[4] + mi := &file_mq_schema_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -395,7 +410,7 @@ func (x *RecordType) String() string { func (*RecordType) ProtoMessage() {} func (x *RecordType) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[4] + mi := &file_mq_schema_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -408,7 +423,7 @@ func (x *RecordType) ProtoReflect() protoreflect.Message { // Deprecated: Use RecordType.ProtoReflect.Descriptor instead. func (*RecordType) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{4} + return file_mq_schema_proto_rawDescGZIP(), []int{4} } func (x *RecordType) GetFields() []*Field { @@ -431,7 +446,7 @@ type Field struct { func (x *Field) Reset() { *x = Field{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[5] + mi := &file_mq_schema_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -443,7 +458,7 @@ func (x *Field) String() string { func (*Field) ProtoMessage() {} func (x *Field) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[5] + mi := &file_mq_schema_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -456,7 +471,7 @@ func (x *Field) ProtoReflect() protoreflect.Message { // Deprecated: Use Field.ProtoReflect.Descriptor instead. func (*Field) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{5} + return file_mq_schema_proto_rawDescGZIP(), []int{5} } func (x *Field) GetName() string { @@ -508,7 +523,7 @@ type Type struct { func (x *Type) Reset() { *x = Type{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[6] + mi := &file_mq_schema_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -520,7 +535,7 @@ func (x *Type) String() string { func (*Type) ProtoMessage() {} func (x *Type) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[6] + mi := &file_mq_schema_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -533,7 +548,7 @@ func (x *Type) ProtoReflect() protoreflect.Message { // Deprecated: Use Type.ProtoReflect.Descriptor instead. func (*Type) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{6} + return file_mq_schema_proto_rawDescGZIP(), []int{6} } func (x *Type) GetKind() isType_Kind { @@ -601,7 +616,7 @@ type ListType struct { func (x *ListType) Reset() { *x = ListType{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[7] + mi := &file_mq_schema_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -613,7 +628,7 @@ func (x *ListType) String() string { func (*ListType) ProtoMessage() {} func (x *ListType) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[7] + mi := &file_mq_schema_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -626,7 +641,7 @@ func (x *ListType) ProtoReflect() protoreflect.Message { // Deprecated: Use ListType.ProtoReflect.Descriptor instead. func (*ListType) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{7} + return file_mq_schema_proto_rawDescGZIP(), []int{7} } func (x *ListType) GetElementType() *Type { @@ -648,7 +663,7 @@ type RecordValue struct { func (x *RecordValue) Reset() { *x = RecordValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[8] + mi := &file_mq_schema_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -660,7 +675,7 @@ func (x *RecordValue) String() string { func (*RecordValue) ProtoMessage() {} func (x *RecordValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[8] + mi := &file_mq_schema_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -673,7 +688,7 @@ func (x *RecordValue) ProtoReflect() protoreflect.Message { // Deprecated: Use RecordValue.ProtoReflect.Descriptor instead. func (*RecordValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{8} + return file_mq_schema_proto_rawDescGZIP(), []int{8} } func (x *RecordValue) GetFields() map[string]*Value { @@ -707,7 +722,7 @@ type Value struct { func (x *Value) Reset() { *x = Value{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[9] + mi := &file_mq_schema_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -719,7 +734,7 @@ func (x *Value) String() string { func (*Value) ProtoMessage() {} func (x *Value) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[9] + mi := &file_mq_schema_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -732,7 +747,7 @@ func (x *Value) ProtoReflect() protoreflect.Message { // Deprecated: Use Value.ProtoReflect.Descriptor instead. func (*Value) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{9} + return file_mq_schema_proto_rawDescGZIP(), []int{9} } func (x *Value) GetKind() isValue_Kind { @@ -954,7 +969,7 @@ type TimestampValue struct { func (x *TimestampValue) Reset() { *x = TimestampValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[10] + mi := &file_mq_schema_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -966,7 +981,7 @@ func (x *TimestampValue) String() string { func (*TimestampValue) ProtoMessage() {} func (x *TimestampValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[10] + mi := &file_mq_schema_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -979,7 +994,7 @@ func (x *TimestampValue) ProtoReflect() protoreflect.Message { // Deprecated: Use TimestampValue.ProtoReflect.Descriptor instead. func (*TimestampValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{10} + return file_mq_schema_proto_rawDescGZIP(), []int{10} } func (x *TimestampValue) GetTimestampMicros() int64 { @@ -1005,7 +1020,7 @@ type DateValue struct { func (x *DateValue) Reset() { *x = DateValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[11] + mi := &file_mq_schema_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1017,7 +1032,7 @@ func (x *DateValue) String() string { func (*DateValue) ProtoMessage() {} func (x *DateValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[11] + mi := &file_mq_schema_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1030,7 +1045,7 @@ func (x *DateValue) ProtoReflect() protoreflect.Message { // Deprecated: Use DateValue.ProtoReflect.Descriptor instead. func (*DateValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{11} + return file_mq_schema_proto_rawDescGZIP(), []int{11} } func (x *DateValue) GetDaysSinceEpoch() int32 { @@ -1051,7 +1066,7 @@ type DecimalValue struct { func (x *DecimalValue) Reset() { *x = DecimalValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[12] + mi := &file_mq_schema_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1063,7 +1078,7 @@ func (x *DecimalValue) String() string { func (*DecimalValue) ProtoMessage() {} func (x *DecimalValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[12] + mi := &file_mq_schema_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1076,7 +1091,7 @@ func (x *DecimalValue) ProtoReflect() protoreflect.Message { // Deprecated: Use DecimalValue.ProtoReflect.Descriptor instead. func (*DecimalValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{12} + return file_mq_schema_proto_rawDescGZIP(), []int{12} } func (x *DecimalValue) GetValue() []byte { @@ -1109,7 +1124,7 @@ type TimeValue struct { func (x *TimeValue) Reset() { *x = TimeValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[13] + mi := &file_mq_schema_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1121,7 +1136,7 @@ func (x *TimeValue) String() string { func (*TimeValue) ProtoMessage() {} func (x *TimeValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[13] + mi := &file_mq_schema_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1134,7 +1149,7 @@ func (x *TimeValue) ProtoReflect() protoreflect.Message { // Deprecated: Use TimeValue.ProtoReflect.Descriptor instead. func (*TimeValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{13} + return file_mq_schema_proto_rawDescGZIP(), []int{13} } func (x *TimeValue) GetTimeMicros() int64 { @@ -1153,7 +1168,7 @@ type ListValue struct { func (x *ListValue) Reset() { *x = ListValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[14] + mi := &file_mq_schema_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1165,7 +1180,7 @@ func (x *ListValue) String() string { func (*ListValue) ProtoMessage() {} func (x *ListValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[14] + mi := &file_mq_schema_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1178,7 +1193,7 @@ func (x *ListValue) ProtoReflect() protoreflect.Message { // Deprecated: Use ListValue.ProtoReflect.Descriptor instead. func (*ListValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{14} + return file_mq_schema_proto_rawDescGZIP(), []int{14} } func (x *ListValue) GetValues() []*Value { @@ -1188,11 +1203,11 @@ func (x *ListValue) GetValues() []*Value { return nil } -var File_weed_pb_mq_schema_proto protoreflect.FileDescriptor +var File_mq_schema_proto protoreflect.FileDescriptor -const file_weed_pb_mq_schema_proto_rawDesc = "" + +const file_mq_schema_proto_rawDesc = "" + "\n" + - "\x17weed/pb/mq_schema.proto\x12\tschema_pb\"9\n" + + "\x0fmq_schema.proto\x12\tschema_pb\"9\n" + "\x05Topic\x12\x1c\n" + "\tnamespace\x18\x01 \x01(\tR\tnamespace\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\"\x8a\x01\n" + @@ -1206,10 +1221,11 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "unixTimeNs\"y\n" + "\x06Offset\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12G\n" + - "\x11partition_offsets\x18\x02 \x03(\v2\x1a.schema_pb.PartitionOffsetR\x10partitionOffsets\"e\n" + + "\x11partition_offsets\x18\x02 \x03(\v2\x1a.schema_pb.PartitionOffsetR\x10partitionOffsets\"\x88\x01\n" + "\x0fPartitionOffset\x122\n" + "\tpartition\x18\x01 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12\x1e\n" + - "\vstart_ts_ns\x18\x02 \x01(\x03R\tstartTsNs\"6\n" + + "\vstart_ts_ns\x18\x02 \x01(\x03R\tstartTsNs\x12!\n" + + "\fstart_offset\x18\x03 \x01(\x03R\vstartOffset\"6\n" + "\n" + "RecordType\x12(\n" + "\x06fields\x18\x01 \x03(\v2\x10.schema_pb.FieldR\x06fields\"\xa3\x01\n" + @@ -1273,7 +1289,7 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "\vtime_micros\x18\x01 \x01(\x03R\n" + "timeMicros\"5\n" + "\tListValue\x12(\n" + - "\x06values\x18\x01 \x03(\v2\x10.schema_pb.ValueR\x06values*w\n" + + "\x06values\x18\x01 \x03(\v2\x10.schema_pb.ValueR\x06values*\x9e\x01\n" + "\n" + "OffsetType\x12\x16\n" + "\x12RESUME_OR_EARLIEST\x10\x00\x12\x15\n" + @@ -1281,7 +1297,9 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "\vEXACT_TS_NS\x10\n" + "\x12\x13\n" + "\x0fRESET_TO_LATEST\x10\x0f\x12\x14\n" + - "\x10RESUME_OR_LATEST\x10\x14*\x8a\x01\n" + + "\x10RESUME_OR_LATEST\x10\x14\x12\x10\n" + + "\fEXACT_OFFSET\x10\x19\x12\x13\n" + + "\x0fRESET_TO_OFFSET\x10\x1e*\x8a\x01\n" + "\n" + "ScalarType\x12\b\n" + "\x04BOOL\x10\x00\x12\t\n" + @@ -1300,20 +1318,20 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "\x04TIME\x10\vB2Z0github.com/seaweedfs/seaweedfs/weed/pb/schema_pbb\x06proto3" var ( - file_weed_pb_mq_schema_proto_rawDescOnce sync.Once - file_weed_pb_mq_schema_proto_rawDescData []byte + file_mq_schema_proto_rawDescOnce sync.Once + file_mq_schema_proto_rawDescData []byte ) -func file_weed_pb_mq_schema_proto_rawDescGZIP() []byte { - file_weed_pb_mq_schema_proto_rawDescOnce.Do(func() { - file_weed_pb_mq_schema_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_weed_pb_mq_schema_proto_rawDesc), len(file_weed_pb_mq_schema_proto_rawDesc))) +func file_mq_schema_proto_rawDescGZIP() []byte { + file_mq_schema_proto_rawDescOnce.Do(func() { + file_mq_schema_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mq_schema_proto_rawDesc), len(file_mq_schema_proto_rawDesc))) }) - return file_weed_pb_mq_schema_proto_rawDescData + return file_mq_schema_proto_rawDescData } -var file_weed_pb_mq_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_weed_pb_mq_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 16) -var file_weed_pb_mq_schema_proto_goTypes = []any{ +var file_mq_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_mq_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_mq_schema_proto_goTypes = []any{ (OffsetType)(0), // 0: schema_pb.OffsetType (ScalarType)(0), // 1: schema_pb.ScalarType (*Topic)(nil), // 2: schema_pb.Topic @@ -1333,7 +1351,7 @@ var file_weed_pb_mq_schema_proto_goTypes = []any{ (*ListValue)(nil), // 16: schema_pb.ListValue nil, // 17: schema_pb.RecordValue.FieldsEntry } -var file_weed_pb_mq_schema_proto_depIdxs = []int32{ +var file_mq_schema_proto_depIdxs = []int32{ 2, // 0: schema_pb.Offset.topic:type_name -> schema_pb.Topic 5, // 1: schema_pb.Offset.partition_offsets:type_name -> schema_pb.PartitionOffset 3, // 2: schema_pb.PartitionOffset.partition:type_name -> schema_pb.Partition @@ -1359,17 +1377,17 @@ var file_weed_pb_mq_schema_proto_depIdxs = []int32{ 0, // [0:18] is the sub-list for field type_name } -func init() { file_weed_pb_mq_schema_proto_init() } -func file_weed_pb_mq_schema_proto_init() { - if File_weed_pb_mq_schema_proto != nil { +func init() { file_mq_schema_proto_init() } +func file_mq_schema_proto_init() { + if File_mq_schema_proto != nil { return } - file_weed_pb_mq_schema_proto_msgTypes[6].OneofWrappers = []any{ + file_mq_schema_proto_msgTypes[6].OneofWrappers = []any{ (*Type_ScalarType)(nil), (*Type_RecordType)(nil), (*Type_ListType)(nil), } - file_weed_pb_mq_schema_proto_msgTypes[9].OneofWrappers = []any{ + file_mq_schema_proto_msgTypes[9].OneofWrappers = []any{ (*Value_BoolValue)(nil), (*Value_Int32Value)(nil), (*Value_Int64Value)(nil), @@ -1388,18 +1406,18 @@ func file_weed_pb_mq_schema_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_weed_pb_mq_schema_proto_rawDesc), len(file_weed_pb_mq_schema_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_schema_proto_rawDesc), len(file_mq_schema_proto_rawDesc)), NumEnums: 2, NumMessages: 16, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_weed_pb_mq_schema_proto_goTypes, - DependencyIndexes: file_weed_pb_mq_schema_proto_depIdxs, - EnumInfos: file_weed_pb_mq_schema_proto_enumTypes, - MessageInfos: file_weed_pb_mq_schema_proto_msgTypes, + GoTypes: file_mq_schema_proto_goTypes, + DependencyIndexes: file_mq_schema_proto_depIdxs, + EnumInfos: file_mq_schema_proto_enumTypes, + MessageInfos: file_mq_schema_proto_msgTypes, }.Build() - File_weed_pb_mq_schema_proto = out.File - file_weed_pb_mq_schema_proto_goTypes = nil - file_weed_pb_mq_schema_proto_depIdxs = nil + File_mq_schema_proto = out.File + file_mq_schema_proto_goTypes = nil + file_mq_schema_proto_depIdxs = nil } diff --git a/weed/pb/schema_pb/offset_test.go b/weed/pb/schema_pb/offset_test.go new file mode 100644 index 000000000..28324836e --- /dev/null +++ b/weed/pb/schema_pb/offset_test.go @@ -0,0 +1,93 @@ +package schema_pb + +import ( + "testing" + "google.golang.org/protobuf/proto" +) + +func TestOffsetTypeEnums(t *testing.T) { + // Test that new offset-based enum values are defined + tests := []struct { + name string + value OffsetType + expected int32 + }{ + {"EXACT_OFFSET", OffsetType_EXACT_OFFSET, 25}, + {"RESET_TO_OFFSET", OffsetType_RESET_TO_OFFSET, 30}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if int32(tt.value) != tt.expected { + t.Errorf("OffsetType_%s = %d, want %d", tt.name, int32(tt.value), tt.expected) + } + }) + } +} + +func TestPartitionOffsetSerialization(t *testing.T) { + // Test that PartitionOffset can serialize/deserialize with new offset field + original := &PartitionOffset{ + Partition: &Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: 1234567890, + }, + StartTsNs: 1234567890, + StartOffset: 42, // New field + } + + // Test proto marshaling/unmarshaling + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PartitionOffset: %v", err) + } + + restored := &PartitionOffset{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PartitionOffset: %v", err) + } + + // Verify all fields are preserved + if restored.StartTsNs != original.StartTsNs { + t.Errorf("StartTsNs = %d, want %d", restored.StartTsNs, original.StartTsNs) + } + if restored.StartOffset != original.StartOffset { + t.Errorf("StartOffset = %d, want %d", restored.StartOffset, original.StartOffset) + } + if restored.Partition.RingSize != original.Partition.RingSize { + t.Errorf("Partition.RingSize = %d, want %d", restored.Partition.RingSize, original.Partition.RingSize) + } +} + +func TestPartitionOffsetBackwardCompatibility(t *testing.T) { + // Test that PartitionOffset without StartOffset still works + original := &PartitionOffset{ + Partition: &Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: 1234567890, + }, + StartTsNs: 1234567890, + // StartOffset not set (defaults to 0) + } + + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PartitionOffset: %v", err) + } + + restored := &PartitionOffset{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PartitionOffset: %v", err) + } + + // StartOffset should default to 0 + if restored.StartOffset != 0 { + t.Errorf("StartOffset = %d, want 0", restored.StartOffset) + } +}