You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

129 lines
3.5 KiB

5 years ago
5 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  14. )
  15. type MetaAggregator struct {
  16. filers []string
  17. grpcDialOption grpc.DialOption
  18. MetaLogBuffer *log_buffer.LogBuffer
  19. // notifying clients
  20. ListenersLock sync.Mutex
  21. ListenersCond *sync.Cond
  22. }
  23. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
  24. t := &MetaAggregator{
  25. filers: filers,
  26. grpcDialOption: grpcDialOption,
  27. }
  28. t.ListenersCond = sync.NewCond(&t.ListenersLock)
  29. t.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, nil, func() {
  30. t.ListenersCond.Broadcast()
  31. })
  32. return t
  33. }
  34. func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
  35. for _, filer := range ma.filers {
  36. go ma.subscribeToOneFiler(f, self, filer)
  37. }
  38. }
  39. func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
  40. var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
  41. lastPersistTime := time.Now()
  42. changesSinceLastPersist := 0
  43. lastTsNs := int64(0)
  44. MaxChangeLimit := 100
  45. if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
  46. if self != filer {
  47. if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
  48. lastTsNs = prevTsNs
  49. }
  50. glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
  51. maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
  52. if err := Replay(f.Store.ActualStore, event); err != nil {
  53. glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
  54. return
  55. }
  56. changesSinceLastPersist++
  57. if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
  58. if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
  59. lastPersistTime = time.Now()
  60. changesSinceLastPersist = 0
  61. } else {
  62. glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
  63. }
  64. }
  65. }
  66. } else {
  67. glog.V(0).Infof("skipping following self: %v", self)
  68. }
  69. }
  70. processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
  71. data, err := proto.Marshal(event)
  72. if err != nil {
  73. glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
  74. return err
  75. }
  76. dir := event.Directory
  77. // println("received meta change", dir, "size", len(data))
  78. ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
  79. if maybeReplicateMetadataChange != nil {
  80. maybeReplicateMetadataChange(event)
  81. }
  82. return nil
  83. }
  84. for {
  85. err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  86. stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
  87. ClientName: "filer:" + self,
  88. PathPrefix: "/",
  89. SinceNs: lastTsNs,
  90. })
  91. if err != nil {
  92. return fmt.Errorf("subscribe: %v", err)
  93. }
  94. for {
  95. resp, listenErr := stream.Recv()
  96. if listenErr == io.EOF {
  97. return nil
  98. }
  99. if listenErr != nil {
  100. return listenErr
  101. }
  102. if err := processEventFn(resp); err != nil {
  103. return fmt.Errorf("process %v: %v", resp, err)
  104. }
  105. lastTsNs = resp.TsNs
  106. }
  107. })
  108. if err != nil {
  109. glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
  110. time.Sleep(1733 * time.Millisecond)
  111. }
  112. }
  113. }