You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

91 lines
2.2 KiB

  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  14. )
  15. type MetaAggregator struct {
  16. filers []string
  17. grpcDialOption grpc.DialOption
  18. MetaLogBuffer *log_buffer.LogBuffer
  19. // notifying clients
  20. ListenersLock sync.Mutex
  21. ListenersCond *sync.Cond
  22. }
  23. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
  24. t := &MetaAggregator{
  25. filers: filers,
  26. grpcDialOption: grpcDialOption,
  27. }
  28. t.ListenersCond = sync.NewCond(&t.ListenersLock)
  29. t.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, nil, func() {
  30. t.ListenersCond.Broadcast()
  31. })
  32. return t
  33. }
  34. func (ma *MetaAggregator) StartLoopSubscribe(lastTsNs int64) {
  35. for _, filer := range ma.filers {
  36. go ma.subscribeToOneFiler(filer, lastTsNs)
  37. }
  38. }
  39. func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
  40. processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
  41. data, err := proto.Marshal(event)
  42. if err != nil {
  43. glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
  44. return err
  45. }
  46. dir := event.Directory
  47. println("received meta change", dir, "size", len(data))
  48. ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
  49. return nil
  50. }
  51. for {
  52. err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  53. stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
  54. ClientName: "filer",
  55. PathPrefix: "/",
  56. SinceNs: lastTsNs,
  57. })
  58. if err != nil {
  59. return fmt.Errorf("subscribe: %v", err)
  60. }
  61. for {
  62. resp, listenErr := stream.Recv()
  63. if listenErr == io.EOF {
  64. return nil
  65. }
  66. if listenErr != nil {
  67. return listenErr
  68. }
  69. if err := processEventFn(resp); err != nil {
  70. return fmt.Errorf("process %v: %v", resp, err)
  71. }
  72. lastTsNs = resp.TsNs
  73. }
  74. })
  75. if err != nil {
  76. glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
  77. time.Sleep(1733 * time.Millisecond)
  78. }
  79. }
  80. }