You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

187 lines
5.3 KiB

5 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "io"
  7. "sync"
  8. "time"
  9. "github.com/golang/protobuf/proto"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  15. )
  16. type MetaAggregator struct {
  17. filers []string
  18. grpcDialOption grpc.DialOption
  19. MetaLogBuffer *log_buffer.LogBuffer
  20. // notifying clients
  21. ListenersLock sync.Mutex
  22. ListenersCond *sync.Cond
  23. }
  24. // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
  25. // The old data comes from what each LocalMetadata persisted on disk.
  26. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
  27. t := &MetaAggregator{
  28. filers: filers,
  29. grpcDialOption: grpcDialOption,
  30. }
  31. t.ListenersCond = sync.NewCond(&t.ListenersLock)
  32. t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
  33. t.ListenersCond.Broadcast()
  34. })
  35. return t
  36. }
  37. func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
  38. for _, filer := range ma.filers {
  39. go ma.subscribeToOneFiler(f, self, filer)
  40. }
  41. }
  42. func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
  43. /*
  44. Each filer reads the "filer.store.id", which is the store's signature when filer starts.
  45. When reading from other filers' local meta changes:
  46. * if the received change does not contain signature from self, apply the change to current filer store.
  47. Upon connecting to other filers, need to remember their signature and their offsets.
  48. */
  49. var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
  50. lastPersistTime := time.Now()
  51. changesSinceLastPersist := 0
  52. lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
  53. MaxChangeLimit := 100
  54. isSameFilerStore, err := ma.isSameFilerStore(f, filer)
  55. for err != nil {
  56. glog.V(0).Infof("connecting to peer filer %s: %v", filer, err)
  57. time.Sleep(1357 * time.Millisecond)
  58. isSameFilerStore, err = ma.isSameFilerStore(f, filer)
  59. }
  60. if !isSameFilerStore{
  61. if prevTsNs, err := ma.readOffset(f, filer); err == nil {
  62. lastTsNs = prevTsNs
  63. }
  64. glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
  65. maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
  66. if err := Replay(f.Store.ActualStore, event); err != nil {
  67. glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
  68. return
  69. }
  70. changesSinceLastPersist++
  71. if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
  72. if err := ma.updateOffset(f, filer, event.TsNs); err == nil {
  73. lastPersistTime = time.Now()
  74. changesSinceLastPersist = 0
  75. } else {
  76. glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
  77. }
  78. }
  79. }
  80. }
  81. processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
  82. data, err := proto.Marshal(event)
  83. if err != nil {
  84. glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
  85. return err
  86. }
  87. dir := event.Directory
  88. // println("received meta change", dir, "size", len(data))
  89. ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
  90. if maybeReplicateMetadataChange != nil {
  91. maybeReplicateMetadataChange(event)
  92. }
  93. return nil
  94. }
  95. for {
  96. err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  97. stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
  98. ClientName: "filer:" + self,
  99. PathPrefix: "/",
  100. SinceNs: lastTsNs,
  101. })
  102. if err != nil {
  103. return fmt.Errorf("subscribe: %v", err)
  104. }
  105. for {
  106. resp, listenErr := stream.Recv()
  107. if listenErr == io.EOF {
  108. return nil
  109. }
  110. if listenErr != nil {
  111. return listenErr
  112. }
  113. if err := processEventFn(resp); err != nil {
  114. return fmt.Errorf("process %v: %v", resp, err)
  115. }
  116. lastTsNs = resp.TsNs
  117. }
  118. })
  119. if err != nil {
  120. glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
  121. time.Sleep(1733 * time.Millisecond)
  122. }
  123. }
  124. }
  125. func (ma *MetaAggregator) isSameFilerStore(f *Filer, peer string) (isSame bool, err error) {
  126. err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  127. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  128. if err != nil {
  129. return err
  130. }
  131. isSame = f.Signature == resp.Signature
  132. return nil
  133. })
  134. return
  135. }
  136. func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err error) {
  137. value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer))
  138. if err != nil {
  139. return 0, fmt.Errorf("readOffset %s : %v", peer, err)
  140. }
  141. lastTsNs = int64(util.BytesToUint64(value))
  142. glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
  143. return
  144. }
  145. func (ma *MetaAggregator) updateOffset(f *Filer, peer string, lastTsNs int64) (err error) {
  146. value := make([]byte, 8)
  147. util.Uint64toBytes(value, uint64(lastTsNs))
  148. err = f.Store.KvPut(context.Background(), []byte("meta"+peer), value)
  149. if err != nil {
  150. return fmt.Errorf("updateOffset %s : %v", peer, err)
  151. }
  152. glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
  153. return
  154. }