diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index ad71a8bc2..467ceb81d 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -37,7 +37,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) for _, bpa := range ret.BrokerPartitionAssignments { - // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker) + fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ Topic: request.Topic, @@ -66,7 +66,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } } - // TODO revert if some error happens in the middle of the assignments + glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, ret.BrokerPartitionAssignments) return ret, err } @@ -107,6 +107,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } } + glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go index 4ff12737f..7f6d62a67 100644 --- a/weed/mq/client/pub_client/connect.go +++ b/weed/mq/client/pub_client/connect.go @@ -40,6 +40,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str RingSize: partition.RingSize, RangeStart: partition.RangeStart, RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, }, AckInterval: 128, }, diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index e55bfd256..ccc83b58d 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -3,6 +3,7 @@ package pub_client import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -39,6 +40,7 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { }, IsForPublish: true, }) + glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp) if p.config.CreateTopic && err != nil { _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ Topic: &mq_pb.Topic{ @@ -58,12 +60,14 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { }, IsForPublish: true, }) + glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp) } if err != nil { return err } for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { + glog.V(0).Infof("topic %s/%s partition %v leader %s followers %v", p.namespace, p.topic, brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker, brokerPartitionAssignment.FollowerBrokers) // partition => publishClient publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) if err != nil { diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 9b2113162..cf94fec5b 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -5,10 +5,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "math/rand" + "time" ) func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) { // divide the ring into partitions + now := time.Now().UnixNano() rangeSize := MaxPartitionCount / partitionCount for i := int32(0); i < partitionCount; i++ { assignment := &mq_pb.BrokerPartitionAssignment{ @@ -16,6 +18,7 @@ func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p RingSize: MaxPartitionCount, RangeStart: int32(i * rangeSize), RangeStop: int32((i + 1) * rangeSize), + UnixTimeNs: now, }, } if i == partitionCount-1 { diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index 461e93c61..2ae123822 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -40,7 +40,12 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, - Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize}, + Partition: topic.Partition{ + RangeStart: topicPartitionStats.Partition.RangeStart, + RangeStop: topicPartitionStats.Partition.RangeStop, + RingSize: topicPartitionStats.Partition.RingSize, + UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs, + }, }, ConsumerCount: topicPartitionStats.ConsumerCount, IsLeader: topicPartitionStats.IsLeader, @@ -62,7 +67,12 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, - Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop}, + Partition: topic.Partition{ + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + RingSize: partition.RingSize, + UnixTimeNs: partition.UnixTimeNs, + }, }, ConsumerCount: 0, IsLeader: true, diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 9379a341d..209261764 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -2,6 +2,7 @@ package pub_balancer import ( "errors" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -25,6 +26,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu RingSize: MaxPartitionCount, RangeStart: topicPartitionStat.RangeStart, RangeStop: topicPartitionStat.RangeStop, + UnixTimeNs: topicPartitionStat.UnixTimeNs, }, } // TODO fix follower setting @@ -34,7 +36,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu } } if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) { - // glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) + glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments) return assignments, nil } diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go index 9dc6140b3..f4180cf81 100644 --- a/weed/mq/pub_balancer/partition_list_broker.go +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -8,6 +8,7 @@ import ( type PartitionSlotToBroker struct { RangeStart int32 RangeStop int32 + UnixTimeNs int64 AssignedBroker string } @@ -38,6 +39,7 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{ RangeStart: partition.RangeStart, RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, AssignedBroker: broker, }) } diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go index ae2bf1c17..c7f104af1 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go @@ -84,6 +84,7 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{ RangeStart: partition.RangeStart, RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, Broker: partition.AssignedBroker, }) } diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go index b559007b5..7f02253f6 100644 --- a/weed/mq/sub_coordinator/partition_list.go +++ b/weed/mq/sub_coordinator/partition_list.go @@ -5,6 +5,7 @@ import "github.com/seaweedfs/seaweedfs/weed/mq/topic" type PartitionSlotToConsumerInstance struct { RangeStart int32 RangeStop int32 + UnixTimeNs int64 Broker string AssignedInstanceId string } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 5cf315ddb..0b148816a 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -28,7 +28,7 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb. isLeader: isLeader, FollowerBrokers: followerBrokers, logBuffer: log_buffer.NewLogBuffer( - fmt.Sprintf("%d/%4d-%4d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), + fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), 2*time.Minute, logFlushFn, func() {