You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

88 lines
1.9 KiB

  1. package broker
import (
	"errors"
	"io"
	"sync"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
)
  11. func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
  12. // process initial request
  13. in, err := stream.Recv()
  14. if err == io.EOF {
  15. return nil
  16. }
  17. if err != nil {
  18. return err
  19. }
  20. subscriberId := in.Init.SubscriberId
  21. // get lock
  22. tp := TopicPartition{
  23. Namespace: in.Init.Namespace,
  24. Topic: in.Init.Topic,
  25. Partition: in.Init.Partition,
  26. }
  27. lock := broker.topicLocks.RequestSubscriberLock(tp)
  28. defer broker.topicLocks.ReleaseLock(tp, false)
  29. cond := sync.NewCond(&lock.Mutex)
  30. lastReadTime := time.Now()
  31. switch in.Init.StartPosition {
  32. case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
  33. lastReadTime = time.Unix(0, in.Init.TimestampNs)
  34. case messaging_pb.SubscriberMessage_InitMessage_LATEST:
  35. case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
  36. }
  37. // how to process each message
  38. // an error returned will end the subscription
  39. eachMessageFn := func(m *messaging_pb.Message) error {
  40. err := stream.Send(&messaging_pb.BrokerMessage{
  41. Data: m,
  42. })
  43. if err != nil {
  44. glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
  45. }
  46. return err
  47. }
  48. // loop through all messages
  49. for {
  50. _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime)
  51. for pos := 0; pos+4 < len(buf); {
  52. size := util.BytesToUint32(buf[pos : pos+4])
  53. entryData := buf[pos+4 : pos+4+int(size)]
  54. m := &messaging_pb.Message{}
  55. if err = proto.Unmarshal(entryData, m); err != nil {
  56. glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
  57. pos += 4 + int(size)
  58. continue
  59. }
  60. if err = eachMessageFn(m); err != nil {
  61. return err
  62. }
  63. lastReadTime = time.Unix(0, m.Timestamp)
  64. pos += 4 + int(size)
  65. }
  66. lock.Mutex.Lock()
  67. cond.Wait()
  68. lock.Mutex.Unlock()
  69. }
  70. }