You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
4.4 KiB

5 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  14. )
  15. type MetaAggregator struct {
  16. filers []string
  17. grpcDialOption grpc.DialOption
  18. MetaLogBuffer *log_buffer.LogBuffer
  19. // notifying clients
  20. ListenersLock sync.Mutex
  21. ListenersCond *sync.Cond
  22. }
  23. // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
  24. // The old data comes from what each LocalMetadata persisted on disk.
  25. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
  26. t := &MetaAggregator{
  27. filers: filers,
  28. grpcDialOption: grpcDialOption,
  29. }
  30. t.ListenersCond = sync.NewCond(&t.ListenersLock)
  31. t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
  32. t.ListenersCond.Broadcast()
  33. })
  34. return t
  35. }
  36. func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
  37. for _, filer := range ma.filers {
  38. go ma.subscribeToOneFiler(f, self, filer)
  39. }
  40. }
  41. func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
  42. /*
  43. Each filer reads the "filer.store.id", which is the store's signature when filer starts.
  44. When reading from other filers' local meta changes:
  45. * if the received change does not contain signature from self, apply the change to current filer store.
  46. Upon connecting to other filers, need to remember their signature and their offsets.
  47. */
  48. var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
  49. lastPersistTime := time.Now()
  50. changesSinceLastPersist := 0
  51. lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
  52. MaxChangeLimit := 100
  53. if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
  54. if self != filer {
  55. if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
  56. lastTsNs = prevTsNs
  57. }
  58. glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
  59. maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
  60. if err := Replay(f.Store.ActualStore, event); err != nil {
  61. glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
  62. return
  63. }
  64. changesSinceLastPersist++
  65. if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
  66. if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
  67. lastPersistTime = time.Now()
  68. changesSinceLastPersist = 0
  69. } else {
  70. glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
  71. }
  72. }
  73. }
  74. } else {
  75. glog.V(0).Infof("skipping following self: %v", self)
  76. }
  77. }
  78. processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
  79. data, err := proto.Marshal(event)
  80. if err != nil {
  81. glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
  82. return err
  83. }
  84. dir := event.Directory
  85. // println("received meta change", dir, "size", len(data))
  86. ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
  87. if maybeReplicateMetadataChange != nil {
  88. maybeReplicateMetadataChange(event)
  89. }
  90. return nil
  91. }
  92. for {
  93. err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  94. stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
  95. ClientName: "filer:" + self,
  96. PathPrefix: "/",
  97. SinceNs: lastTsNs,
  98. })
  99. if err != nil {
  100. return fmt.Errorf("subscribe: %v", err)
  101. }
  102. for {
  103. resp, listenErr := stream.Recv()
  104. if listenErr == io.EOF {
  105. return nil
  106. }
  107. if listenErr != nil {
  108. return listenErr
  109. }
  110. if err := processEventFn(resp); err != nil {
  111. return fmt.Errorf("process %v: %v", resp, err)
  112. }
  113. lastTsNs = resp.TsNs
  114. }
  115. })
  116. if err != nil {
  117. glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
  118. time.Sleep(1733 * time.Millisecond)
  119. }
  120. }
  121. }
  122. func (ma *MetaAggregator) isSameFilerStore(f *Filer, peer string) (isSame bool, err error) {
  123. err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  124. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  125. if err != nil {
  126. return err
  127. }
  128. isSame = f.Signature == resp.Signature
  129. return nil
  130. })
  131. return
  132. }