From 2d6b3cd6e918bb0b98b6986a59d14134443579ed Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 4 Oct 2016 17:13:13 +0100 Subject: [PATCH] Flesh out the feedreader service Just need to send messages into rooms now for a first cut to be done. Notable improvements to make: - We currently do 1 goroutine per service. This could be bad if we have lots of these things running around. - We do not cache the response to RSS feeds. If we have 10 independent services on the same feed URL, we will hit the URL 10 times. This is similar to how we currently do 1 webhook/service, so it's plausible that in the future we will want to have some kind of generic caching layer. - We don't send messages to Matrix yet. We need a `Clients` instance but can't get at one. There's only ever one, so I wonder if we should global it like we do with `GetServiceDB()` for ease of use? - The polling interval is divorced from the actual feed repoll time. Ideally we would schedule the goroutine only when we need it, rather than checking frequently, determining we have nothing to do, and going back to sleep. --- src/github.com/matrix-org/go-neb/goneb.go | 8 +- .../go-neb/services/feedreader/feedreader.go | 182 ++++++++++++------ 2 files changed, 131 insertions(+), 59 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index a2c1767..92c43f7 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -45,18 +45,18 @@ func main() { err := types.BaseURL(baseURL) if err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to get base url") } db, err := database.Open(databaseType, databaseURL) if err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to open database") } database.SetServiceDB(db) clients := clients.New(db) if err := clients.Start(); err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to start up clients") } http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{})) @@ -73,7 +73,7 @@ func main() { http.HandleFunc("/realms/redirects/", rh.handle) if err := polling.Start(); err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to start polling") } http.ListenAndServe(bindAddress, nil) diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index 71c72d4..c370e7a 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -11,59 +11,147 @@ import ( "time" ) +const minPollingIntervalSeconds = (10 * 60) // 10min + type feedPoller struct{} func (p *feedPoller) IntervalSecs() int64 { return 10 } func (p *feedPoller) OnPoll(s types.Service) { + logger := log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "service_type": s.ServiceType(), + }) + frService, ok := s.(*feedReaderService) if !ok { - log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service") + logger.Error("FeedReader: OnPoll called without an Feed Service instance") return } now := time.Now().Unix() // Second resolution - // URL => [ RoomID ] - urlsToRooms := make(map[string][]string) - - for roomID, roomInfo := range frService.Rooms { - for u, feedInfo := range roomInfo.Feeds { - if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now { - // re-query this feed - urlsToRooms[u] = append(urlsToRooms[u], roomID) + + // Work out which feeds should be polled + var pollFeeds []string + for u, feedInfo := range frService.Feeds { + if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { + // re-query this feed + pollFeeds = append(pollFeeds, u) + } + } + + // Query each feed and send new items to subscribed rooms + for _, u := range pollFeeds { + items, err := p.queryFeed(frService, u) + if err != nil { + logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") + continue + } + for _, i := range items { + if err := p.sendToRooms(frService, u, i); err != nil { + logger.WithFields(log.Fields{ + "feed_url": u, + log.ErrorKey: err, + "item": i, + }).Error("Failed to send item to room") } } } - // TODO: Keep a "next poll ts" value (default 0) - // If ts is 0 or now > ts, then poll and work out next poll ts. - // Worked out by looking at the chosen interval period (prioritise the feed retry time where it exists) - // Persist the next poll ts to the database. + // Persist the service to save the next poll times if we did some queries + if len(pollFeeds) == 0 { + return + } + if _, err := database.GetServiceDB().StoreService(frService); err != nil { + logger.WithError(err).Error("Failed to persist next poll times for service") + } +} - for u := range urlsToRooms { - fp := gofeed.NewParser() - feed, err := fp.ParseURL(u) - if err != nil { - log.WithFields(log.Fields{ - "service_id": s.ServiceID(), - "url": u, - log.ErrorKey: err, - }).Error("Failed to parse feed") +// Query the given feed, update relevant timestamps and return NEW items +func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.Item, error) { + var items []gofeed.Item + fp := gofeed.NewParser() + feed, err := fp.ParseURL(feedURL) + if err != nil { + return items, err + } + + // Work out which items are new, if any (based on the last updated TS we have) + // If the TS is 0 then this is the first ever poll, so let's not send 10s of events + // into the room and just do new ones from this point onwards. + // if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { + for _, i := range feed.Items { + if i == nil || i.PublishedParsed == nil { continue } - log.Print(feed) + if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { + items = append(items, *i) + } + } + // } + if s.Feeds[feedURL].FeedUpdatedTimestampSecs == 0 { + log.Debug("ts is 0") + } + + now := time.Now().Unix() // Second resolution + + // Work out when this feed was last updated + var feedLastUpdatedTs int64 + if feed.UpdatedParsed != nil { + feedLastUpdatedTs = feed.UpdatedParsed.Unix() + } else if len(feed.Items) > 0 { + i := feed.Items[0] + if i != nil && i.PublishedParsed != nil { + feedLastUpdatedTs = i.PublishedParsed.Unix() + } } + // Work out when to next poll this feed + nextPollTsSec := now + minPollingIntervalSeconds + if s.Feeds[feedURL].PollIntervalMins > 10 { + nextPollTsSec = now + int64(s.Feeds[feedURL].PollIntervalMins*60) + } + // TODO: Handle the 'sy' Syndication extension to control update interval. + // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ + + p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) + return items, nil +} + +func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) { + for u := range s.Feeds { + if u != feedURL { + continue + } + f := s.Feeds[u] + f.NextPollTimestampSecs = nextPollTs + f.FeedUpdatedTimestampSecs = feedUpdatedTs + s.Feeds[u] = f + } +} + +func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofeed.Item) error { + log.WithField("feed_url", feedURL).WithField("title", item.Title).Info("New feed item") + var rooms []string + for roomID, urls := range s.Rooms { + for _, u := range urls { + if u == feedURL { + rooms = append(rooms, roomID) + break + } + } + } + return nil } type feedReaderService struct { types.DefaultService id string serviceUserID string - Rooms map[string]struct { // room_id => {} - Feeds map[string]struct { // URL => { } - PollIntervalMins int `json:"poll_interval_mins"` - LastPollTimestampSecs int64 - } - } + Feeds map[string]struct { // feed_url => { } + PollIntervalMins int `json:"poll_interval_mins"` + NextPollTimestampSecs int64 // Internal: When we should poll again + FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated + } `json:"feeds"` + Rooms map[string][]string `json:"rooms"` // room_id => [ feed_url ] } func (s *feedReaderService) ServiceUserID() string { return s.serviceUserID } @@ -73,11 +161,16 @@ func (s *feedReaderService) Poller() types.Poller { return &feedPoller{} } // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. func (s *feedReaderService) Register(oldService types.Service, client *matrix.Client) error { - feeds := feedUrls(s) - if len(feeds) == 0 { + if len(s.Feeds) == 0 { // this is an error UNLESS the old service had some feeds in which case they are deleting us :( - oldFeeds := feedUrls(oldService) - if len(oldFeeds) == 0 { + var numOldFeeds int + oldFeedService, ok := oldService.(*feedReaderService) + if !ok { + log.WithField("service_id", oldService.ServiceID()).Error("Old service isn't a FeedReaderService") + } else { + numOldFeeds = len(oldFeedService.Feeds) + } + if numOldFeeds == 0 { return errors.New("An RSS feed must be specified.") } } @@ -85,7 +178,7 @@ func (s *feedReaderService) Register(oldService types.Service, client *matrix.Cl } func (s *feedReaderService) PostRegister(oldService types.Service) { - if len(feedUrls(s)) == 0 { // bye-bye :( + if len(s.Feeds) == 0 { // bye-bye :( logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), "service_type": s.ServiceType(), @@ -98,27 +191,6 @@ func (s *feedReaderService) PostRegister(oldService types.Service) { } } -// feedUrls returns a list of feed urls for this service -func feedUrls(srv types.Service) []string { - var feeds []string - s, ok := srv.(*feedReaderService) - if !ok { - return feeds - } - - urlSet := make(map[string]bool) - for _, roomInfo := range s.Rooms { - for u := range roomInfo.Feeds { - urlSet[u] = true - } - } - - for u := range urlSet { - feeds = append(feeds, u) - } - return feeds -} - func init() { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { r := &feedReaderService{