package filer

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/cluster"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
type MetaAggregator struct {
	filer            *Filer
	self             pb.ServerAddress
	isLeader         bool
	grpcDialOption   grpc.DialOption
	MetaLogBuffer    *log_buffer.LogBuffer
	peerStatuses     map[pb.ServerAddress]int
	peerStatusesLock sync.Mutex

	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
}
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
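//
// A minimal usage sketch (hypothetical values; assumes a configured *Filer
// and grpc.DialOption):
//
//	ma := NewMetaAggregator(f, self, grpcDialOption)
//	ma.OnPeerUpdate(&master_pb.ClusterNodeUpdate{
//		NodeType: cluster.FilerType,
//		Address:  "filer2:8888",
//		IsAdd:    true,
//	})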
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
	t := &MetaAggregator{
		filer:          filer,
		self:           self,
		grpcDialOption: grpcDialOption,
		peerStatuses:   make(map[pb.ServerAddress]int),
	}
	t.ListenersCond = sync.NewCond(&t.ListenersLock)
	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
		t.ListenersCond.Broadcast()
	})
	return t
}
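
// OnPeerUpdate reacts to filer membership changes in the cluster: a newly
// added filer gets a dedicated subscription goroutine; a removed filer is
// marked inactive so that its subscription loop exits.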
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
	if update.NodeType != cluster.FilerType {
		return
	}

	address := pb.ServerAddress(update.Address)
	if update.IsAdd {
		// every filer should subscribe to a new filer
		if ma.setActive(address, true) {
			go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address)
		}
	} else {
		ma.setActive(address, false)
	}
}
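
// setActive marks a peer as alive or removes it. Repeated activations only
// bump a counter; notDuplicated is true just the first time an address
// becomes active, so only one subscription goroutine is started per peer.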
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
	ma.peerStatusesLock.Lock()
	defer ma.peerStatusesLock.Unlock()
	if isActive {
		if _, found := ma.peerStatuses[address]; found {
			ma.peerStatuses[address] += 1
		} else {
			ma.peerStatuses[address] = 1
			notDuplicated = true
		}
	} else {
		if _, found := ma.peerStatuses[address]; found {
			delete(ma.peerStatuses, address)
		}
	}
	return
}
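
// isActive reports whether the peer is currently tracked as alive.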
func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
	ma.peerStatusesLock.Lock()
	defer ma.peerStatusesLock.Unlock()
	var count int
	count, isActive = ma.peerStatuses[address]
	return count > 0 && isActive
}
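
// loopSubscribeToOneFiler keeps the subscription to one peer alive,
// reconnecting after errors until the peer is marked inactive. The odd
// retry intervals (1733ms here, 1357ms in doSubscribeToOneFiler) are
// presumably chosen to stagger reconnect attempts across peers.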
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {
	for {
		err := ma.doSubscribeToOneFiler(f, self, peer)
		if !ma.isActive(peer) {
			glog.V(0).Infof("stop subscribing remote %s meta change", peer)
			return
		}
		if err != nil {
			glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
		}
		time.Sleep(1733 * time.Millisecond)
	}
}
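
// doSubscribeToOneFiler subscribes to one peer's local metadata log and adds
// every event to MetaLogBuffer. When the peer uses a different filer store
// (its signature differs from ours), each event is also replayed into the
// local store, with the sync offset checkpointed at most once per minute.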
func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) error {

	/*
		Each filer reads "filer.store.id", which is the store's signature assigned when the filer starts.

		When reading other filers' local meta changes:
		* if the received change does not carry this filer's own signature, apply the change to the current filer store.

		Upon connecting to other filers, remember their signatures and offsets.
	*/

	var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
	lastPersistTime := time.Now()
	lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()

	peerSignature, err := ma.readFilerStoreSignature(peer)
	for err != nil {
		glog.V(0).Infof("connecting to peer filer %s: %v", peer, err)
		time.Sleep(1357 * time.Millisecond)
		peerSignature, err = ma.readFilerStoreSignature(peer)
	}

	// when the filer store is not shared by multiple filers
	if peerSignature != f.Signature {
		lastTsNs = 0
		if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
			lastTsNs = prevTsNs
			defer func(prevTsNs int64) {
				if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
					if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
						glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
					} else {
						glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
					}
				}
			}(prevTsNs)
		}

		glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
		var counter int64
		var synced bool
		maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
			if err := Replay(f.Store, event); err != nil {
				glog.Errorf("failed to replay metadata change from %v: %v", peer, err)
				return
			}
			counter++
			if lastPersistTime.Add(time.Minute).Before(time.Now()) {
				if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
					if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
						glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
					} else if !synced {
						synced = true
						glog.V(0).Infof("synced with %s", peer)
					}
					lastPersistTime = time.Now()
					counter = 0
				} else {
					glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
				}
			}
		}
	}
	processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
		data, err := proto.Marshal(event)
		if err != nil {
			glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
			return err
		}
		dir := event.Directory

		ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)

		if maybeReplicateMetadataChange != nil {
			maybeReplicateMetadataChange(event)
		}
		return nil
	}

	glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
	err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName: "filer:" + string(self),
			PathPrefix: "/",
			SinceNs:    lastTsNs,
			ClientId:   int32(ma.filer.UniqueFileId),
		})
		if err != nil {
			return fmt.Errorf("subscribe: %v", err)
		}

		for {
			resp, listenErr := stream.Recv()
			if listenErr == io.EOF {
				return nil
			}
			if listenErr != nil {
				return listenErr
			}

			if err := processEventFn(resp); err != nil {
				return fmt.Errorf("process %v: %v", resp, err)
			}
			lastTsNs = resp.TsNs
			f.onMetadataChangeEvent(resp)
		}
	})
	return err
}
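
// readFilerStoreSignature fetches a peer's filer store signature via
// GetFilerConfiguration. Filers sharing the same underlying store report
// the same signature.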
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
	err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		sig = resp.Signature
		return nil
	})
	return
}
const (
	MetaOffsetPrefix = "Meta"
)
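
// readOffset loads the last synced log timestamp for a peer from the local
// KV store. The key is the "Meta" prefix followed by the peer's store
// signature encoded as 4 bytes; the "xxxx" literal below is just a
// placeholder that Uint32toBytes overwrites.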
func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {

	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

	value, err := f.Store.KvGet(context.Background(), key)
	if err == ErrKvNotFound {
		glog.Warningf("readOffset %s not found", peer)
		return 0, nil
	}
	if err != nil {
		return 0, fmt.Errorf("readOffset %s : %v", peer, err)
	}

	lastTsNs = int64(util.BytesToUint64(value))
	glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
	return
}
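
// updateOffset persists the last synced log timestamp for a peer under the
// same "Meta"+signature key that readOffset reads.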
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {

	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

	value := make([]byte, 8)
	util.Uint64toBytes(value, uint64(lastTsNs))

	err = f.Store.KvPut(context.Background(), key, value)
	if err != nil {
		return fmt.Errorf("updateOffset %s : %v", peer, err)
	}

	glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
	return
}