diff --git a/src/github.com/matrix-org/go-neb/api.go b/src/github.com/matrix-org/go-neb/api.go index 9eec530..efb3e10 100644 --- a/src/github.com/matrix-org/go-neb/api.go +++ b/src/github.com/matrix-org/go-neb/api.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/errors" + "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" "net/http" "strings" @@ -306,6 +307,17 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac return nil, &errors.HTTPError{err, "Error storing service", 500} } + // Start any polling NOW because they may decide to stop it in PostRegister, and we want to make + // sure we'll actually stop. + if service.Poller() != nil { + if err := polling.StartPolling(service); err != nil { + log.WithFields(log.Fields{ + "service_id": service.ServiceID(), + log.ErrorKey: err, + }).Error("Failed to start poll loop.") + } + } + service.PostRegister(old) return &struct { diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go index 6d69a22..7f0c49e 100644 --- a/src/github.com/matrix-org/go-neb/polling/polling.go +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -1,17 +1,24 @@ package polling import ( + "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/types" + "sync" "time" ) -var shouldPoll = make(map[string]bool) // Service ID => yay/nay +// Remember when we first started polling on this service ID. Polling routines will +// continually check this time. If the service gets updated, this will change, prompting +// older instances to die away. If this service gets removed, the time will be 0. +var ( + pollMutex sync.Mutex + startPollTime = make(map[string]int64) // ServiceID => unix timestamp +) // Start polling already existing services func Start() error { - log.Print("Start polling") // Work out which service types require polling for serviceType, poller := range types.PollersByType() { if poller == nil { @@ -23,22 +30,74 @@ func Start() error { return err } for _, s := range srvs { - shouldPoll[s.ServiceID()] = true - go StartPolling(s, poller) + if err := StartPolling(s); err != nil { + return err + } } } return nil } -// StartPolling begins the polling loop for this service. Does not return, so call this +// StartPolling begins a polling loop for this service. +// If one already exists for this service, it will be instructed to die. The new poll will not wait for this to happen, +// so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate +// this poll. +func StartPolling(service types.Service) error { + p := types.PollersByType()[service.ServiceType()] + if p == nil { + return fmt.Errorf("Service %s (type=%s) doesn't have a Poller", service.ServiceID(), service.ServiceType()) + } + // Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here, + // we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended. + ts := time.Now().UnixNano() + setPollStartTime(service, ts) + go pollLoop(service, p, ts) + return nil +} + +// StopPolling stops all pollers for this service. +func StopPolling(service types.Service) { + log.WithFields(log.Fields{ + "service_id": service.ServiceID(), + "service_type": service.ServiceType(), + }).Info("StopPolling") + setPollStartTime(service, 0) +} + +// pollLoop begins the polling loop for this service. Does not return, so call this // as a goroutine! -func StartPolling(service types.Service, poller types.Poller) { +func pollLoop(service types.Service, poller types.Poller, ts int64) { + logger := log.WithFields(log.Fields{ + "timestamp": ts, + "service_id": service.ServiceID(), + "service_type": service.ServiceType(), + "interval_secs": poller.IntervalSecs(), + }) + logger.Info("Starting polling loop") for { - if !shouldPoll[service.ServiceID()] { - log.WithField("service_id", service.ServiceID()).Info("Terminating poll.") + poller.OnPoll(service) + if pollTimeChanged(service, ts) { + logger.Info("Terminating poll.") break } - poller.OnPoll(service) time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second) + if pollTimeChanged(service, ts) { + logger.Info("Terminating poll.") + break + } } } + +// setPollStartTime clobbers the current poll time +func setPollStartTime(service types.Service, startTs int64) { + pollMutex.Lock() + defer pollMutex.Unlock() + startPollTime[service.ServiceID()] = startTs +} + +// pollTimeChanged returns true if the poll start time for this service ID is different to the one supplied. +func pollTimeChanged(service types.Service, ts int64) bool { + pollMutex.Lock() + defer pollMutex.Unlock() + return startPollTime[service.ServiceID()] != ts +} diff --git a/src/github.com/matrix-org/go-neb/services/rss/rss.go b/src/github.com/matrix-org/go-neb/services/rss/rss.go index 47c5fa4..96f0736 100644 --- a/src/github.com/matrix-org/go-neb/services/rss/rss.go +++ b/src/github.com/matrix-org/go-neb/services/rss/rss.go @@ -3,6 +3,7 @@ package services import ( "errors" log "github.com/Sirupsen/logrus" + "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" @@ -53,26 +54,50 @@ func (s *rssService) Poller() types.Poller { return &rssPoller{} } // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { - urlSet := make(map[string]bool) - for _, roomInfo := range s.Rooms { - for u, feedInfo := range roomInfo.Feeds { - if feedInfo.PollIntervalMins == 0 { - feedInfo.PollIntervalMins = 1 - log.Print("Set poll interval to 1 ", u) - } - urlSet[u] = true + feeds := feedUrls(s) + if len(feeds) == 0 { + // this is an error UNLESS the old service had some feeds in which case they are deleting us :( + oldFeeds := feedUrls(oldService) + if len(oldFeeds) == 0 { + return errors.New("An RSS feed must be specified.") } } - if len(urlSet) == 0 { - log.Print(s.Rooms) - return errors.New("An RSS feed must be specified.") - } return nil } -// PostRegister will start polling func (s *rssService) PostRegister(oldService types.Service) { - go polling.StartPolling(s, s.Poller()) + if len(feedUrls(s)) == 0 { // bye-bye :( + logger := log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "service_type": s.ServiceType(), + }) + logger.Info("Deleting service (0 feeds)") + polling.StopPolling(s) + if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { + logger.WithError(err).Error("Failed to delete service") + } + } +} + +// feedUrls returns a list of feed urls for this service +func feedUrls(srv types.Service) []string { + var feeds []string + s, ok := srv.(*rssService) + if !ok { + return feeds + } + + urlSet := make(map[string]bool) + for _, roomInfo := range s.Rooms { + for u := range roomInfo.Feeds { + urlSet[u] = true + } + } + + for u := range urlSet { + feeds = append(feeds, u) + } + return feeds } func init() {