You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

103 lines
2.3 KiB

5 years ago
  1. package broker
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/filer2"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  10. )
  11. type TopicPartition struct {
  12. Namespace string
  13. Topic string
  14. Partition int32
  15. }
  16. type TopicLock struct {
  17. sync.Mutex
  18. cond *sync.Cond
  19. subscriberCount int
  20. publisherCount int
  21. logBuffer *log_buffer.LogBuffer
  22. }
  23. type TopicLocks struct {
  24. sync.Mutex
  25. locks map[TopicPartition]*TopicLock
  26. broker *MessageBroker
  27. }
  28. func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks {
  29. return &TopicLocks{
  30. locks: make(map[TopicPartition]*TopicLock),
  31. broker: messageBroker,
  32. }
  33. }
  34. func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
  35. flushFn := func(startTime, stopTime time.Time, buf []byte) {
  36. if topicConfig.IsTransient {
  37. return
  38. }
  39. // fmt.Printf("flushing with topic config %+v\n", topicConfig)
  40. targetFile := fmt.Sprintf(
  41. "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
  42. filer2.TopicsDir, tp.Namespace, tp.Topic,
  43. startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
  44. tp.Partition,
  45. )
  46. if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
  47. glog.V(0).Infof("log write failed %s: %v", targetFile, err)
  48. }
  49. }
  50. logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
  51. tl.cond.Broadcast()
  52. })
  53. return logBuffer
  54. }
  55. func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock {
  56. tl.Lock()
  57. defer tl.Unlock()
  58. lock, found := tl.locks[partition]
  59. if !found {
  60. lock = &TopicLock{}
  61. lock.cond = sync.NewCond(&lock.Mutex)
  62. tl.locks[partition] = lock
  63. lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig)
  64. }
  65. if isPublisher {
  66. lock.publisherCount++
  67. } else {
  68. lock.subscriberCount++
  69. }
  70. return lock
  71. }
  72. func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
  73. tl.Lock()
  74. defer tl.Unlock()
  75. lock, found := tl.locks[partition]
  76. if !found {
  77. return
  78. }
  79. if isPublisher {
  80. lock.publisherCount--
  81. } else {
  82. lock.subscriberCount--
  83. }
  84. if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
  85. delete(tl.locks, partition)
  86. }
  87. }