Browse Source

passing partition generation timestamp

pull/5637/head
chrislu 1 year ago
parent
commit
6a7a679137
  1. 5
      weed/mq/broker/broker_grpc_configure.go
  2. 1
      weed/mq/client/pub_client/connect.go
  3. 4
      weed/mq/client/pub_client/lookup.go
  4. 3
      weed/mq/pub_balancer/allocate.go
  5. 14
      weed/mq/pub_balancer/broker_stats.go
  6. 4
      weed/mq/pub_balancer/lookup.go
  7. 2
      weed/mq/pub_balancer/partition_list_broker.go
  8. 1
      weed/mq/sub_coordinator/partition_consumer_mapping.go
  9. 1
      weed/mq/sub_coordinator/partition_list.go
  10. 2
      weed/mq/topic/local_partition.go

5
weed/mq/broker/broker_grpc_configure.go

@ -37,7 +37,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
for _, bpa := range ret.BrokerPartitionAssignments { for _, bpa := range ret.BrokerPartitionAssignments {
// fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker)
if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
_, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
Topic: request.Topic, Topic: request.Topic,
@ -66,7 +66,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
} }
} }
// TODO revert if some error happens in the middle of the assignments
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, ret.BrokerPartitionAssignments)
return ret, err return ret, err
} }
@ -107,6 +107,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} }
} }
glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil return ret, nil
} }

1
weed/mq/client/pub_client/connect.go

@ -40,6 +40,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
RingSize: partition.RingSize, RingSize: partition.RingSize,
RangeStart: partition.RangeStart, RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop, RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
}, },
AckInterval: 128, AckInterval: 128,
}, },

4
weed/mq/client/pub_client/lookup.go

@ -3,6 +3,7 @@ package pub_client
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
) )
@ -39,6 +40,7 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
}, },
IsForPublish: true, IsForPublish: true,
}) })
glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp)
if p.config.CreateTopic && err != nil { if p.config.CreateTopic && err != nil {
_, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
Topic: &mq_pb.Topic{ Topic: &mq_pb.Topic{
@ -58,12 +60,14 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
}, },
IsForPublish: true, IsForPublish: true,
}) })
glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp)
} }
if err != nil { if err != nil {
return err return err
} }
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
glog.V(0).Infof("topic %s/%s partition %v leader %s followers %v", p.namespace, p.topic, brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker, brokerPartitionAssignment.FollowerBrokers)
// partition => publishClient // partition => publishClient
publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
if err != nil { if err != nil {

3
weed/mq/pub_balancer/allocate.go

@ -5,10 +5,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"math/rand" "math/rand"
"time"
) )
func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) { func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
// divide the ring into partitions // divide the ring into partitions
now := time.Now().UnixNano()
rangeSize := MaxPartitionCount / partitionCount rangeSize := MaxPartitionCount / partitionCount
for i := int32(0); i < partitionCount; i++ { for i := int32(0); i < partitionCount; i++ {
assignment := &mq_pb.BrokerPartitionAssignment{ assignment := &mq_pb.BrokerPartitionAssignment{
@ -16,6 +18,7 @@ func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
RingSize: MaxPartitionCount, RingSize: MaxPartitionCount,
RangeStart: int32(i * rangeSize), RangeStart: int32(i * rangeSize),
RangeStop: int32((i + 1) * rangeSize), RangeStop: int32((i + 1) * rangeSize),
UnixTimeNs: now,
}, },
} }
if i == partitionCount-1 { if i == partitionCount-1 {

14
weed/mq/pub_balancer/broker_stats.go

@ -40,7 +40,12 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
tps := &TopicPartitionStats{ tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{ TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize},
Partition: topic.Partition{
RangeStart: topicPartitionStats.Partition.RangeStart,
RangeStop: topicPartitionStats.Partition.RangeStop,
RingSize: topicPartitionStats.Partition.RingSize,
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
}, },
ConsumerCount: topicPartitionStats.ConsumerCount, ConsumerCount: topicPartitionStats.ConsumerCount,
IsLeader: topicPartitionStats.IsLeader, IsLeader: topicPartitionStats.IsLeader,
@ -62,7 +67,12 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
tps := &TopicPartitionStats{ tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{ TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop},
Partition: topic.Partition{
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
RingSize: partition.RingSize,
UnixTimeNs: partition.UnixTimeNs,
},
}, },
ConsumerCount: 0, ConsumerCount: 0,
IsLeader: true, IsLeader: true,

4
weed/mq/pub_balancer/lookup.go

@ -2,6 +2,7 @@ package pub_balancer
import ( import (
"errors" "errors"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
) )
@ -25,6 +26,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
RingSize: MaxPartitionCount, RingSize: MaxPartitionCount,
RangeStart: topicPartitionStat.RangeStart, RangeStart: topicPartitionStat.RangeStart,
RangeStop: topicPartitionStat.RangeStop, RangeStop: topicPartitionStat.RangeStop,
UnixTimeNs: topicPartitionStat.UnixTimeNs,
}, },
} }
// TODO fix follower setting // TODO fix follower setting
@ -34,7 +36,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
} }
} }
if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) { if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
// glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments)
return assignments, nil return assignments, nil
} }

2
weed/mq/pub_balancer/partition_list_broker.go

@ -8,6 +8,7 @@ import (
type PartitionSlotToBroker struct { type PartitionSlotToBroker struct {
RangeStart int32 RangeStart int32
RangeStop int32 RangeStop int32
UnixTimeNs int64
AssignedBroker string AssignedBroker string
} }
@ -38,6 +39,7 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke
ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{ ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
RangeStart: partition.RangeStart, RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop, RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
AssignedBroker: broker, AssignedBroker: broker,
}) })
} }

1
weed/mq/sub_coordinator/partition_consumer_mapping.go

@ -84,6 +84,7 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{ newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
RangeStart: partition.RangeStart, RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop, RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
Broker: partition.AssignedBroker, Broker: partition.AssignedBroker,
}) })
} }

1
weed/mq/sub_coordinator/partition_list.go

@ -5,6 +5,7 @@ import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
type PartitionSlotToConsumerInstance struct { type PartitionSlotToConsumerInstance struct {
RangeStart int32 RangeStart int32
RangeStop int32 RangeStop int32
UnixTimeNs int64
Broker string Broker string
AssignedInstanceId string AssignedInstanceId string
} }

2
weed/mq/topic/local_partition.go

@ -28,7 +28,7 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.
isLeader: isLeader, isLeader: isLeader,
FollowerBrokers: followerBrokers, FollowerBrokers: followerBrokers,
logBuffer: log_buffer.NewLogBuffer( logBuffer: log_buffer.NewLogBuffer(
fmt.Sprintf("%d/%4d-%4d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
2*time.Minute, 2*time.Minute,
logFlushFn, logFlushFn,
func() { func() {

Loading…
Cancel
Save