Lazily load a topic partition from the filer when the broker has no local copy.

PublishMessage and SubscribeMessage now call loadLocalTopicPartitionFromFiler
when the local topic manager has no entry for the requested partition. The
helper reads "topic.conf" from the topic directory on the filer and registers
the partition locally only if an assignment names this broker as leader for a
matching partition; otherwise it returns an error. genLogFlushFunc and
genLogOnDiskReadFunc now take topic.Topic instead of *mq_pb.Topic, and
MessageQueueBrokerOption.BrokerAddress() replaces the inline address
construction in NewMessageBroker.

Note: line breaks and leading whitespace in this patch were restored from a
whitespace-collapsed copy following gofmt conventions. If a context line does
not match the target byte-for-byte, apply with `git apply --ignore-whitespace`.

diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 83a26446c..dcc621c4c 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -113,7 +113,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
 		} else {
 			var localPartition *topic.LocalPartition
 			if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
-				localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
+				localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
 				b.localTopicManager.AddTopicPartition(t, localPartition)
 			}
 		}
@@ -139,7 +139,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
 	return ret, nil
 }
 
-func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
+func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
 	topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
 	partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
 	partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
@@ -166,7 +166,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Pa
 	}
 }
 
-func (b *MessageQueueBroker) genLogOnDiskReadFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
+func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
 	topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
 	partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
 	partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index e0e138ef2..e8238a5f7 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -3,10 +3,13 @@ package broker
 import (
 	"context"
 	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"google.golang.org/grpc/peer"
+	jsonpb "google.golang.org/protobuf/encoding/protojson"
 	"math/rand"
 	"net"
 	"sync/atomic"
@@ -54,9 +57,13 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
 		t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
 		localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
 		if localTopicPartition == nil {
-			response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
-			glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
-			return stream.Send(response)
+			localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p)
+			// if not created, return error
+			if err != nil {
+				response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
+				glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
+				return stream.Send(response)
+			}
 		}
 		ackInterval = int(initMessage.AckInterval)
 		stream.Send(response)
@@ -141,6 +148,44 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
 	return nil
 }
 
+func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
+	// load local topic partition from configuration on filer if not found
+	var conf *mq_pb.ConfigureTopicResponse
+	topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+	if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
+		if err != nil {
+			return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err)
+		}
+		// parse into filer conf object
+		conf = &mq_pb.ConfigureTopicResponse{}
+		if err = jsonpb.Unmarshal(data, conf); err != nil {
+			return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err)
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	// create local topic partition
+	self := b.option.BrokerAddress()
+	var hasCreated bool
+	for _, assignment := range conf.BrokerPartitionAssignments {
+		if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) {
+			localTopicPartition = topic.FromPbBrokerPartitionAssignment(self, p, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+			b.localTopicManager.AddTopicPartition(t, localTopicPartition)
+			hasCreated = true
+			break
+		}
+	}
+
+	if !hasCreated {
+		return nil, fmt.Errorf("topic %v partition %v not assigned to broker %v", t, p, self)
+	}
+
+	return localTopicPartition, nil
+}
+
 // duplicated from master_grpc_server.go
 func findClientAddress(ctx context.Context) string {
 	// fmt.Printf("FromContext %+v\n", ctx)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index d6114ad23..2f4af3be9 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -11,7 +11,7 @@ import (
 	"time"
 )
 
-func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
+func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {
 
 	ctx := stream.Context()
 	clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
@@ -24,28 +24,31 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
 	var localTopicPartition *topic.LocalPartition
 	localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
 	for localTopicPartition == nil {
-		stream.Send(&mq_pb.SubscribeMessageResponse{
-			Message: &mq_pb.SubscribeMessageResponse_Ctrl{
-				Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
-					Error: "not initialized",
+		localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, partition)
+		// if not created, return error
+		if err != nil {
+			stream.Send(&mq_pb.SubscribeMessageResponse{
+				Message: &mq_pb.SubscribeMessageResponse_Ctrl{
+					Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
+						Error: fmt.Sprintf("topic %v partition %v not setup: %v", t, partition, err),
+					},
 				},
-			},
-		})
-		time.Sleep(337 * time.Millisecond)
-		// Check if the client has disconnected by monitoring the context
-		select {
-		case <-ctx.Done():
-			err := ctx.Err()
-			if err == context.Canceled {
-				// Client disconnected
+			})
+			time.Sleep(337 * time.Millisecond)
+			// Check if the client has disconnected by monitoring the context
+			select {
+			case <-ctx.Done():
+				err := ctx.Err()
+				if err == context.Canceled {
+					// Client disconnected
+					return nil
+				}
+				glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
 				return nil
+			default:
+				// Continue processing the request
 			}
-			glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
-			return nil
-		default:
-			// Continue processing the request
 		}
-		localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
 	}
 
 	localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 1a2c09ca4..9ef277f4d 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -32,6 +32,10 @@ type MessageQueueBrokerOption struct {
 	VolumeServerAccess string // how to access volume servers
 }
 
+func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress {
+	return pb.NewServerAddress(option.Ip, option.Port, 0)
+}
+
 type MessageQueueBroker struct {
 	mq_pb.UnimplementedSeaweedMessagingServer
 	option            *MessageQueueBrokerOption
@@ -55,7 +59,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
 	mqBroker = &MessageQueueBroker{
 		option:            option,
 		grpcDialOption:    grpcDialOption,
-		MasterClient:      wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
+		MasterClient:      wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
 		filers:            make(map[pb.ServerAddress]struct{}),
 		localTopicManager: topic.NewLocalTopicManager(),
 		Balancer:          pub_broker_balancer,
@@ -76,13 +80,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
 		for mqBroker.currentFiler == "" {
 			time.Sleep(time.Millisecond * 237)
 		}
-		self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
+		self := option.BrokerAddress()
 		glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
 
 		lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
-		mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, self)
+		mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self))
 		for {
-			err := mqBroker.BrokerConnectToBalancer(self)
+			err := mqBroker.BrokerConnectToBalancer(string(self))
 			if err != nil {
 				fmt.Printf("BrokerConnectToBalancer: %v\n", err)
 			}