You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
11 KiB

5 years ago
5 years ago
5 years ago
8 months ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
8 months ago
4 years ago
3 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/stats"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. "google.golang.org/protobuf/proto"
  9. "github.com/seaweedfs/seaweedfs/weed/filer"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  14. )
  15. const (
  16. // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
  17. MaxUnsyncedEvents = 1e3
  18. )
  19. func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
  20. peerAddress := findClientAddress(stream.Context(), 0)
  21. isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
  22. if isReplacing {
  23. fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
  24. } else if alreadyKnown {
  25. return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
  26. }
  27. defer func() {
  28. glog.V(0).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
  29. fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
  30. fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
  31. }()
  32. lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
  33. glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  34. eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
  35. eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
  36. var processedTsNs int64
  37. var readPersistedLogErr error
  38. var readInMemoryLogErr error
  39. var isDone bool
  40. for {
  41. glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  42. processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
  43. if readPersistedLogErr != nil {
  44. return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
  45. }
  46. if isDone {
  47. return nil
  48. }
  49. if processedTsNs != 0 {
  50. lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
  51. }
  52. glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  53. lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
  54. fs.filer.MetaAggregator.ListenersLock.Lock()
  55. fs.filer.MetaAggregator.ListenersCond.Wait()
  56. fs.filer.MetaAggregator.ListenersLock.Unlock()
  57. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  58. return false
  59. }
  60. return true
  61. }, eachLogEntryFn)
  62. if readInMemoryLogErr != nil {
  63. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  64. continue
  65. }
  66. glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
  67. if readInMemoryLogErr != log_buffer.ResumeError {
  68. break
  69. }
  70. }
  71. if isDone {
  72. return nil
  73. }
  74. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  75. glog.V(0).Infof("client %v is closed", clientName)
  76. return nil
  77. }
  78. time.Sleep(1127 * time.Millisecond)
  79. }
  80. return readInMemoryLogErr
  81. }
  82. func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
  83. peerAddress := findClientAddress(stream.Context(), 0)
  84. // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
  85. req.ClientId = -req.ClientId
  86. isReplacing, alreadyKnown, clientName := fs.addClient("local", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
  87. if isReplacing {
  88. fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
  89. } else if alreadyKnown {
  90. return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
  91. }
  92. defer func() {
  93. glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
  94. fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
  95. fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
  96. }()
  97. lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
  98. glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
  99. eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
  100. eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
  101. var processedTsNs int64
  102. var readPersistedLogErr error
  103. var readInMemoryLogErr error
  104. var isDone bool
  105. for {
  106. // println("reading from persisted logs ...")
  107. glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  108. processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
  109. if readPersistedLogErr != nil {
  110. glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
  111. return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
  112. }
  113. if isDone {
  114. return nil
  115. }
  116. if processedTsNs != 0 {
  117. lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
  118. } else {
  119. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  120. time.Sleep(1127 * time.Millisecond)
  121. continue
  122. }
  123. }
  124. glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
  125. lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
  126. fs.listenersLock.Lock()
  127. atomic.AddInt64(&fs.listenersWaits, 1)
  128. fs.listenersCond.Wait()
  129. atomic.AddInt64(&fs.listenersWaits, -1)
  130. fs.listenersLock.Unlock()
  131. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  132. return false
  133. }
  134. return true
  135. }, eachLogEntryFn)
  136. if readInMemoryLogErr != nil {
  137. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  138. continue
  139. }
  140. glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
  141. if readInMemoryLogErr != log_buffer.ResumeError {
  142. break
  143. }
  144. }
  145. if isDone {
  146. return nil
  147. }
  148. if !fs.hasClient(req.ClientId, req.ClientEpoch) {
  149. return nil
  150. }
  151. }
  152. return readInMemoryLogErr
  153. }
  154. func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType {
  155. return func(logEntry *filer_pb.LogEntry) (bool, error) {
  156. event := &filer_pb.SubscribeMetadataResponse{}
  157. if err := proto.Unmarshal(logEntry.Data, event); err != nil {
  158. glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
  159. return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
  160. }
  161. if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
  162. return false, err
  163. }
  164. return false, nil
  165. }
  166. }
  167. func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
  168. filtered := 0
  169. return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
  170. defer func() {
  171. if filtered > MaxUnsyncedEvents {
  172. if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
  173. EventNotification: &filer_pb.EventNotification{},
  174. TsNs: tsNs,
  175. }); err == nil {
  176. filtered = 0
  177. }
  178. }
  179. }()
  180. filtered++
  181. foundSelf := false
  182. for _, sig := range eventNotification.Signatures {
  183. if sig == req.Signature && req.Signature != 0 {
  184. return nil
  185. }
  186. if sig == fs.filer.Signature {
  187. foundSelf = true
  188. }
  189. }
  190. if !foundSelf {
  191. eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
  192. }
  193. // get complete path to the file or directory
  194. var entryName string
  195. if eventNotification.OldEntry != nil {
  196. entryName = eventNotification.OldEntry.Name
  197. } else if eventNotification.NewEntry != nil {
  198. entryName = eventNotification.NewEntry.Name
  199. }
  200. fullpath := util.Join(dirPath, entryName)
  201. // skip on filer internal meta logs
  202. if strings.HasPrefix(fullpath, filer.SystemLogDir) {
  203. return nil
  204. }
  205. if hasPrefixIn(fullpath, req.PathPrefixes) {
  206. // good
  207. } else if matchByDirectory(dirPath, req.Directories) {
  208. // good
  209. } else {
  210. if !strings.HasPrefix(fullpath, req.PathPrefix) {
  211. if eventNotification.NewParentPath != "" {
  212. newFullPath := util.Join(eventNotification.NewParentPath, entryName)
  213. if !strings.HasPrefix(newFullPath, req.PathPrefix) {
  214. return nil
  215. }
  216. } else {
  217. return nil
  218. }
  219. }
  220. }
  221. // collect timestamps for path
  222. stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs))
  223. message := &filer_pb.SubscribeMetadataResponse{
  224. Directory: dirPath,
  225. EventNotification: eventNotification,
  226. TsNs: tsNs,
  227. }
  228. // println("sending", dirPath, entryName)
  229. if err := stream.Send(message); err != nil {
  230. glog.V(0).Infof("=> client %v: %+v", clientName, err)
  231. return err
  232. }
  233. filtered = 0
  234. return nil
  235. }
  236. }
  237. func hasPrefixIn(text string, prefixes []string) bool {
  238. for _, p := range prefixes {
  239. if strings.HasPrefix(text, p) {
  240. return true
  241. }
  242. }
  243. return false
  244. }
  245. func matchByDirectory(dirPath string, directories []string) bool {
  246. for _, dir := range directories {
  247. if dirPath == dir {
  248. return true
  249. }
  250. }
  251. return false
  252. }
  253. func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) {
  254. clientName = clientType + "@" + clientAddress
  255. glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
  256. if clientId != 0 {
  257. fs.knownListenersLock.Lock()
  258. defer fs.knownListenersLock.Unlock()
  259. epoch, found := fs.knownListeners[clientId]
  260. if !found || epoch < clientEpoch {
  261. fs.knownListeners[clientId] = clientEpoch
  262. isReplacing = true
  263. } else {
  264. alreadyKnown = true
  265. }
  266. }
  267. return
  268. }
  269. func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) {
  270. glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
  271. if clientId != 0 {
  272. fs.knownListenersLock.Lock()
  273. defer fs.knownListenersLock.Unlock()
  274. epoch, found := fs.knownListeners[clientId]
  275. if found && epoch <= clientEpoch {
  276. delete(fs.knownListeners, clientId)
  277. }
  278. }
  279. }
  280. func (fs *FilerServer) hasClient(clientId int32, clientEpoch int32) bool {
  281. if clientId != 0 {
  282. fs.knownListenersLock.Lock()
  283. defer fs.knownListenersLock.Unlock()
  284. epoch, found := fs.knownListeners[clientId]
  285. if found && epoch <= clientEpoch {
  286. return true
  287. }
  288. }
  289. return false
  290. }