You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

100 lines
3.5 KiB

12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
10 months ago
12 months ago
  1. package broker
  import (
  	"bytes"
  	"errors"
  	"fmt"

  	"github.com/seaweedfs/seaweedfs/weed/filer"
  	"github.com/seaweedfs/seaweedfs/weed/glog"
  	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
  	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  	jsonpb "google.golang.org/protobuf/encoding/protojson"
  )
  13. func (b *MessageQueueBroker) saveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error {
  14. glog.V(0).Infof("save conf for topic %v to filer", t)
  15. // save the topic configuration on filer
  16. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  17. if err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  18. var buf bytes.Buffer
  19. filer.ProtoToText(&buf, conf)
  20. return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
  21. }); err != nil {
  22. return fmt.Errorf("save topic to %s: %v", topicDir, err)
  23. }
  24. return nil
  25. }
  26. // readTopicConfFromFiler reads the topic configuration from filer
  27. // this should only be run in broker leader, to ensure correct active broker list.
  28. func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
  29. glog.V(0).Infof("load conf for topic %v from filer", t)
  30. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  31. if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  32. data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
  33. if err == filer_pb.ErrNotFound {
  34. return err
  35. }
  36. if err != nil {
  37. return fmt.Errorf("read topic.conf of %v: %v", t, err)
  38. }
  39. // parse into filer conf object
  40. conf = &mq_pb.ConfigureTopicResponse{}
  41. if err = jsonpb.Unmarshal(data, conf); err != nil {
  42. return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
  43. }
  44. return nil
  45. }); err != nil {
  46. return nil, err
  47. }
  48. return conf, nil
  49. }
  50. func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
  51. b.accessLock.Lock()
  52. defer b.accessLock.Unlock()
  53. if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
  54. localPartition, err = b.genLocalPartitionFromFiler(t, partition)
  55. if err != nil {
  56. return nil, err
  57. }
  58. }
  59. return localPartition, nil
  60. }
  61. func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
  62. self := b.option.BrokerAddress()
  63. conf, err := b.readTopicConfFromFiler(t)
  64. if err != nil {
  65. return nil, err
  66. }
  67. for _, assignment := range conf.BrokerPartitionAssignments {
  68. if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
  69. localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
  70. b.localTopicManager.AddTopicPartition(t, localPartition)
  71. break
  72. }
  73. }
  74. return localPartition, nil
  75. }
  76. func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
  77. // also fix assignee broker if invalid
  78. hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
  79. if hasChanges {
  80. glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
  81. if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
  82. return err
  83. }
  84. }
  85. return err
  86. }