diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index ee434d37b..db2964121 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -75,8 +75,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ Message: &mq_pb.SubscribeFollowMeRequest_Init{ Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{ - Topic: req.GetInit().Topic, - Partition: req.GetInit().GetPartitionOffset().Partition, + Topic: req.GetInit().Topic, + Partition: req.GetInit().GetPartitionOffset().Partition, ConsumerGroup: req.GetInit().ConsumerGroup, }, }, @@ -186,7 +186,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) return } - if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil{ + if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { startPosition = log_buffer.NewMessagePosition(storedOffset, -2) return } diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index 5a77ceebb..eb041dc60 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -12,7 +12,6 @@ import ( "time" ) - func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) { var req *mq_pb.SubscribeFollowMeRequest req, err = stream.Recv() @@ -72,7 +71,7 @@ func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.Subscrib err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName) if err != nil { - return err + return err } if len(data) != 8 { return fmt.Errorf("no offset found") diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index b3d86a401..640ccca10 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -47,7 +47,7 @@ type MessageQueueBroker struct { lockAsBalancer *cluster.LiveLock Coordinator *sub_coordinator.Coordinator accessLock sync.Mutex - fca *sub_coordinator.FilerClientAccessor + fca *sub_coordinator.FilerClientAccessor } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -65,7 +65,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial Coordinator: coordinator, } fca := &sub_coordinator.FilerClientAccessor{ - GetFiler: mqBroker.GetFiler, + GetFiler: mqBroker.GetFiler, GetGrpcDialOption: mqBroker.GetGrpcDialOption, } mqBroker.fca = fca @@ -130,7 +130,6 @@ func (b *MessageQueueBroker) GetGrpcDialOption() grpc.DialOption { return b.grpcDialOption } - func (b *MessageQueueBroker) GetFiler() pb.ServerAddress { return b.currentFiler } diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index 4bbb26032..b18865877 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -44,7 +44,7 @@ func main() { subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) counter := 0 - subscriber.SetEachMessageFunc(func(key, value []byte) (error) { + subscriber.SetEachMessageFunc(func(key, value []byte) error { counter++ println(string(key), "=>", string(value), counter) return nil diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index ed710fa57..7aa381c76 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -63,7 +63,7 @@ func main() { } processorConfig := sub_client.ProcessorConfiguration{ - MaxPartitionCount: 3, + MaxPartitionCount: 3, PerPartitionConcurrency: 1, } @@ -71,7 +71,7 @@ func main() { subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) counter := 0 - subscriber.SetEachMessageFunc(func(key, value []byte) (error) { + subscriber.SetEachMessageFunc(func(key, value []byte) error { counter++ record := &schema_pb.RecordValue{} proto.Unmarshal(value, record) diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index ef7cfb93f..df2270b2c 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -142,11 +142,11 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if err = publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: p.config.Topic.ToPbTopic(), - Partition: job.Partition, - AckInterval: 128, - FollowerBroker: job.FollowerBroker, - PublisherName: p.config.PublisherName, + Topic: p.config.Topic.ToPbTopic(), + Partition: job.Partition, + AckInterval: 128, + FollowerBroker: job.FollowerBroker, + PublisherName: p.config.PublisherName, }, }, }); err != nil { diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 815694a48..118d91bac 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: sub.ContentConfig.Topic.ToPbTopic(), - MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount, + MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount, }, }, }); err != nil { @@ -105,12 +105,12 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig Partition: assigned.Partition, StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, }, - Filter: sub.ContentConfig.Filter, + Filter: sub.ContentConfig.Filter, FollowerBroker: assigned.FollowerBroker, - Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, + Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, }, }, - });err != nil { + }); err != nil { glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) } @@ -120,16 +120,16 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig defer sub.OnCompletionFunc() } - partitionOffsetChan:= make(chan int64, 1024) + partitionOffsetChan := make(chan int64, 1024) defer func() { close(partitionOffsetChan) }() - concurrentPartitionLimit := int(sub.ProcessorConfig.MaxPartitionCount) - if concurrentPartitionLimit <= 0 { - concurrentPartitionLimit = 1 + perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency) + if perPartitionConcurrency <= 0 { + perPartitionConcurrency = 1 } - executors := util.NewLimitedConcurrentExecutor(concurrentPartitionLimit) + executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency) go func() { for ack := range partitionOffsetChan { @@ -162,7 +162,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) if processErr == nil { partitionOffsetChan <- m.Data.TsNs - }else{ + } else { lastErr = processErr } }) diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index ba20cf040..c519ca18b 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -9,7 +9,6 @@ import ( ) type ProcessorState struct { - } // Subscribe subscribes to a topic's specified partitions. diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 95320b19a..cf41f2881 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -22,7 +22,7 @@ type ContentConfiguration struct { } type ProcessorConfiguration struct { - MaxPartitionCount int32 // how many partitions to process concurrently + MaxPartitionCount int32 // how many partitions to process concurrently PerPartitionConcurrency int32 // how many messages to process concurrently per partition } @@ -30,16 +30,16 @@ type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { - SubscriberConfig *SubscriberConfiguration - ContentConfig *ContentConfiguration + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration ProcessorConfig *ProcessorConfiguration brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment OnEachMessageFunc OnEachMessageFunc - OnCompletionFunc OnCompletionFunc - bootstrapBrokers []string - waitForMoreMessage bool - activeProcessors map[topic.Partition]*ProcessorState - activeProcessorsLock sync.Mutex + OnCompletionFunc OnCompletionFunc + bootstrapBrokers []string + waitForMoreMessage bool + activeProcessors map[topic.Partition]*ProcessorState + activeProcessorsLock sync.Mutex } func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 8e804c7fb..f6a13217e 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -12,8 +12,8 @@ import ( type ConsumerGroupInstance struct { InstanceId string // the consumer group instance may not have an active partition - Partitions []*topic.Partition - ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse + Partitions []*topic.Partition + ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse MaxPartitionCount int32 } type ConsumerGroup struct { @@ -43,10 +43,10 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { } } func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) { - cg.onConsumerGroupInstanceChange(true, "add consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds) + cg.onConsumerGroupInstanceChange(true, "add consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds) } func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) { - cg.onConsumerGroupInstanceChange(false, "remove consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds) + cg.onConsumerGroupInstanceChange(false, "remove consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds) } func (cg *ConsumerGroup) onConsumerGroupInstanceChange(isAdd bool, reason string, maxPartitionCount, rebalanceSeconds int32) { diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 4eafbca57..4bb726f26 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -17,8 +17,8 @@ type TopicConsumerGroups struct { type Coordinator struct { // map topic name to consumer groups - TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] - balancer *pub_balancer.Balancer + TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] + balancer *pub_balancer.Balancer FilerClientAccessor *FilerClientAccessor } diff --git a/weed/mq/sub_coordinator/filer_client_accessor.go b/weed/mq/sub_coordinator/filer_client_accessor.go index 85bb5e29d..dc50ac128 100644 --- a/weed/mq/sub_coordinator/filer_client_accessor.go +++ b/weed/mq/sub_coordinator/filer_client_accessor.go @@ -14,8 +14,8 @@ import ( ) type FilerClientAccessor struct { - GetFiler func() pb.ServerAddress - GetGrpcDialOption func()grpc.DialOption + GetFiler func() pb.ServerAddress + GetGrpcDialOption func() grpc.DialOption } func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index 5e13ac427..f48335435 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -6,15 +6,15 @@ import ( ) type InflightMessageTracker struct { - messages map[string]int64 - mu sync.Mutex - timestamps *RingBuffer + messages map[string]int64 + mu sync.Mutex + timestamps *RingBuffer } func NewInflightMessageTracker(capacity int) *InflightMessageTracker { return &InflightMessageTracker{ - messages: make(map[string]int64), - timestamps: NewRingBuffer(capacity), + messages: make(map[string]int64), + timestamps: NewRingBuffer(capacity), } } @@ -26,6 +26,7 @@ func (imt *InflightMessageTracker) InflightMessage(key []byte, tsNs int64) { imt.messages[string(key)] = tsNs imt.timestamps.Add(tsNs) } + // IsMessageAcknowledged returns true if the message has been acknowledged. // If the message is older than the oldest inflight messages, returns false. // returns false if the message is inflight. @@ -47,6 +48,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64) return true } + // AcknowledgeMessage acknowledges the message with the key and timestamp. func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool { imt.mu.Lock() @@ -71,12 +73,14 @@ type RingBuffer struct { head int size int } + // NewRingBuffer creates a new RingBuffer of the given capacity. func NewRingBuffer(capacity int) *RingBuffer { return &RingBuffer{ buffer: make([]int64, capacity), } } + // Add adds a new timestamp to the ring buffer. func (rb *RingBuffer) Add(timestamp int64) { rb.buffer[rb.head] = timestamp @@ -85,6 +89,7 @@ func (rb *RingBuffer) Add(timestamp int64) { rb.size++ } } + // Remove removes the specified timestamp from the ring buffer. func (rb *RingBuffer) Remove(timestamp int64) { // Perform binary search diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go index 256d5e78c..23e0fb00f 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go @@ -82,10 +82,10 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions)) for _, partition := range partitions { newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{ - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - UnixTimeNs: partition.UnixTimeNs, - Broker: partition.AssignedBroker, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, + Broker: partition.AssignedBroker, FollowerBroker: partition.FollowerBroker, }) } diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go index 7dcfc6f9b..415eb27bd 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go @@ -32,7 +32,7 @@ func Test_doBalanceSticky(t *testing.T) { MaxPartitionCount: 1, }, }, - prevMapping: nil, + prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ { @@ -61,7 +61,7 @@ func Test_doBalanceSticky(t *testing.T) { MaxPartitionCount: 1, }, }, - prevMapping: nil, + prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ { @@ -90,7 +90,7 @@ func Test_doBalanceSticky(t *testing.T) { MaxPartitionCount: 1, }, }, - prevMapping: nil, + prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ { @@ -128,7 +128,7 @@ func Test_doBalanceSticky(t *testing.T) { MaxPartitionCount: 1, }, }, - prevMapping: nil, + prevMapping: nil, }, wantPartitionSlots: []*PartitionSlotToConsumerInstance{ {