package filer

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)

type MetaAggregator struct {
	filer          *Filer
	self           pb.ServerAddress
	isLeader       bool
	grpcDialOption grpc.DialOption
	MetaLogBuffer  *log_buffer.LogBuffer

	peerStatues     map[pb.ServerAddress]struct{}
	peerStatuesLock sync.Mutex

	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
}

// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
	t := &MetaAggregator{
		filer:          filer,
		self:           self,
		grpcDialOption: grpcDialOption,
		peerStatues:    make(map[pb.ServerAddress]struct{}),
	}
	t.ListenersCond = sync.NewCond(&t.ListenersLock)
	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
		t.ListenersCond.Broadcast()
	})
	return t
}

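// OnPeerUpdate handles filer membership changes reported by the master.
// When a filer node is added, this filer starts a goroutine that subscribes to
// that peer's local metadata changes; when a filer is removed, the peer is
// marked inactive so its subscription loop can stop.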
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
	if update.NodeType != "filer" {
		return
	}

	address := pb.ServerAddress(update.Address)
	if update.IsAdd {
		// every filer should subscribe to a new filer
		ma.setActive(address, true)
		go ma.subscribeToOneFiler(ma.filer, ma.self, address)
	} else {
		ma.setActive(address, false)
	}
}

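// setActive records whether a peer filer is currently considered part of the cluster.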
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
	ma.peerStatuesLock.Lock()
	defer ma.peerStatuesLock.Unlock()
	if isActive {
		ma.peerStatues[address] = struct{}{}
	} else {
		delete(ma.peerStatues, address)
	}
}

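// isActive reports whether the peer filer is still marked as active.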
func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
	ma.peerStatuesLock.Lock()
	defer ma.peerStatuesLock.Unlock()
	_, isActive = ma.peerStatues[address]
	return
}

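// subscribeToOneFiler maintains a long-running subscription to one peer filer's
// local metadata changes. Every received event is appended to the in-memory
// MetaLogBuffer; if the peer uses a different filer store (different signature),
// the event is also replayed into the local store and the replication offset is
// persisted periodically. The loop retries on errors and exits once the peer is
// no longer active.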
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {

	/*
		Each filer reads the "filer.store.id", which is the store's signature when the filer starts.

		When reading other filers' local meta changes:
		* if the received change does not contain a signature from self, apply the change to the current filer store.

		Upon connecting to other filers, remember their signatures and their offsets.
	*/

	var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
	lastPersistTime := time.Now()
	lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()

	peerSignature, err := ma.readFilerStoreSignature(peer)
	for err != nil {
		glog.V(0).Infof("connecting to peer filer %s: %v", peer, err)
		time.Sleep(1357 * time.Millisecond)
		peerSignature, err = ma.readFilerStoreSignature(peer)
	}

	// only replicate when the filer store is not shared by multiple filers
	if peerSignature != f.Signature {
		lastTsNs = 0
		if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
			lastTsNs = prevTsNs
		}

		glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
		var counter int64
		var synced bool
		maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
			if err := Replay(f.Store, event); err != nil {
				glog.Errorf("failed to replay metadata change from %v: %v", peer, err)
				return
			}
			counter++
			if lastPersistTime.Add(time.Minute).Before(time.Now()) {
				if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
					if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
						glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
					} else if !synced {
						synced = true
						glog.V(0).Infof("synced with %s", peer)
					}
					lastPersistTime = time.Now()
					counter = 0
				} else {
					glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
				}
			}
		}
	}

	processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
		data, err := proto.Marshal(event)
		if err != nil {
			glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
			return err
		}
		dir := event.Directory
		// println("received meta change", dir, "size", len(data))
		ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)

		if maybeReplicateMetadataChange != nil {
			maybeReplicateMetadataChange(event)
		}
		return nil
	}

	for {
		glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
		err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
				ClientName: "filer:" + string(self),
				PathPrefix: "/",
				SinceNs:    lastTsNs,
			})
			if err != nil {
				return fmt.Errorf("subscribe: %v", err)
			}

			for {
				resp, listenErr := stream.Recv()
				if listenErr == io.EOF {
					return nil
				}
				if listenErr != nil {
					return listenErr
				}

				if err := processEventFn(resp); err != nil {
					return fmt.Errorf("process %v: %v", resp, err)
				}
				lastTsNs = resp.TsNs
				f.onMetadataChangeEvent(resp)
			}
		})
		if !ma.isActive(peer) {
			glog.V(0).Infof("stop subscribing remote %s meta change", peer)
			return
		}
		if err != nil {
			glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
			time.Sleep(1733 * time.Millisecond)
		}
	}
}

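// readFilerStoreSignature asks the peer filer for its configuration and returns
// the signature of its backing filer store.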
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
	err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		sig = resp.Signature
		return nil
	})
	return
}

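// MetaOffsetPrefix is the prefix of the KV key (prefix + 4-byte peer store signature)
// under which the last replicated timestamp for a peer is persisted in the local store.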
const (
	MetaOffsetPrefix = "Meta"
)

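// readOffset loads the timestamp (in nanoseconds) of the last metadata change that
// was replicated from the peer store identified by peerSignature. A missing key is
// not an error: replication simply starts from the beginning.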
func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {

	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

	value, err := f.Store.KvGet(context.Background(), key)

	if err == ErrKvNotFound {
		glog.Warningf("readOffset %s not found", peer)
		return 0, nil
	}

	if err != nil {
		return 0, fmt.Errorf("readOffset %s : %v", peer, err)
	}

	lastTsNs = int64(util.BytesToUint64(value))

	glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)

	return
}

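// updateOffset persists the timestamp of the latest metadata change replicated from
// the peer store identified by peerSignature, so replication can resume from there
// after a restart.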
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {

	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

	value := make([]byte, 8)
	util.Uint64toBytes(value, uint64(lastTsNs))

	err = f.Store.KvPut(context.Background(), key, value)

	if err != nil {
		return fmt.Errorf("updateOffset %s : %v", peer, err)
	}

	glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)

	return
}