You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

131 lines
3.7 KiB

5 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  14. )
  15. type MetaAggregator struct {
  16. filers []string
  17. grpcDialOption grpc.DialOption
  18. MetaLogBuffer *log_buffer.LogBuffer
  19. // notifying clients
  20. ListenersLock sync.Mutex
  21. ListenersCond *sync.Cond
  22. }
  23. // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
  24. // The old data comes from what each LocalMetadata persisted on disk.
  25. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
  26. t := &MetaAggregator{
  27. filers: filers,
  28. grpcDialOption: grpcDialOption,
  29. }
  30. t.ListenersCond = sync.NewCond(&t.ListenersLock)
  31. t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
  32. t.ListenersCond.Broadcast()
  33. })
  34. return t
  35. }
  36. func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
  37. for _, filer := range ma.filers {
  38. go ma.subscribeToOneFiler(f, self, filer)
  39. }
  40. }
  41. func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
  42. var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
  43. lastPersistTime := time.Now()
  44. changesSinceLastPersist := 0
  45. lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
  46. MaxChangeLimit := 100
  47. if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
  48. if self != filer {
  49. if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
  50. lastTsNs = prevTsNs
  51. }
  52. glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
  53. maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
  54. if err := Replay(f.Store.ActualStore, event); err != nil {
  55. glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
  56. return
  57. }
  58. changesSinceLastPersist++
  59. if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
  60. if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
  61. lastPersistTime = time.Now()
  62. changesSinceLastPersist = 0
  63. } else {
  64. glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
  65. }
  66. }
  67. }
  68. } else {
  69. glog.V(0).Infof("skipping following self: %v", self)
  70. }
  71. }
  72. processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
  73. data, err := proto.Marshal(event)
  74. if err != nil {
  75. glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
  76. return err
  77. }
  78. dir := event.Directory
  79. // println("received meta change", dir, "size", len(data))
  80. ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
  81. if maybeReplicateMetadataChange != nil {
  82. maybeReplicateMetadataChange(event)
  83. }
  84. return nil
  85. }
  86. for {
  87. err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  88. stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
  89. ClientName: "filer:" + self,
  90. PathPrefix: "/",
  91. SinceNs: lastTsNs,
  92. })
  93. if err != nil {
  94. return fmt.Errorf("subscribe: %v", err)
  95. }
  96. for {
  97. resp, listenErr := stream.Recv()
  98. if listenErr == io.EOF {
  99. return nil
  100. }
  101. if listenErr != nil {
  102. return listenErr
  103. }
  104. if err := processEventFn(resp); err != nil {
  105. return fmt.Errorf("process %v: %v", resp, err)
  106. }
  107. lastTsNs = resp.TsNs
  108. }
  109. })
  110. if err != nil {
  111. glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
  112. time.Sleep(1733 * time.Millisecond)
  113. }
  114. }
  115. }