package filer

import (
    "context"
    "fmt"
    "io"
    "strings"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/protobuf/proto"

    "github.com/seaweedfs/seaweedfs/weed/cluster"
    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
    "github.com/seaweedfs/seaweedfs/weed/util"
    "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
type MetaAggregator struct {
    filer            *Filer
    self             pb.ServerAddress
    isLeader         bool
    grpcDialOption   grpc.DialOption
    MetaLogBuffer    *log_buffer.LogBuffer
    peerStatuses     map[pb.ServerAddress]int
    peerStatusesLock sync.Mutex

    // notifying clients
    ListenersLock sync.Mutex
    ListenersCond *sync.Cond
}
// NewMetaAggregator creates a MetaAggregator that only aggregates data "on the fly";
// the logs are not re-persisted to disk. Older data comes from what each filer's
// LocalMetadata has already persisted on disk.
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
    t := &MetaAggregator{
        filer:          filer,
        self:           self,
        grpcDialOption: grpcDialOption,
        peerStatuses:   make(map[pb.ServerAddress]int),
    }
    t.ListenersCond = sync.NewCond(&t.ListenersLock)
    t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
        t.ListenersCond.Broadcast()
    })
    return t
}
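// A hedged usage sketch (not part of the original file): the filer server is
// assumed to construct the aggregator at startup and forward cluster
// membership events to it; the surrounding variable names are illustrative.
//
//	ma := NewMetaAggregator(myFiler, selfAddress, grpcDialOption)
//	// When the master reports a filer joining or leaving the cluster:
//	ma.OnPeerUpdate(update, time.Now())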
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
    if update.NodeType != cluster.FilerType {
        return
    }

    address := pb.ServerAddress(update.Address)
    if update.IsAdd {
        // every filer should subscribe to a new filer
        if ma.setActive(address, true) {
            go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom)
        }
    } else {
        ma.setActive(address, false)
    }
}
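// The effect is a full mesh: every filer subscribes to every other filer that
// joins the cluster. A sketch of the topology with three filers A, B, C (an
// illustration, not code from this file):
//
//	A -> B, A -> C
//	B -> A, B -> C
//	C -> A, C -> B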
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
    ma.peerStatusesLock.Lock()
    defer ma.peerStatusesLock.Unlock()
    if isActive {
        if _, found := ma.peerStatuses[address]; found {
            ma.peerStatuses[address]++
        } else {
            ma.peerStatuses[address] = 1
            notDuplicated = true
        }
    } else {
        if _, found := ma.peerStatuses[address]; found {
            delete(ma.peerStatuses, address)
        }
    }
    return
}
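// setActive doubles as a per-peer reference counter. A sketch of the contract:
//
//	ma.setActive(addr, true)  // first add: returns true, caller starts a subscription loop
//	ma.setActive(addr, true)  // duplicate add: returns false, no second loop
//	ma.setActive(addr, false) // removes the peer; the loop sees isActive() == false and exits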
func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
    ma.peerStatusesLock.Lock()
    defer ma.peerStatusesLock.Unlock()
    var count int
    count, isActive = ma.peerStatuses[address]
    return count > 0 && isActive
}
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) {
    lastTsNs := startFrom.UnixNano()
    for {
        glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
        nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
        if !ma.isActive(peer) {
            glog.V(0).Infof("stop subscribing remote %s meta change", peer)
            return
        }
        if err != nil {
            errLvl := glog.Level(0)
            if strings.Contains(err.Error(), "duplicated local subscription detected") {
                errLvl = glog.Level(4)
            }
            glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
        }
        if lastTsNs < nextLastTsNs {
            lastTsNs = nextLastTsNs
        }
        time.Sleep(1733 * time.Millisecond)
    }
}
func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom int64) (int64, error) {

    /*
        Each filer reads the "filer.store.id", which is the store's signature, when the filer starts.

        When reading other filers' local meta changes:
        * if the received change does not carry the current filer's own signature, apply it to the current filer store.

        Upon connecting to another filer, remember its signature and offset.
    */

    var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
    lastPersistTime := time.Now()
    lastTsNs := startFrom

    peerSignature, err := ma.readFilerStoreSignature(peer)
    if err != nil {
        return lastTsNs, fmt.Errorf("connecting to peer filer %s: %v", peer, err)
    }

    // when filer store is not shared by multiple filers
    if peerSignature != f.Signature {
        if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
            lastTsNs = prevTsNs
            defer func(prevTsNs int64) {
                if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
                    if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
                        glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
                    } else {
                        glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
                    }
                }
            }(prevTsNs)
        }

        glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
        var counter int64
        var synced bool
        maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
            if err := Replay(f.Store, event); err != nil {
                glog.Errorf("failed to replay metadata change from %v: %v", peer, err)
                return
            }
            counter++
            if lastPersistTime.Add(time.Minute).Before(time.Now()) {
                if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
                    if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
                        glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
                    } else if !synced {
                        synced = true
                        glog.V(0).Infof("synced with %s", peer)
                    }
                    lastPersistTime = time.Now()
                    counter = 0
                } else {
                    glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
                }
            }
        }
    }

    processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
        data, err := proto.Marshal(event)
        if err != nil {
            glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
            return err
        }
        dir := event.Directory
        // println("received meta change", dir, "size", len(data))
        ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
        if maybeReplicateMetadataChange != nil {
            maybeReplicateMetadataChange(event)
        }
        return nil
    }

    glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
    err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        ma.filer.UniqueFilerEpoch++
        stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
            ClientName:  "filer:" + string(self),
            PathPrefix:  "/",
            SinceNs:     lastTsNs,
            ClientId:    ma.filer.UniqueFilerId,
            ClientEpoch: ma.filer.UniqueFilerEpoch,
        })
        if err != nil {
            return fmt.Errorf("subscribe: %v", err)
        }

        for {
            resp, listenErr := stream.Recv()
            if listenErr == io.EOF {
                return nil
            }
            if listenErr != nil {
                return listenErr
            }

            if err := processEventFn(resp); err != nil {
                return fmt.Errorf("process %v: %v", resp, err)
            }
            f.onMetadataChangeEvent(resp)
            lastTsNs = resp.TsNs
        }
    })
    return lastTsNs, err
}
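// Consumers of the aggregated stream typically block on ListenersCond until
// the flush callback registered in NewMetaAggregator broadcasts, then drain
// the newly buffered entries. A minimal sketch of that wait, assuming the
// caller loops to tolerate spurious wakeups:
//
//	ma.ListenersLock.Lock()
//	ma.ListenersCond.Wait() // woken by the Broadcast() in NewMetaAggregator
//	ma.ListenersLock.Unlock()
//	// ...then read the new entries out of ma.MetaLogBuffer...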
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
    err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
        resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
        if err != nil {
            return err
        }
        sig = resp.Signature
        return nil
    })
    return
}
const (
    MetaOffsetPrefix = "Meta"
)
func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
    key := []byte(MetaOffsetPrefix + "xxxx")
    util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

    value, err := f.Store.KvGet(context.Background(), key)
    if err != nil {
        return 0, fmt.Errorf("readOffset %s : %v", peer, err)
    }

    lastTsNs = int64(util.BytesToUint64(value))
    glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
    return
}
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
    key := []byte(MetaOffsetPrefix + "xxxx")
    util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

    value := make([]byte, 8)
    util.Uint64toBytes(value, uint64(lastTsNs))

    err = f.Store.KvPut(context.Background(), key, value)
    if err != nil {
        return fmt.Errorf("updateOffset %s : %v", peer, err)
    }

    glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
    return
}