351 lines
11 KiB

5 years ago
5 years ago
5 years ago
7 months ago
7 months ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
7 months ago
4 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/stats"
  8. "google.golang.org/protobuf/proto"
  9. "github.com/seaweedfs/seaweedfs/weed/filer"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  14. )
  15. const (
  16. // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
  17. MaxUnsyncedEvents = 1e3
  18. )
  19. func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
  20. peerAddress := findClientAddress(stream.Context(), 0)
  21. isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
  22. if isReplacing {
  23. fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
  24. } else if alreadyKnown {
  25. fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
  26. return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
  27. }
  28. defer func() {
  29. glog.V(0).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
  30. fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
  31. fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
  32. }()
  33. lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
  34. glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  35. eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
  36. eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
  37. var processedTsNs int64
  38. var readPersistedLogErr error
  39. var readInMemoryLogErr error
  40. var isDone bool
  41. for {
  42. glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  43. processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
  44. if readPersistedLogErr != nil {
  45. return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
  46. }
  47. if isDone {
  48. return nil
  49. }
  50. glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs)
  51. if processedTsNs != 0 {
  52. lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
  53. } else {
  54. nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano())
  55. position := log_buffer.NewMessagePosition(nextDayTs, -2)
  56. found, err := fs.filer.HasPersistedLogFiles(position)
  57. if err != nil {
  58. return fmt.Errorf("checking persisted log files: %v", err)
  59. }
  60. if found {
  61. lastReadTime = position
  62. }
  63. }
  64. glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  65. lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
  66. fs.filer.MetaAggregator.ListenersLock.Lock()
  67. fs.filer.MetaAggregator.ListenersCond.Wait()
  68. fs.filer.MetaAggregator.ListenersLock.Unlock()
  69. return fs.hasClient(req.ClientId, req.ClientEpoch)
  70. }, eachLogEntryFn)
  71. if readInMemoryLogErr != nil {
  72. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  73. continue
  74. }
  75. glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
  76. if readInMemoryLogErr != log_buffer.ResumeError {
  77. break
  78. }
  79. }
  80. if isDone {
  81. return nil
  82. }
  83. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  84. glog.V(0).Infof("client %v is closed", clientName)
  85. return nil
  86. }
  87. time.Sleep(1127 * time.Millisecond)
  88. }
  89. return readInMemoryLogErr
  90. }
  91. func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
  92. peerAddress := findClientAddress(stream.Context(), 0)
  93. // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
  94. req.ClientId = -req.ClientId
  95. isReplacing, alreadyKnown, clientName := fs.addClient("local", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
  96. if isReplacing {
  97. fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
  98. } else if alreadyKnown {
  99. return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
  100. }
  101. defer func() {
  102. glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
  103. fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
  104. fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
  105. }()
  106. lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
  107. glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
  108. eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
  109. eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
  110. var processedTsNs int64
  111. var readPersistedLogErr error
  112. var readInMemoryLogErr error
  113. var isDone bool
  114. for {
  115. // println("reading from persisted logs ...")
  116. glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  117. processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
  118. if readPersistedLogErr != nil {
  119. glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
  120. return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
  121. }
  122. if isDone {
  123. return nil
  124. }
  125. if processedTsNs != 0 {
  126. lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
  127. } else {
  128. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  129. time.Sleep(1127 * time.Millisecond)
  130. continue
  131. }
  132. }
  133. glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  134. lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
  135. fs.listenersLock.Lock()
  136. atomic.AddInt64(&fs.listenersWaits, 1)
  137. fs.listenersCond.Wait()
  138. atomic.AddInt64(&fs.listenersWaits, -1)
  139. fs.listenersLock.Unlock()
  140. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  141. return false
  142. }
  143. return true
  144. }, eachLogEntryFn)
  145. if readInMemoryLogErr != nil {
  146. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  147. continue
  148. }
  149. glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
  150. if readInMemoryLogErr != log_buffer.ResumeError {
  151. break
  152. }
  153. }
  154. if isDone {
  155. return nil
  156. }
  157. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  158. return nil
  159. }
  160. }
  161. return readInMemoryLogErr
  162. }
  163. func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType {
  164. return func(logEntry *filer_pb.LogEntry) (bool, error) {
  165. event := &filer_pb.SubscribeMetadataResponse{}
  166. if err := proto.Unmarshal(logEntry.Data, event); err != nil {
  167. glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
  168. return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
  169. }
  170. if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
  171. return false, err
  172. }
  173. return false, nil
  174. }
  175. }
  176. func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
  177. filtered := 0
  178. return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
  179. defer func() {
  180. if filtered > MaxUnsyncedEvents {
  181. if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
  182. EventNotification: &filer_pb.EventNotification{},
  183. TsNs: tsNs,
  184. }); err == nil {
  185. filtered = 0
  186. }
  187. }
  188. }()
  189. filtered++
  190. foundSelf := false
  191. for _, sig := range eventNotification.Signatures {
  192. if sig == req.Signature && req.Signature != 0 {
  193. return nil
  194. }
  195. if sig == fs.filer.Signature {
  196. foundSelf = true
  197. }
  198. }
  199. if !foundSelf {
  200. eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
  201. }
  202. // get complete path to the file or directory
  203. var entryName string
  204. if eventNotification.OldEntry != nil {
  205. entryName = eventNotification.OldEntry.Name
  206. } else if eventNotification.NewEntry != nil {
  207. entryName = eventNotification.NewEntry.Name
  208. }
  209. fullpath := util.Join(dirPath, entryName)
  210. // skip on filer internal meta logs
  211. if strings.HasPrefix(fullpath, filer.SystemLogDir) {
  212. return nil
  213. }
  214. if hasPrefixIn(fullpath, req.PathPrefixes) {
  215. // good
  216. } else if matchByDirectory(dirPath, req.Directories) {
  217. // good
  218. } else {
  219. if !strings.HasPrefix(fullpath, req.PathPrefix) {
  220. if eventNotification.NewParentPath != "" {
  221. newFullPath := util.Join(eventNotification.NewParentPath, entryName)
  222. if !strings.HasPrefix(newFullPath, req.PathPrefix) {
  223. return nil
  224. }
  225. } else {
  226. return nil
  227. }
  228. }
  229. }
  230. // collect timestamps for path
  231. stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs))
  232. message := &filer_pb.SubscribeMetadataResponse{
  233. Directory: dirPath,
  234. EventNotification: eventNotification,
  235. TsNs: tsNs,
  236. }
  237. // println("sending", dirPath, entryName)
  238. if err := stream.Send(message); err != nil {
  239. glog.V(0).Infof("=> client %v: %+v", clientName, err)
  240. return err
  241. }
  242. filtered = 0
  243. return nil
  244. }
  245. }
  246. func hasPrefixIn(text string, prefixes []string) bool {
  247. for _, p := range prefixes {
  248. if strings.HasPrefix(text, p) {
  249. return true
  250. }
  251. }
  252. return false
  253. }
  254. func matchByDirectory(dirPath string, directories []string) bool {
  255. for _, dir := range directories {
  256. if dirPath == dir {
  257. return true
  258. }
  259. }
  260. return false
  261. }
  262. func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) {
  263. clientName = clientType + "@" + clientAddress
  264. glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
  265. if clientId != 0 {
  266. fs.knownListenersLock.Lock()
  267. defer fs.knownListenersLock.Unlock()
  268. epoch, found := fs.knownListeners[clientId]
  269. if !found || epoch < clientEpoch {
  270. fs.knownListeners[clientId] = clientEpoch
  271. isReplacing = true
  272. } else {
  273. alreadyKnown = true
  274. }
  275. }
  276. return
  277. }
  278. func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) {
  279. glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
  280. if clientId != 0 {
  281. fs.knownListenersLock.Lock()
  282. defer fs.knownListenersLock.Unlock()
  283. epoch, found := fs.knownListeners[clientId]
  284. if found && epoch <= clientEpoch {
  285. delete(fs.knownListeners, clientId)
  286. }
  287. }
  288. }
  289. func (fs *FilerServer) hasClient(clientId int32, clientEpoch int32) bool {
  290. if clientId != 0 {
  291. fs.knownListenersLock.Lock()
  292. defer fs.knownListenersLock.Unlock()
  293. epoch, found := fs.knownListeners[clientId]
  294. if found && epoch <= clientEpoch {
  295. return true
  296. }
  297. }
  298. return false
  299. }