From fd54eb1a55eeaa6008566d0f2f0cfdc6455cc4f5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 11 Oct 2016 14:34:23 +0100 Subject: [PATCH] Move OnPoll into the Service method set This feels a lot better because now `OnPoll` works in a similar way to `OnReceiveWebhook` (called on a `Service`) rather than have this strange global-per-service-type struct. --- src/github.com/matrix-org/go-neb/api.go | 2 +- .../matrix-org/go-neb/polling/polling.go | 39 +++-- .../go-neb/services/rssbot/rssbot.go | 159 +++++++++--------- .../matrix-org/go-neb/types/types.go | 32 ++-- 4 files changed, 120 insertions(+), 112 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/api.go b/src/github.com/matrix-org/go-neb/api.go index efb3e10..4a7881d 100644 --- a/src/github.com/matrix-org/go-neb/api.go +++ b/src/github.com/matrix-org/go-neb/api.go @@ -309,7 +309,7 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac // Start any polling NOW because they may decide to stop it in PostRegister, and we want to make // sure we'll actually stop. - if service.Poller() != nil { + if _, ok := service.(types.Poller); ok { if err := polling.StartPolling(service); err != nil { log.WithFields(log.Fields{ "service_id": service.ServiceID(), diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go index 26f9094..2500d30 100644 --- a/src/github.com/matrix-org/go-neb/polling/polling.go +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -1,7 +1,6 @@ package polling import ( - "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/database" @@ -27,10 +26,7 @@ func SetClients(clis *clients.Clients) { // Start polling already existing services func Start() error { // Work out which service types require polling - for serviceType, poller := range types.PollersByType() { - if poller == nil { - continue - } + for _, serviceType := range types.PollingServiceTypes() { // Query for all services with said service type srvs, err := database.GetServiceDB().LoadServicesByType(serviceType) if err != nil { @@ -50,15 +46,11 @@ func Start() error { // so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate // this poll. func StartPolling(service types.Service) error { - p := types.PollersByType()[service.ServiceType()] - if p == nil { - return fmt.Errorf("Service %s (type=%s) doesn't have a Poller", service.ServiceID(), service.ServiceType()) - } // Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here, // we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended. ts := time.Now().UnixNano() setPollStartTime(service, ts) - go pollLoop(service, p, ts) + go pollLoop(service, ts) return nil } @@ -73,13 +65,17 @@ func StopPolling(service types.Service) { // pollLoop begins the polling loop for this service. Does not return, so call this // as a goroutine! -func pollLoop(service types.Service, poller types.Poller, ts int64) { +func pollLoop(service types.Service, ts int64) { logger := log.WithFields(log.Fields{ - "timestamp": ts, - "service_id": service.ServiceID(), - "service_type": service.ServiceType(), - "interval_secs": poller.IntervalSecs(), + "timestamp": ts, + "service_id": service.ServiceID(), + "service_type": service.ServiceType(), }) + poller, ok := service.(types.Poller) + if !ok { + logger.Error("Service is not a Poller.") + return + } logger.Info("Starting polling loop") cli, err := clientPool.Client(service.ServiceUserID()) if err != nil { @@ -87,12 +83,21 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) { return } for { - poller.OnPoll(service, cli) + logger.Info("OnPoll") + nextTime := poller.OnPoll(cli) if pollTimeChanged(service, ts) { logger.Info("Terminating poll.") break } - time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second) + // work out how long to sleep + if nextTime.Unix() == 0 { + logger.Info("Terminating poll - OnPoll returned 0") + break + } + now := time.Now() + logger.Info("Sleeping for ", nextTime.Sub(now)) + time.Sleep(nextTime.Sub(now)) + if pollTimeChanged(service, ts) { logger.Info("Terminating poll.") break diff --git a/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go b/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go index c0553da..a5b157c 100644 --- a/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go +++ b/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go @@ -15,25 +15,75 @@ import ( const minPollingIntervalSeconds = 60 // 1 min (News feeds can be genuinely spammy) -type feedPoller struct{} +type rssBotService struct { + types.DefaultService + id string + serviceUserID string + Feeds map[string]struct { // feed_url => { } + PollIntervalMins int `json:"poll_interval_mins"` + Rooms []string `json:"rooms"` + NextPollTimestampSecs int64 // Internal: When we should poll again + FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated + } `json:"feeds"` +} + +func (s *rssBotService) ServiceUserID() string { return s.serviceUserID } +func (s *rssBotService) ServiceID() string { return s.id } +func (s *rssBotService) ServiceType() string { return "rssbot" } + +// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. +func (s *rssBotService) Register(oldService types.Service, client *matrix.Client) error { + if len(s.Feeds) == 0 { + // this is an error UNLESS the old service had some feeds in which case they are deleting us :( + var numOldFeeds int + oldFeedService, ok := oldService.(*rssBotService) + if !ok { + log.WithField("service", oldService).Error("Old service isn't a rssBotService") + } else { + numOldFeeds = len(oldFeedService.Feeds) + } + if numOldFeeds == 0 { + return errors.New("An RSS feed must be specified.") + } + return nil + } + // Make sure we can parse the feed + for feedURL, feedInfo := range s.Feeds { + fp := gofeed.NewParser() + if _, err := fp.ParseURL(feedURL); err != nil { + return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error()) + } + if len(feedInfo.Rooms) == 0 { + return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL) + } + } + return nil +} -func (p *feedPoller) IntervalSecs() int64 { return 10 } -func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { +func (s *rssBotService) PostRegister(oldService types.Service) { + if len(s.Feeds) == 0 { // bye-bye :( + logger := log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "service_type": s.ServiceType(), + }) + logger.Info("Deleting service: No feeds remaining.") + polling.StopPolling(s) + if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { + logger.WithError(err).Error("Failed to delete service") + } + } +} + +func (s *rssBotService) OnPoll(cli *matrix.Client) time.Time { logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), "service_type": s.ServiceType(), }) - - frService, ok := s.(*rssBotService) - if !ok { - logger.Error("RSSBot: OnPoll called without a Feed Service instance") - return - } now := time.Now().Unix() // Second resolution // Work out which feeds should be polled var pollFeeds []string - for u, feedInfo := range frService.Feeds { + for u, feedInfo := range s.Feeds { if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { // re-query this feed pollFeeds = append(pollFeeds, u) @@ -41,12 +91,12 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { } if len(pollFeeds) == 0 { - return + return s.nextTimestamp() } // Query each feed and send new items to subscribed rooms for _, u := range pollFeeds { - feed, items, err := p.queryFeed(frService, u) + feed, items, err := s.queryFeed(u) if err != nil { logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") continue @@ -54,7 +104,7 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { // Loop backwards since [0] is the most recent and we want to send in chronological order for i := len(items) - 1; i >= 0; i-- { item := items[i] - if err := p.sendToRooms(frService, cli, u, feed, item); err != nil { + if err := s.sendToRooms(cli, u, feed, item); err != nil { logger.WithFields(log.Fields{ "feed_url": u, log.ErrorKey: err, @@ -65,13 +115,26 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { } // Persist the service to save the next poll times - if _, err := database.GetServiceDB().StoreService(frService); err != nil { + if _, err := database.GetServiceDB().StoreService(s); err != nil { logger.WithError(err).Error("Failed to persist next poll times for service") } + + return s.nextTimestamp() +} + +func (s *rssBotService) nextTimestamp() time.Time { + // return the earliest next poll ts + var earliestNextTs int64 + for _, feedInfo := range s.Feeds { + if earliestNextTs == 0 || feedInfo.NextPollTimestampSecs < earliestNextTs { + earliestNextTs = feedInfo.NextPollTimestampSecs + } + } + return time.Unix(earliestNextTs, 0) } // Query the given feed, update relevant timestamps and return NEW items -func (p *feedPoller) queryFeed(s *rssBotService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) { +func (s *rssBotService) queryFeed(feedURL string) (*gofeed.Feed, []gofeed.Item, error) { log.WithField("feed_url", feedURL).Info("Querying feed") var items []gofeed.Item fp := gofeed.NewParser() @@ -115,11 +178,11 @@ func (p *feedPoller) queryFeed(s *rssBotService, feedURL string) (*gofeed.Feed, // TODO: Handle the 'sy' Syndication extension to control update interval. // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ - p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) + s.updateFeedInfo(feedURL, nextPollTsSec, feedLastUpdatedTs) return feed, items, nil } -func (p *feedPoller) updateFeedInfo(s *rssBotService, feedURL string, nextPollTs, feedUpdatedTs int64) { +func (s *rssBotService) updateFeedInfo(feedURL string, nextPollTs, feedUpdatedTs int64) { for u := range s.Feeds { if u != feedURL { continue @@ -131,7 +194,7 @@ func (p *feedPoller) updateFeedInfo(s *rssBotService, feedURL string, nextPollTs } } -func (p *feedPoller) sendToRooms(s *rssBotService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { +func (s *rssBotService) sendToRooms(cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) logger.Info("New feed item") for _, roomID := range s.Feeds[feedURL].Rooms { @@ -150,66 +213,6 @@ func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage { )) } -type rssBotService struct { - types.DefaultService - id string - serviceUserID string - Feeds map[string]struct { // feed_url => { } - PollIntervalMins int `json:"poll_interval_mins"` - Rooms []string `json:"rooms"` - NextPollTimestampSecs int64 // Internal: When we should poll again - FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated - } `json:"feeds"` -} - -func (s *rssBotService) ServiceUserID() string { return s.serviceUserID } -func (s *rssBotService) ServiceID() string { return s.id } -func (s *rssBotService) ServiceType() string { return "rssbot" } -func (s *rssBotService) Poller() types.Poller { return &feedPoller{} } - -// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. -func (s *rssBotService) Register(oldService types.Service, client *matrix.Client) error { - if len(s.Feeds) == 0 { - // this is an error UNLESS the old service had some feeds in which case they are deleting us :( - var numOldFeeds int - oldFeedService, ok := oldService.(*rssBotService) - if !ok { - log.WithField("service", oldService).Error("Old service isn't a rssBotService") - } else { - numOldFeeds = len(oldFeedService.Feeds) - } - if numOldFeeds == 0 { - return errors.New("An RSS feed must be specified.") - } - return nil - } - // Make sure we can parse the feed - for feedURL, feedInfo := range s.Feeds { - fp := gofeed.NewParser() - if _, err := fp.ParseURL(feedURL); err != nil { - return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error()) - } - if len(feedInfo.Rooms) == 0 { - return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL) - } - } - return nil -} - -func (s *rssBotService) PostRegister(oldService types.Service) { - if len(s.Feeds) == 0 { // bye-bye :( - logger := log.WithFields(log.Fields{ - "service_id": s.ServiceID(), - "service_type": s.ServiceType(), - }) - logger.Info("Deleting service: No feeds remaining.") - polling.StopPolling(s) - if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { - logger.WithError(err).Error("Failed to delete service") - } - } -} - func init() { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { r := &rssBotService{ diff --git a/src/github.com/matrix-org/go-neb/types/types.go b/src/github.com/matrix-org/go-neb/types/types.go index 1f0c0d4..cc4afe6 100644 --- a/src/github.com/matrix-org/go-neb/types/types.go +++ b/src/github.com/matrix-org/go-neb/types/types.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "strings" + "time" ) // A ClientConfig is the configuration for a matrix client for a bot to use. @@ -40,10 +41,11 @@ type BotOptions struct { Options map[string]interface{} } -// Poller represents a thing that can be polled at a given rate. +// Poller represents a thing which can poll. Services should implement this method signature to support polling. type Poller interface { - IntervalSecs() int64 - OnPoll(service Service, client *matrix.Client) + // OnPoll is called when the poller should poll. Return the timestamp when you want to be polled again. + // Return 0 to never be polled again. + OnPoll(client *matrix.Client) time.Time } // A Service is the configuration for a bot service. @@ -66,11 +68,6 @@ type Service interface { // concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean // up resources which are no longer needed (e.g. removing old webhooks). PostRegister(oldService Service) - // Return a Poller object if you wish to be invoked every N seconds. This struct MUST NOT conditionally change: either - // ALWAYS return a new Poller interface or NEVER return a Poller. The Poller will exist outside of the lifetime of the - // Service upon which it is being called on, so DO NOT wrap this inside a closure or else you will introduce a memory - // leak. An instantiated service will be passed into the `OnPoll(Service)` for you to extract state. - Poller() Poller } // DefaultService NO-OPs the implementation of optional Service interface methods. Feel free to override them. @@ -89,9 +86,6 @@ func (s *DefaultService) Register(oldService Service, client *matrix.Client) err // PostRegister does nothing. func (s *DefaultService) PostRegister(oldService Service) {} -// Poller returns no poller. -func (s *DefaultService) Poller() Poller { return nil } - // OnReceiveWebhook does nothing but 200 OK the request. func (s *DefaultService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { w.WriteHeader(200) // Do nothing @@ -116,18 +110,24 @@ func BaseURL(u string) error { } var servicesByType = map[string]func(string, string, string) Service{} -var pollersByType = map[string]Poller{} +var serviceTypesWhichPoll = map[string]bool{} // RegisterService registers a factory for creating Service instances. func RegisterService(factory func(string, string, string) Service) { s := factory("", "", "") servicesByType[s.ServiceType()] = factory - pollersByType[s.ServiceType()] = s.Poller() + + if _, ok := s.(Poller); ok { + serviceTypesWhichPoll[s.ServiceType()] = true + } } -// PollersByType returns a map of service type to poller, which may be nil -func PollersByType() map[string]Poller { - return pollersByType +// PollingServiceTypes returns a list of service types which meet the Poller interface +func PollingServiceTypes() (types []string) { + for t := range serviceTypesWhichPoll { + types = append(types, t) + } + return } // CreateService creates a Service of the given type and serviceID.