You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

101 lines
2.4 KiB

  1. package broker
import (
	"errors"
	"io"
	"time"

	"github.com/golang/protobuf/proto"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)
  11. func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
  12. // process initial request
  13. in, err := stream.Recv()
  14. if err == io.EOF {
  15. return nil
  16. }
  17. if err != nil {
  18. return err
  19. }
  20. subscriberId := in.Init.SubscriberId
  21. println("+ subscriber:", subscriberId)
  22. defer println("- subscriber:", subscriberId)
  23. // TODO look it up
  24. topicConfig := &messaging_pb.TopicConfiguration{
  25. }
  26. // get lock
  27. tp := TopicPartition{
  28. Namespace: in.Init.Namespace,
  29. Topic: in.Init.Topic,
  30. Partition: in.Init.Partition,
  31. }
  32. lock := broker.topicLocks.RequestLock(tp, topicConfig, false)
  33. defer broker.topicLocks.ReleaseLock(tp, false)
  34. lastReadTime := time.Now()
  35. switch in.Init.StartPosition {
  36. case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
  37. lastReadTime = time.Unix(0, in.Init.TimestampNs)
  38. case messaging_pb.SubscriberMessage_InitMessage_LATEST:
  39. case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
  40. }
  41. // how to process each message
  42. // an error returned will end the subscription
  43. eachMessageFn := func(m *messaging_pb.Message) error {
  44. err := stream.Send(&messaging_pb.BrokerMessage{
  45. Data: m,
  46. })
  47. if err != nil {
  48. glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
  49. }
  50. return err
  51. }
  52. // loop through all messages
  53. for {
  54. _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime)
  55. for pos := 0; pos+4 < len(buf); {
  56. size := util.BytesToUint32(buf[pos : pos+4])
  57. entryData := buf[pos+4 : pos+4+int(size)]
  58. logEntry := &filer_pb.LogEntry{}
  59. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  60. glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
  61. pos += 4 + int(size)
  62. continue
  63. }
  64. m := &messaging_pb.Message{}
  65. if err = proto.Unmarshal(logEntry.Data, m); err != nil {
  66. glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
  67. pos += 4 + int(size)
  68. continue
  69. }
  70. // fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value))
  71. if err = eachMessageFn(m); err != nil {
  72. return err
  73. }
  74. lastReadTime = time.Unix(0, m.Timestamp)
  75. pos += 4 + int(size)
  76. }
  77. lock.Mutex.Lock()
  78. lock.cond.Wait()
  79. lock.Mutex.Unlock()
  80. }
  81. }