You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

101 lines
3.7 KiB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
  1. package broker
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  11. jsonpb "google.golang.org/protobuf/encoding/protojson"
  12. )
  13. func (b *MessageQueueBroker) saveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error {
  14. glog.V(0).Infof("save conf for topic %v to filer", t)
  15. // save the topic configuration on filer
  16. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  17. if err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  18. var buf bytes.Buffer
  19. filer.ProtoToText(&buf, conf)
  20. return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
  21. }); err != nil {
  22. return fmt.Errorf("save topic to %s: %v", topicDir, err)
  23. }
  24. return nil
  25. }
  26. // readTopicConfFromFiler reads the topic configuration from filer
  27. // this should only be run in broker leader, to ensure correct active broker list.
  28. func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
  29. glog.V(0).Infof("load conf for topic %v from filer", t)
  30. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  31. if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  32. data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
  33. if err == filer_pb.ErrNotFound {
  34. return err
  35. }
  36. if err != nil {
  37. return fmt.Errorf("read topic.conf of %v: %v", t, err)
  38. }
  39. // parse into filer conf object
  40. conf = &mq_pb.ConfigureTopicResponse{}
  41. if err = jsonpb.Unmarshal(data, conf); err != nil {
  42. return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
  43. }
  44. return nil
  45. }); err != nil {
  46. return nil, err
  47. }
  48. return conf, nil
  49. }
  50. func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
  51. b.accessLock.Lock()
  52. defer b.accessLock.Unlock()
  53. if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
  54. localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition)
  55. if err != nil {
  56. return nil, false, err
  57. }
  58. }
  59. return localPartition, isGenerated, nil
  60. }
  61. func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
  62. self := b.option.BrokerAddress()
  63. conf, err := b.readTopicConfFromFiler(t)
  64. if err != nil {
  65. return nil, isGenerated, err
  66. }
  67. for _, assignment := range conf.BrokerPartitionAssignments {
  68. if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
  69. localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
  70. b.localTopicManager.AddTopicPartition(t, localPartition)
  71. isGenerated = true
  72. break
  73. }
  74. }
  75. return localPartition, isGenerated, nil
  76. }
  77. func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
  78. // also fix assignee broker if invalid
  79. hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
  80. if hasChanges {
  81. glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
  82. if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
  83. return err
  84. }
  85. }
  86. return err
  87. }