You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

132 lines
3.9 KiB

8 years ago
  1. package polling
  2. import (
  3. "runtime/debug"
  4. "sync"
  5. "time"
  6. "github.com/matrix-org/go-neb/clients"
  7. "github.com/matrix-org/go-neb/database"
  8. "github.com/matrix-org/go-neb/types"
  9. log "github.com/sirupsen/logrus"
  10. )
  11. // Remember when we first started polling on this service ID. Polling routines will
  12. // continually check this time. If the service gets updated, this will change, prompting
  13. // older instances to die away. If this service gets removed, the time will be 0.
  14. var (
  15. pollMutex sync.Mutex
  16. startPollTime = make(map[string]int64) // ServiceID => unix timestamp
  17. )
  18. var clientPool *clients.Clients
  19. // SetClients sets a pool of clients for passing into OnPoll
  20. func SetClients(clis *clients.Clients) {
  21. clientPool = clis
  22. }
  23. // Start polling already existing services
  24. func Start() error {
  25. // Work out which service types require polling
  26. for _, serviceType := range types.PollingServiceTypes() {
  27. // Query for all services with said service type
  28. srvs, err := database.GetServiceDB().LoadServicesByType(serviceType)
  29. if err != nil {
  30. return err
  31. }
  32. for _, s := range srvs {
  33. if err := StartPolling(s); err != nil {
  34. return err
  35. }
  36. }
  37. }
  38. return nil
  39. }
  40. // StartPolling begins a polling loop for this service.
  41. // If one already exists for this service, it will be instructed to die. The new poll will not wait for this to happen,
  42. // so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate
  43. // this poll.
  44. func StartPolling(service types.Service) error {
  45. // Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here,
  46. // we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended.
  47. ts := time.Now().UnixNano()
  48. setPollStartTime(service, ts)
  49. go pollLoop(service, ts)
  50. return nil
  51. }
  52. // StopPolling stops all pollers for this service.
  53. func StopPolling(service types.Service) {
  54. log.WithFields(log.Fields{
  55. "service_id": service.ServiceID(),
  56. "service_type": service.ServiceType(),
  57. }).Info("StopPolling")
  58. setPollStartTime(service, 0)
  59. }
  60. // pollLoop begins the polling loop for this service. Does not return, so call this
  61. // as a goroutine!
  62. func pollLoop(service types.Service, ts int64) {
  63. logger := log.WithFields(log.Fields{
  64. "timestamp": ts,
  65. "service_id": service.ServiceID(),
  66. "service_type": service.ServiceType(),
  67. })
  68. defer func() {
  69. // Kill the poll loop entirely as it is likely that whatever made us panic will
  70. // make us panic again. We can whine bitterly about it though.
  71. if r := recover(); r != nil {
  72. logger.WithField("panic", r).Errorf(
  73. "pollLoop panicked!\n%s", debug.Stack(),
  74. )
  75. }
  76. }()
  77. poller, ok := service.(types.Poller)
  78. if !ok {
  79. logger.Error("Service is not a Poller.")
  80. return
  81. }
  82. logger.Info("Starting polling loop")
  83. cli, err := clientPool.Client(service.ServiceUserID())
  84. if err != nil {
  85. logger.WithError(err).WithField("user_id", service.ServiceUserID()).Error("Poll setup failed: failed to load client")
  86. return
  87. }
  88. for {
  89. logger.Info("OnPoll")
  90. nextTime := poller.OnPoll(cli)
  91. if pollTimeChanged(service, ts) {
  92. logger.Info("Terminating poll.")
  93. break
  94. }
  95. // work out how long to sleep
  96. if nextTime.Unix() == 0 {
  97. logger.Info("Terminating poll - OnPoll returned 0")
  98. break
  99. }
  100. now := time.Now()
  101. time.Sleep(nextTime.Sub(now))
  102. if pollTimeChanged(service, ts) {
  103. logger.Info("Terminating poll.")
  104. break
  105. }
  106. }
  107. }
  108. // setPollStartTime clobbers the current poll time
  109. func setPollStartTime(service types.Service, startTS int64) {
  110. pollMutex.Lock()
  111. defer pollMutex.Unlock()
  112. startPollTime[service.ServiceID()] = startTS
  113. }
  114. // pollTimeChanged returns true if the poll start time for this service ID is different to the one supplied.
  115. func pollTimeChanged(service types.Service, ts int64) bool {
  116. pollMutex.Lock()
  117. defer pollMutex.Unlock()
  118. return startPollTime[service.ServiceID()] != ts
  119. }