Browse Source

reuse local partition

pull/5637/head
chrislu 11 months ago
parent
commit
ba73199174
  1. 18
      weed/mq/broker/broker_grpc_configure.go
  2. 2
      weed/mq/broker/broker_server.go
  3. 4
      weed/mq/topic/local_partition.go

18
weed/mq/broker/broker_grpc_configure.go

@ -82,18 +82,20 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
// drain existing topic partition subscriptions // drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments { for _, assignment := range request.BrokerPartitionAssignments {
localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
t := topic.FromPbTopic(request.Topic)
partition := topic.FromPbPartition(assignment.Partition)
b.accessLock.Lock()
if request.IsDraining { if request.IsDraining {
// TODO drain existing topic partition subscriptions // TODO drain existing topic partition subscriptions
b.localTopicManager.RemoveTopicPartition(
topic.FromPbTopic(request.Topic),
localPartition.Partition)
b.localTopicManager.RemoveTopicPartition(t, partition)
} else { } else {
b.localTopicManager.AddTopicPartition(
topic.FromPbTopic(request.Topic),
localPartition)
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
}
} }
b.accessLock.Unlock()
} }
// if is leader, notify the followers to drain existing topic partition subscriptions // if is leader, notify the followers to drain existing topic partition subscriptions

2
weed/mq/broker/broker_server.go

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"sync"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/cluster"
@ -43,6 +44,7 @@ type MessageQueueBroker struct {
lockAsBalancer *cluster.LiveLock lockAsBalancer *cluster.LiveLock
currentBalancer pb.ServerAddress currentBalancer pb.ServerAddress
Coordinator *sub_coordinator.Coordinator Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex
} }
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {

4
weed/mq/topic/local_partition.go

@ -84,13 +84,13 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
return p.logBuffer.GetEarliestPosition() return p.logBuffer.GetEarliestPosition()
} }
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
isLeader := assignment.LeaderBroker == string(self) isLeader := assignment.LeaderBroker == string(self)
followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers)) followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
for i, followerBroker := range assignment.FollowerBrokers { for i, followerBroker := range assignment.FollowerBrokers {
followers[i] = pb.ServerAddress(followerBroker) followers[i] = pb.ServerAddress(followerBroker)
} }
return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers, logFlushFn, readFromDiskFn)
return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn)
} }
func (p *LocalPartition) closePublishers() { func (p *LocalPartition) closePublishers() {

Loading…
Cancel
Save