You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

112 lines
4.4 KiB

12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
10 months ago
10 months ago
11 months ago
12 months ago
  1. package broker
import (
	"bytes"
	"errors"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	jsonpb "google.golang.org/protobuf/encoding/protojson"
)
  13. func (b *MessageQueueBroker) saveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error {
  14. glog.V(0).Infof("save conf for topic %v to filer", t)
  15. // save the topic configuration on filer
  16. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  17. if err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  18. var buf bytes.Buffer
  19. filer.ProtoToText(&buf, conf)
  20. return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
  21. }); err != nil {
  22. return fmt.Errorf("save topic to %s: %v", topicDir, err)
  23. }
  24. return nil
  25. }
  26. // readTopicConfFromFiler reads the topic configuration from filer
  27. // this should only be run in broker leader, to ensure correct active broker list.
  28. func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
  29. glog.V(0).Infof("load conf for topic %v from filer", t)
  30. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  31. if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  32. data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
  33. if err == filer_pb.ErrNotFound {
  34. return err
  35. }
  36. if err != nil {
  37. return fmt.Errorf("read topic.conf of %v: %v", t, err)
  38. }
  39. // parse into filer conf object
  40. conf = &mq_pb.ConfigureTopicResponse{}
  41. if err = jsonpb.Unmarshal(data, conf); err != nil {
  42. return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
  43. }
  44. return nil
  45. }); err != nil {
  46. return nil, err
  47. }
  48. return conf, nil
  49. }
  50. func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
  51. // get or generate a local partition
  52. conf, readConfErr := b.readTopicConfFromFiler(t)
  53. if readConfErr != nil {
  54. glog.Errorf("topic %v not found: %v", t, readConfErr)
  55. return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
  56. }
  57. localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
  58. if getOrGenError != nil {
  59. glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
  60. return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
  61. }
  62. return localTopicPartition, nil
  63. }
  64. func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
  65. b.accessLock.Lock()
  66. defer b.accessLock.Unlock()
  67. if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
  68. localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf)
  69. if err != nil {
  70. return nil, false, err
  71. }
  72. }
  73. return localPartition, isGenerated, nil
  74. }
  75. func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
  76. self := b.option.BrokerAddress()
  77. for _, assignment := range conf.BrokerPartitionAssignments {
  78. if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
  79. localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
  80. b.localTopicManager.AddTopicPartition(t, localPartition)
  81. isGenerated = true
  82. break
  83. }
  84. }
  85. return localPartition, isGenerated, nil
  86. }
  87. func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
  88. // also fix assignee broker if invalid
  89. hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
  90. if hasChanges {
  91. glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
  92. if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
  93. return err
  94. }
  95. }
  96. return err
  97. }