diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index c21f20874..16f7abc8c 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -124,7 +124,7 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st } func (lock *LiveLock) IsLocked() bool { - return lock.isLocked + return lock!=nil && lock.isLocked } func (lock *LiveLock) StopLock() error { diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 52b34ddbc..34a263032 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -47,6 +47,7 @@ type MessageQueueBroker struct { func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { pub_broker_balancer := pub_balancer.NewBalancer() + coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer) mqBroker = &MessageQueueBroker{ option: option, @@ -55,9 +56,10 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial filers: make(map[pb.ServerAddress]struct{}), localTopicManager: topic.NewLocalTopicManager(), Balancer: pub_broker_balancer, - Coordinator: sub_coordinator.NewCoordinator(pub_broker_balancer), + Coordinator: coordinator, } mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) + pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange go mqBroker.MasterClient.KeepConnectedToMaster() diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 16c9fdb4d..310e5ac78 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -30,11 +30,11 @@ func main() { Namespace: *namespace, Topic: *topic, Filter: "", - StartTime: time.Now(), + StartTime: time.Unix(0, 0), } processorConfig := sub_client.ProcessorConfiguration{ - ConcurrentPartitionLimit: 1, + ConcurrentPartitionLimit: 6, } brokers := strings.Split(*seedBrokers, ",") diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index e60243083..afafb15ea 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -145,7 +145,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { - return fmt.Errorf("subscribe error: %v", err) + return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { continue @@ -156,10 +156,10 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s if processErr != nil { return fmt.Errorf("process error: %v", processErr) } + sub.alreadyProcessedTsNs = m.Data.TsNs if !shouldContinue { return nil } - sub.alreadyProcessedTsNs = m.Data.TsNs case *mq_pb.SubscribeResponse_Ctrl: if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { return io.EOF diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 4524d95d0..0bcbdd51b 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -32,6 +32,7 @@ type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name + OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) } func NewBalancer() *Balancer { diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 33c5a864b..9379a341d 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -33,7 +33,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu } } } - if len(assignments) > 0 || !publish { + if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) { // glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) return assignments, nil } @@ -48,5 +48,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu if balancer.Brokers.IsEmpty() { return nil, ErrNoBroker } - return allocateTopicPartitions(balancer.Brokers, partitionCount), nil + assignments = allocateTopicPartitions(balancer.Brokers, partitionCount) + balancer.OnPartitionChange(topic, assignments) + return } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index dad93dfe5..9b62f616e 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -52,32 +52,40 @@ func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){ cg.reBalanceTimer = nil } cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() { - cg.RebalanceConsumberGroupInstances(reason) + cg.RebalanceConsumberGroupInstances(nil, reason) cg.reBalanceTimer = nil }) } -func (cg *ConsumerGroup) OnPartitionListChange() { +func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) { if cg.reBalanceTimer != nil { cg.reBalanceTimer.Stop() cg.reBalanceTimer = nil } - cg.RebalanceConsumberGroupInstances("partition list change") + partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount) + for _, assignment := range assignments { + partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker) + } + cg.RebalanceConsumberGroupInstances(partitionSlotToBrokerList, "partition list change") } -func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) { - println("rebalance due to", reason, "...") +func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { + glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) now := time.Now().UnixNano() // collect current topic partitions - partitionSlotToBrokerList, found := cg.pubBalancer.TopicToBrokers.Get(cg.topic.String()) - if !found { - glog.V(0).Infof("topic %s not found in balancer", cg.topic.String()) - return + partitionSlotToBrokerList := knownPartitionSlotToBrokerList + if partitionSlotToBrokerList == nil { + var found bool + partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String()) + if !found { + glog.V(0).Infof("topic %s not found in balancer", cg.topic.String()) + return + } } // collect current consumer group instance ids - consumerInstanceIds := make([]string, 0) + var consumerInstanceIds []string for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() { consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId) } @@ -116,6 +124,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) { }, }, } + println("sending response to", consumerGroupInstance.InstanceId, "...") consumerGroupInstance.ResponseChan <- response } diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 9a88c383a..269c12e66 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -28,14 +28,16 @@ func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { } } -func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups { +func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups { topicName := toTopicName(topic) tcg, _ := c.TopicSubscribers.Get(topicName) - if tcg == nil { + if tcg == nil && createIfMissing{ tcg = &TopicConsumerGroups{ ConsumerGroups: cmap.New[*ConsumerGroup](), } - c.TopicSubscribers.Set(topicName, tcg) + if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) { + tcg, _ = c.TopicSubscribers.Get(topicName) + } } return tcg } @@ -50,23 +52,27 @@ func toTopicName(topic *mq_pb.Topic) string { } func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance { - tcg := c.GetTopicConsumerGroups(topic) + tcg := c.GetTopicConsumerGroups(topic, true) cg, _ := tcg.ConsumerGroups.Get(consumerGroup) if cg == nil { cg = NewConsumerGroup(topic, c.balancer) - tcg.ConsumerGroups.Set(consumerGroup, cg) + if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg){ + cg, _ = tcg.ConsumerGroups.Get(consumerGroup) + } } cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance) if cgi == nil { cgi = NewConsumerGroupInstance(consumerGroupInstance) - cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi) + if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi){ + cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance) + } } cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic) return cgi } func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) { - tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) + tcg := c.GetTopicConsumerGroups(topic, false) if tcg == nil { return } @@ -83,3 +89,13 @@ func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance stri c.RemoveTopic(topic) } } + +func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) { + tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) + if tcg == nil { + return + } + for _, cg := range tcg.ConsumerGroups.Items() { + cg.OnPartitionListChange(assignments) + } +}