You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

239 lines
6.8 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
10 months ago
10 months ago
10 months ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package topic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/status"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
  16. type LocalPartition struct {
  17. ListenersWaits int64
  18. AckTsNs int64
  19. // notifying clients
  20. ListenersLock sync.Mutex
  21. ListenersCond *sync.Cond
  22. Partition
  23. LogBuffer *log_buffer.LogBuffer
  24. Publishers *LocalPartitionPublishers
  25. Subscribers *LocalPartitionSubscribers
  26. followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
  27. followerGrpcConnection *grpc.ClientConn
  28. follower string
  29. }
  // TIME_FORMAT is a Go reference-time layout that renders a timestamp as
  // year-month-day-hour-minute-second, with every component separated by a dash.
  // NOTE(review): not referenced in this part of the file; confirm its users
  // before changing the layout.
  var TIME_FORMAT = "2006-01-02-15-04-05"
  31. func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
  32. lp := &LocalPartition{
  33. Partition: partition,
  34. Publishers: NewLocalPartitionPublishers(),
  35. Subscribers: NewLocalPartitionSubscribers(),
  36. }
  37. lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
  38. lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
  39. 2*time.Minute, logFlushFn, readFromDiskFn, func() {
  40. if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
  41. lp.ListenersCond.Broadcast()
  42. }
  43. })
  44. return lp
  45. }
  46. func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
  47. p.LogBuffer.AddToBuffer(message)
  48. // maybe send to the follower
  49. if p.followerStream != nil {
  50. // println("recv", string(message.Key), message.TsNs)
  51. if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
  52. Message: &mq_pb.PublishFollowMeRequest_Data{
  53. Data: message,
  54. },
  55. }); followErr != nil {
  56. return fmt.Errorf("send to follower %s: %v", p.follower, followErr)
  57. }
  58. } else {
  59. atomic.StoreInt64(&p.AckTsNs, message.TsNs)
  60. }
  61. return nil
  62. }
  63. func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
  64. onNoMessageFn func() bool, eachMessageFn log_buffer.EachLogEntryFuncType) error {
  65. var processedPosition log_buffer.MessagePosition
  66. var readPersistedLogErr error
  67. var readInMemoryLogErr error
  68. var isDone bool
  69. for {
  70. processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
  71. if readPersistedLogErr != nil {
  72. glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
  73. return readPersistedLogErr
  74. }
  75. if isDone {
  76. return nil
  77. }
  78. startPosition = processedPosition
  79. processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
  80. if isDone {
  81. return nil
  82. }
  83. startPosition = processedPosition
  84. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  85. continue
  86. }
  87. if readInMemoryLogErr != nil {
  88. glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
  89. return readInMemoryLogErr
  90. }
  91. }
  92. }
  93. func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time {
  94. return p.LogBuffer.GetEarliestTime()
  95. }
  96. func (p *LocalPartition) HasData() bool {
  97. return !p.LogBuffer.GetEarliestTime().IsZero()
  98. }
  99. func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition {
  100. return p.LogBuffer.GetEarliestPosition()
  101. }
  102. func (p *LocalPartition) closePublishers() {
  103. p.Publishers.SignalShutdown()
  104. }
  105. func (p *LocalPartition) closeSubscribers() {
  106. p.Subscribers.SignalShutdown()
  107. }
  108. func (p *LocalPartition) WaitUntilNoPublishers() {
  109. for {
  110. if p.Publishers.Size() == 0 {
  111. return
  112. }
  113. time.Sleep(113 * time.Millisecond)
  114. }
  115. }
  116. func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
  117. if p.followerStream != nil {
  118. return nil
  119. }
  120. if len(initMessage.FollowerBrokers) == 0 {
  121. return nil
  122. }
  123. p.follower = initMessage.FollowerBrokers[0]
  124. ctx := context.Background()
  125. p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
  126. if err != nil {
  127. return fmt.Errorf("fail to dial %s: %v", p.follower, err)
  128. }
  129. followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
  130. p.followerStream, err = followerClient.PublishFollowMe(ctx)
  131. if err != nil {
  132. return fmt.Errorf("fail to create publish client: %v", err)
  133. }
  134. if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
  135. Message: &mq_pb.PublishFollowMeRequest_Init{
  136. Init: &mq_pb.PublishFollowMeRequest_InitMessage{
  137. Topic: initMessage.Topic,
  138. Partition: initMessage.Partition,
  139. },
  140. },
  141. }); err != nil {
  142. return err
  143. }
  144. // start receiving ack from follower
  145. go func() {
  146. defer func() {
  147. // println("stop receiving ack from follower")
  148. }()
  149. for {
  150. ack, err := p.followerStream.Recv()
  151. if err != nil {
  152. e, _ := status.FromError(err)
  153. if e.Code() == codes.Canceled {
  154. glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.follower)
  155. return
  156. }
  157. glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.follower, err)
  158. return
  159. }
  160. atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
  161. // println("recv ack", ack.AckTsNs)
  162. }
  163. }()
  164. return nil
  165. }
  166. func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
  167. if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
  168. p.LogBuffer.ShutdownLogBuffer()
  169. for !p.LogBuffer.IsAllFlushed() {
  170. time.Sleep(113 * time.Millisecond)
  171. }
  172. if p.followerStream != nil {
  173. // send close to the follower
  174. if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
  175. Message: &mq_pb.PublishFollowMeRequest_Close{
  176. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  177. },
  178. }); followErr != nil {
  179. glog.Errorf("Error closing follower stream: %v", followErr)
  180. }
  181. glog.V(4).Infof("closing grpcConnection to follower")
  182. p.followerGrpcConnection.Close()
  183. p.followerStream = nil
  184. p.follower = ""
  185. }
  186. hasShutdown = true
  187. }
  188. glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.follower, hasShutdown)
  189. return
  190. }
  191. func (p *LocalPartition) Shutdown() {
  192. p.closePublishers()
  193. p.closeSubscribers()
  194. p.LogBuffer.ShutdownLogBuffer()
  195. glog.V(0).Infof("local partition %v shutting down", p.Partition)
  196. }
  197. func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
  198. if p.followerStream != nil {
  199. if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
  200. Message: &mq_pb.PublishFollowMeRequest_Flush{
  201. Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
  202. TsNs: flushTsNs,
  203. },
  204. },
  205. }); followErr != nil {
  206. glog.Errorf("send follower %s flush message: %v", p.follower, followErr)
  207. }
  208. // println("notifying", p.follower, "flushed at", flushTsNs)
  209. }
  210. }