Browse Source

local partition adds record type

mq2
chrislu 4 months ago
parent
commit
8111ec13bd
  1. 10
      weed/mq/broker/broker_grpc_assign.go
  2. 2
      weed/mq/broker/broker_topic_conf_read_write.go
  3. 6
      weed/mq/topic/local_partition.go

10
weed/mq/broker/broker_grpc_assign.go

@ -15,9 +15,15 @@ import (
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
ret := &mq_pb.AssignTopicPartitionsResponse{} ret := &mq_pb.AssignTopicPartitionsResponse{}
t := topic.FromPbTopic(request.Topic)
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
if readConfErr != nil {
glog.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
}
// drain existing topic partition subscriptions // drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments { for _, assignment := range request.BrokerPartitionAssignments {
t := topic.FromPbTopic(request.Topic)
partition := topic.FromPbPartition(assignment.Partition) partition := topic.FromPbPartition(assignment.Partition)
b.accessLock.Lock() b.accessLock.Lock()
if request.IsDraining { if request.IsDraining {
@ -26,7 +32,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else { } else {
var localPartition *topic.LocalPartition var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType)
b.localTopicManager.AddLocalPartition(t, localPartition) b.localTopicManager.AddLocalPartition(t, localPartition)
} }
} }

2
weed/mq/broker/broker_topic_conf_read_write.go

@ -40,7 +40,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
self := b.option.BrokerAddress() self := b.option.BrokerAddress()
for _, assignment := range conf.BrokerPartitionAssignments { for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType)
b.localTopicManager.AddLocalPartition(t, localPartition) b.localTopicManager.AddLocalPartition(t, localPartition)
isGenerated = true isGenerated = true
break break

6
weed/mq/topic/local_partition.go

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -28,6 +29,8 @@ type LocalPartition struct {
Publishers *LocalPartitionPublishers Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers Subscribers *LocalPartitionSubscribers
RecordType *schema_pb.RecordType
publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn followerGrpcConnection *grpc.ClientConn
Follower string Follower string
@ -35,11 +38,12 @@ type LocalPartition struct {
var TIME_FORMAT = "2006-01-02-15-04-05" var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType, recordType *schema_pb.RecordType) *LocalPartition {
lp := &LocalPartition{ lp := &LocalPartition{
Partition: partition, Partition: partition,
Publishers: NewLocalPartitionPublishers(), Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(), Subscribers: NewLocalPartitionSubscribers(),
RecordType: recordType,
} }
lp.ListenersCond = sync.NewCond(&lp.ListenersLock) lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),

Loading…
Cancel
Save