You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

99 lines
2.0 KiB

5 years ago
5 years ago
5 years ago
5 years ago
  1. package broker
  2. import (
  3. "fmt"
  4. "io"
  5. "time"
  6. "github.com/golang/protobuf/proto"
  7. "github.com/chrislusf/seaweedfs/weed/filer2"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  10. )
  11. func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
  12. // process initial request
  13. in, err := stream.Recv()
  14. if err == io.EOF {
  15. return nil
  16. }
  17. if err != nil {
  18. return err
  19. }
  20. // TODO look it up
  21. topicConfig := &messaging_pb.TopicConfiguration{
  22. }
  23. // get lock
  24. tp := TopicPartition{
  25. Namespace: in.Init.Namespace,
  26. Topic: in.Init.Topic,
  27. Partition: in.Init.Partition,
  28. }
  29. logBuffer := broker.topicLocks.RequestPublisherLock(tp, func(startTime, stopTime time.Time, buf []byte) {
  30. targetFile := fmt.Sprintf(
  31. "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
  32. filer2.TopicsDir, tp.Namespace, tp.Topic,
  33. startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
  34. tp.Partition,
  35. )
  36. if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
  37. glog.V(0).Infof("log write failed %s: %v", targetFile, err)
  38. }
  39. })
  40. defer broker.topicLocks.ReleaseLock(tp, true)
  41. updatesChan := make(chan int32)
  42. go func() {
  43. for update := range updatesChan {
  44. if err := stream.Send(&messaging_pb.PublishResponse{
  45. Config: &messaging_pb.PublishResponse_ConfigMessage{
  46. PartitionCount: update,
  47. },
  48. }); err != nil {
  49. glog.V(0).Infof("err sending publish response: %v", err)
  50. return
  51. }
  52. }
  53. }()
  54. // process each message
  55. for {
  56. in, err := stream.Recv()
  57. if err == io.EOF {
  58. return nil
  59. }
  60. if err != nil {
  61. return err
  62. }
  63. if in.Data == nil {
  64. continue
  65. }
  66. m := &messaging_pb.Message{
  67. Timestamp: time.Now().UnixNano(),
  68. Key: in.Data.Key,
  69. Value: in.Data.Value,
  70. Headers: in.Data.Headers,
  71. }
  72. data, err := proto.Marshal(m)
  73. if err != nil {
  74. glog.Errorf("marshall error: %v\n", err)
  75. continue
  76. }
  77. logBuffer.AddToBuffer(in.Data.Key, data)
  78. }
  79. }