From 8111ec13bd06e4a9142a87640986de8c41d4b75a Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 17 Sep 2024 23:22:27 -0700 Subject: [PATCH] local partition adds record type --- weed/mq/broker/broker_grpc_assign.go | 10 ++++++++-- weed/mq/broker/broker_topic_conf_read_write.go | 2 +- weed/mq/topic/local_partition.go | 6 +++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 48ec0d5bd..99fe88acd 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -15,9 +15,15 @@ import ( func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { ret := &mq_pb.AssignTopicPartitionsResponse{} + t := topic.FromPbTopic(request.Topic) + conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) + if readConfErr != nil { + glog.Errorf("topic %v not found: %v", t, readConfErr) + return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) + } + // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { - t := topic.FromPbTopic(request.Topic) partition := topic.FromPbPartition(assignment.Partition) b.accessLock.Lock() if request.IsDraining { @@ -26,7 +32,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } else { var localPartition *topic.LocalPartition if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType) b.localTopicManager.AddLocalPartition(t, localPartition) } } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index ea5cb71b9..476ad2533 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -40,7 +40,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition self := b.option.BrokerAddress() for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType) b.localTopicManager.AddLocalPartition(t, localPartition) isGenerated = true break diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 21198d580..304f019f2 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -28,6 +29,8 @@ type LocalPartition struct { Publishers *LocalPartitionPublishers Subscribers *LocalPartitionSubscribers + RecordType *schema_pb.RecordType + publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient followerGrpcConnection *grpc.ClientConn Follower string @@ -35,11 +38,12 @@ type LocalPartition struct { var TIME_FORMAT = "2006-01-02-15-04-05" -func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { +func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType, recordType *schema_pb.RecordType) *LocalPartition { lp := &LocalPartition{ Partition: partition, Publishers: NewLocalPartitionPublishers(), Subscribers: NewLocalPartitionSubscribers(), + RecordType: recordType, } lp.ListenersCond = sync.NewCond(&lp.ListenersLock) lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),