From 3e3f71ad92320e9ae43529133cf9660b57a8e46d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 30 Sep 2016 16:01:29 +0100 Subject: [PATCH 1/7] s/rss/feedreader/ since it does atom feeds too. Add TODO --- src/github.com/matrix-org/go-neb/goneb.go | 1 + .../{rss/rss.go => feedreader/feedreader.go} | 49 +++++++++++++------ 2 files changed, 35 insertions(+), 15 deletions(-) rename src/github.com/matrix-org/go-neb/services/{rss/rss.go => feedreader/feedreader.go} (62%) diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index a715aae..a2c1767 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -10,6 +10,7 @@ import ( _ "github.com/matrix-org/go-neb/realms/jira" "github.com/matrix-org/go-neb/server" _ "github.com/matrix-org/go-neb/services/echo" + _ "github.com/matrix-org/go-neb/services/feedreader" _ "github.com/matrix-org/go-neb/services/giphy" _ "github.com/matrix-org/go-neb/services/github" _ "github.com/matrix-org/go-neb/services/guggy" diff --git a/src/github.com/matrix-org/go-neb/services/rss/rss.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go similarity index 62% rename from src/github.com/matrix-org/go-neb/services/rss/rss.go rename to src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index 25e36fa..1d89ef5 100644 --- a/src/github.com/matrix-org/go-neb/services/rss/rss.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -7,14 +7,15 @@ import ( "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" + "github.com/mmcdole/gofeed" "time" ) -type rssPoller struct{} +type feedPoller struct{} -func (p *rssPoller) IntervalSecs() int64 { return 10 } -func (p *rssPoller) OnPoll(s types.Service) { - rsss, ok := s.(*rssService) +func (p *feedPoller) IntervalSecs() int64 { return 10 } +func (p *feedPoller) OnPoll(s types.Service) { + frService, ok := s.(*feedReaderService) if !ok { log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service") return @@ -23,7 +24,7 @@ func (p *rssPoller) OnPoll(s types.Service) { // URL => [ RoomID ] urlsToRooms := make(map[string][]string) - for roomID, roomInfo := range rsss.Rooms { + for roomID, roomInfo := range frService.Rooms { for u, feedInfo := range roomInfo.Feeds { if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now { // re-query this feed @@ -32,10 +33,28 @@ func (p *rssPoller) OnPoll(s types.Service) { } } - // TODO: Some polling + // TODO: Keep a "next poll ts" value (default 0) + // If ts is 0 or now > ts, then poll and work out next poll ts. + // Worked out by looking at the chosen interval period (prioritise the feed retry time where it exists) + // Persist the next poll ts to the database. + + for u, _ := range urlsToRooms { + fp := gofeed.NewParser() + feed, err := fp.ParseURL(u) + if err != nil { + log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "url": u, + log.ErrorKey: err, + }).Error("Failed to parse feed") + continue + } + log.Print(feed) + } + } -type rssService struct { +type feedReaderService struct { types.DefaultService id string serviceUserID string @@ -47,13 +66,13 @@ type rssService struct { } } -func (s *rssService) ServiceUserID() string { return s.serviceUserID } -func (s *rssService) ServiceID() string { return s.id } -func (s *rssService) ServiceType() string { return "rss" } -func (s *rssService) Poller() types.Poller { return &rssPoller{} } +func (s *feedReaderService) ServiceUserID() string { return s.serviceUserID } +func (s *feedReaderService) ServiceID() string { return s.id } +func (s *feedReaderService) ServiceType() string { return "feedreader" } +func (s *feedReaderService) Poller() types.Poller { return &feedPoller{} } // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. -func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { +func (s *feedReaderService) Register(oldService types.Service, client *matrix.Client) error { feeds := feedUrls(s) if len(feeds) == 0 { // this is an error UNLESS the old service had some feeds in which case they are deleting us :( @@ -65,7 +84,7 @@ func (s *rssService) Register(oldService types.Service, client *matrix.Client) e return nil } -func (s *rssService) PostRegister(oldService types.Service) { +func (s *feedReaderService) PostRegister(oldService types.Service) { if len(feedUrls(s)) == 0 { // bye-bye :( logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), @@ -82,7 +101,7 @@ func (s *rssService) PostRegister(oldService types.Service) { // feedUrls returns a list of feed urls for this service func feedUrls(srv types.Service) []string { var feeds []string - s, ok := srv.(*rssService) + s, ok := srv.(*feedReaderService) if !ok { return feeds } @@ -102,7 +121,7 @@ func feedUrls(srv types.Service) []string { func init() { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { - r := &rssService{ + r := &feedReaderService{ id: serviceID, serviceUserID: serviceUserID, } From a5e515e118a8285afe06a02d1dae7bb6d93a59b7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 30 Sep 2016 16:05:20 +0100 Subject: [PATCH 2/7] Lint --- .../matrix-org/go-neb/services/feedreader/feedreader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index 1d89ef5..71c72d4 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -38,7 +38,7 @@ func (p *feedPoller) OnPoll(s types.Service) { // Worked out by looking at the chosen interval period (prioritise the feed retry time where it exists) // Persist the next poll ts to the database. - for u, _ := range urlsToRooms { + for u := range urlsToRooms { fp := gofeed.NewParser() feed, err := fp.ParseURL(u) if err != nil { From 2d6b3cd6e918bb0b98b6986a59d14134443579ed Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 4 Oct 2016 17:13:13 +0100 Subject: [PATCH 3/7] Flesh out the feedreader service Just need to send messages into rooms now for a first cut to be done. Notable improvements to make: - We currently do 1 goroutine per service. This could be bad if we have lots of these things running around. - We do not cache the response to RSS feeds. If we have 10 independent services on the same feed URL, we will hit the URL 10 times. This is similar to how we currently do 1 webhook/service, so it's plausible that in the future we will want to have some kind of generic caching layer. - We don't send messages to Matrix yet. We need a `Clients` instance but can't get at one. There's only ever one, so I wonder if we should global it like we do with `GetServiceDB()` for ease of use? - The polling interval is divorced from the actual feed repoll time. Ideally we would schedule the goroutine only when we need it, rather than checking frequently, determining we have nothing to do, and going back to sleep. --- src/github.com/matrix-org/go-neb/goneb.go | 8 +- .../go-neb/services/feedreader/feedreader.go | 182 ++++++++++++------ 2 files changed, 131 insertions(+), 59 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index a2c1767..92c43f7 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -45,18 +45,18 @@ func main() { err := types.BaseURL(baseURL) if err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to get base url") } db, err := database.Open(databaseType, databaseURL) if err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to open database") } database.SetServiceDB(db) clients := clients.New(db) if err := clients.Start(); err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to start up clients") } http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{})) @@ -73,7 +73,7 @@ func main() { http.HandleFunc("/realms/redirects/", rh.handle) if err := polling.Start(); err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to start polling") } http.ListenAndServe(bindAddress, nil) diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index 71c72d4..c370e7a 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -11,59 +11,147 @@ import ( "time" ) +const minPollingIntervalSeconds = (10 * 60) // 10min + type feedPoller struct{} func (p *feedPoller) IntervalSecs() int64 { return 10 } func (p *feedPoller) OnPoll(s types.Service) { + logger := log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "service_type": s.ServiceType(), + }) + frService, ok := s.(*feedReaderService) if !ok { - log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service") + logger.Error("FeedReader: OnPoll called without an Feed Service instance") return } now := time.Now().Unix() // Second resolution - // URL => [ RoomID ] - urlsToRooms := make(map[string][]string) - - for roomID, roomInfo := range frService.Rooms { - for u, feedInfo := range roomInfo.Feeds { - if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now { - // re-query this feed - urlsToRooms[u] = append(urlsToRooms[u], roomID) + + // Work out which feeds should be polled + var pollFeeds []string + for u, feedInfo := range frService.Feeds { + if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { + // re-query this feed + pollFeeds = append(pollFeeds, u) + } + } + + // Query each feed and send new items to subscribed rooms + for _, u := range pollFeeds { + items, err := p.queryFeed(frService, u) + if err != nil { + logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") + continue + } + for _, i := range items { + if err := p.sendToRooms(frService, u, i); err != nil { + logger.WithFields(log.Fields{ + "feed_url": u, + log.ErrorKey: err, + "item": i, + }).Error("Failed to send item to room") } } } - // TODO: Keep a "next poll ts" value (default 0) - // If ts is 0 or now > ts, then poll and work out next poll ts. - // Worked out by looking at the chosen interval period (prioritise the feed retry time where it exists) - // Persist the next poll ts to the database. + // Persist the service to save the next poll times if we did some queries + if len(pollFeeds) == 0 { + return + } + if _, err := database.GetServiceDB().StoreService(frService); err != nil { + logger.WithError(err).Error("Failed to persist next poll times for service") + } +} - for u := range urlsToRooms { - fp := gofeed.NewParser() - feed, err := fp.ParseURL(u) - if err != nil { - log.WithFields(log.Fields{ - "service_id": s.ServiceID(), - "url": u, - log.ErrorKey: err, - }).Error("Failed to parse feed") +// Query the given feed, update relevant timestamps and return NEW items +func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.Item, error) { + var items []gofeed.Item + fp := gofeed.NewParser() + feed, err := fp.ParseURL(feedURL) + if err != nil { + return items, err + } + + // Work out which items are new, if any (based on the last updated TS we have) + // If the TS is 0 then this is the first ever poll, so let's not send 10s of events + // into the room and just do new ones from this point onwards. + // if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { + for _, i := range feed.Items { + if i == nil || i.PublishedParsed == nil { continue } - log.Print(feed) + if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { + items = append(items, *i) + } + } + // } + if s.Feeds[feedURL].FeedUpdatedTimestampSecs == 0 { + log.Debug("ts is 0") + } + + now := time.Now().Unix() // Second resolution + + // Work out when this feed was last updated + var feedLastUpdatedTs int64 + if feed.UpdatedParsed != nil { + feedLastUpdatedTs = feed.UpdatedParsed.Unix() + } else if len(feed.Items) > 0 { + i := feed.Items[0] + if i != nil && i.PublishedParsed != nil { + feedLastUpdatedTs = i.PublishedParsed.Unix() + } } + // Work out when to next poll this feed + nextPollTsSec := now + minPollingIntervalSeconds + if s.Feeds[feedURL].PollIntervalMins > 10 { + nextPollTsSec = now + int64(s.Feeds[feedURL].PollIntervalMins*60) + } + // TODO: Handle the 'sy' Syndication extension to control update interval. + // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ + + p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) + return items, nil +} + +func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) { + for u := range s.Feeds { + if u != feedURL { + continue + } + f := s.Feeds[u] + f.NextPollTimestampSecs = nextPollTs + f.FeedUpdatedTimestampSecs = feedUpdatedTs + s.Feeds[u] = f + } +} + +func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofeed.Item) error { + log.WithField("feed_url", feedURL).WithField("title", item.Title).Info("New feed item") + var rooms []string + for roomID, urls := range s.Rooms { + for _, u := range urls { + if u == feedURL { + rooms = append(rooms, roomID) + break + } + } + } + return nil } type feedReaderService struct { types.DefaultService id string serviceUserID string - Rooms map[string]struct { // room_id => {} - Feeds map[string]struct { // URL => { } - PollIntervalMins int `json:"poll_interval_mins"` - LastPollTimestampSecs int64 - } - } + Feeds map[string]struct { // feed_url => { } + PollIntervalMins int `json:"poll_interval_mins"` + NextPollTimestampSecs int64 // Internal: When we should poll again + FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated + } `json:"feeds"` + Rooms map[string][]string `json:"rooms"` // room_id => [ feed_url ] } func (s *feedReaderService) ServiceUserID() string { return s.serviceUserID } @@ -73,11 +161,16 @@ func (s *feedReaderService) Poller() types.Poller { return &feedPoller{} } // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. func (s *feedReaderService) Register(oldService types.Service, client *matrix.Client) error { - feeds := feedUrls(s) - if len(feeds) == 0 { + if len(s.Feeds) == 0 { // this is an error UNLESS the old service had some feeds in which case they are deleting us :( - oldFeeds := feedUrls(oldService) - if len(oldFeeds) == 0 { + var numOldFeeds int + oldFeedService, ok := oldService.(*feedReaderService) + if !ok { + log.WithField("service_id", oldService.ServiceID()).Error("Old service isn't a FeedReaderService") + } else { + numOldFeeds = len(oldFeedService.Feeds) + } + if numOldFeeds == 0 { return errors.New("An RSS feed must be specified.") } } @@ -85,7 +178,7 @@ func (s *feedReaderService) Register(oldService types.Service, client *matrix.Cl } func (s *feedReaderService) PostRegister(oldService types.Service) { - if len(feedUrls(s)) == 0 { // bye-bye :( + if len(s.Feeds) == 0 { // bye-bye :( logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), "service_type": s.ServiceType(), @@ -98,27 +191,6 @@ func (s *feedReaderService) PostRegister(oldService types.Service) { } } -// feedUrls returns a list of feed urls for this service -func feedUrls(srv types.Service) []string { - var feeds []string - s, ok := srv.(*feedReaderService) - if !ok { - return feeds - } - - urlSet := make(map[string]bool) - for _, roomInfo := range s.Rooms { - for u := range roomInfo.Feeds { - urlSet[u] = true - } - } - - for u := range urlSet { - feeds = append(feeds, u) - } - return feeds -} - func init() { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { r := &feedReaderService{ From 0b09fcd86d9b3aeff0e2f882ec9e30f7aa9e6da2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 5 Oct 2016 11:20:32 +0100 Subject: [PATCH 4/7] First working feed reader version --- src/github.com/matrix-org/go-neb/goneb.go | 1 + .../matrix-org/go-neb/polling/polling.go | 13 ++++- .../go-neb/services/feedreader/feedreader.go | 51 ++++++++++++------- .../matrix-org/go-neb/types/types.go | 2 +- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index 92c43f7..8fe0633 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -72,6 +72,7 @@ func main() { rh := &realmRedirectHandler{db: db} http.HandleFunc("/realms/redirects/", rh.handle) + polling.SetClients(clients) if err := polling.Start(); err != nil { log.WithError(err).Panic("Failed to start polling") } diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go index 7f0c49e..f679b80 100644 --- a/src/github.com/matrix-org/go-neb/polling/polling.go +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -3,6 +3,7 @@ package polling import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/types" "sync" @@ -16,6 +17,11 @@ var ( pollMutex sync.Mutex startPollTime = make(map[string]int64) // ServiceID => unix timestamp ) +var clientPool *clients.Clients + +func SetClients(clis *clients.Clients) { + clientPool = clis +} // Start polling already existing services func Start() error { @@ -74,8 +80,13 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) { "interval_secs": poller.IntervalSecs(), }) logger.Info("Starting polling loop") + cli, err := clientPool.Client(service.ServiceUserID()) + if err != nil { + logger.WithError(err).WithField("user_id", service.ServiceUserID()).Error("Poll setup failed: failed to load client") + return + } for { - poller.OnPoll(service) + poller.OnPoll(service, cli) if pollTimeChanged(service, ts) { logger.Info("Terminating poll.") break diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index c370e7a..3a4e03a 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -2,12 +2,14 @@ package services import ( "errors" + "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" "github.com/mmcdole/gofeed" + "html" "time" ) @@ -16,7 +18,7 @@ const minPollingIntervalSeconds = (10 * 60) // 10min type feedPoller struct{} func (p *feedPoller) IntervalSecs() int64 { return 10 } -func (p *feedPoller) OnPoll(s types.Service) { +func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), "service_type": s.ServiceType(), @@ -40,13 +42,13 @@ func (p *feedPoller) OnPoll(s types.Service) { // Query each feed and send new items to subscribed rooms for _, u := range pollFeeds { - items, err := p.queryFeed(frService, u) + feed, items, err := p.queryFeed(frService, u) if err != nil { logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") continue } for _, i := range items { - if err := p.sendToRooms(frService, u, i); err != nil { + if err := p.sendToRooms(frService, cli, u, feed, i); err != nil { logger.WithFields(log.Fields{ "feed_url": u, log.ErrorKey: err, @@ -66,30 +68,27 @@ func (p *feedPoller) OnPoll(s types.Service) { } // Query the given feed, update relevant timestamps and return NEW items -func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.Item, error) { +func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) { var items []gofeed.Item fp := gofeed.NewParser() feed, err := fp.ParseURL(feedURL) if err != nil { - return items, err + return nil, items, err } // Work out which items are new, if any (based on the last updated TS we have) // If the TS is 0 then this is the first ever poll, so let's not send 10s of events // into the room and just do new ones from this point onwards. - // if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { - for _, i := range feed.Items { - if i == nil || i.PublishedParsed == nil { - continue - } - if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { - items = append(items, *i) + if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { + for _, i := range feed.Items { + if i == nil || i.PublishedParsed == nil { + continue + } + if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { + items = append(items, *i) + } } } - // } - if s.Feeds[feedURL].FeedUpdatedTimestampSecs == 0 { - log.Debug("ts is 0") - } now := time.Now().Unix() // Second resolution @@ -113,7 +112,7 @@ func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.I // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) - return items, nil + return feed, items, nil } func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) { @@ -128,8 +127,9 @@ func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPo } } -func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofeed.Item) error { - log.WithField("feed_url", feedURL).WithField("title", item.Title).Info("New feed item") +func (p *feedPoller) sendToRooms(s *feedReaderService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { + logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) + logger.Info("New feed item") var rooms []string for roomID, urls := range s.Rooms { for _, u := range urls { @@ -139,9 +139,22 @@ func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofe } } } + for _, roomID := range rooms { + if _, err := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item)); err != nil { + logger.WithError(err).WithField("room_id", roomID).Error("Failed to send to room") + } + } return nil } +// SomeOne posted a new article: Title Of The Entry ( https://someurl.com/blag ) +func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage { + return matrix.GetHTMLMessage("m.notice", fmt.Sprintf( + "%s posted a new article: %s ( %s )", + html.EscapeString(feed.Title), html.EscapeString(item.Title), html.EscapeString(item.Link), + )) +} + type feedReaderService struct { types.DefaultService id string diff --git a/src/github.com/matrix-org/go-neb/types/types.go b/src/github.com/matrix-org/go-neb/types/types.go index a2cfea3..1f0c0d4 100644 --- a/src/github.com/matrix-org/go-neb/types/types.go +++ b/src/github.com/matrix-org/go-neb/types/types.go @@ -43,7 +43,7 @@ type BotOptions struct { // Poller represents a thing that can be polled at a given rate. type Poller interface { IntervalSecs() int64 - OnPoll(service Service) + OnPoll(service Service, client *matrix.Client) } // A Service is the configuration for a bot service. From d92a95ff4e034ccc24d6fe0e0bde2d84187d7ce4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 5 Oct 2016 11:20:59 +0100 Subject: [PATCH 5/7] Linting --- src/github.com/matrix-org/go-neb/polling/polling.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go index f679b80..26f9094 100644 --- a/src/github.com/matrix-org/go-neb/polling/polling.go +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -19,6 +19,7 @@ var ( ) var clientPool *clients.Clients +// SetClients sets a pool of clients for passing into OnPoll func SetClients(clis *clients.Clients) { clientPool = clis } From 98e7533469f43547b895d864734da909cea9db25 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 5 Oct 2016 11:45:34 +0100 Subject: [PATCH 6/7] Send feed items in reverse-order for chronological times Also add checks on Register() to make sure the feeds are valid --- .../go-neb/services/feedreader/feedreader.go | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index 3a4e03a..8612c31 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -47,12 +47,14 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") continue } - for _, i := range items { - if err := p.sendToRooms(frService, cli, u, feed, i); err != nil { + // Loop backwards since [0] is the most recent and we want to send in chronological order + for i := len(items) - 1; i >= 0; i-- { + item := items[i] + if err := p.sendToRooms(frService, cli, u, feed, item); err != nil { logger.WithFields(log.Fields{ "feed_url": u, log.ErrorKey: err, - "item": i, + "item": item, }).Error("Failed to send item to room") } } @@ -69,6 +71,7 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { // Query the given feed, update relevant timestamps and return NEW items func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) { + log.WithField("feed_url", feedURL).Info("Querying feed") var items []gofeed.Item fp := gofeed.NewParser() feed, err := fp.ParseURL(feedURL) @@ -105,7 +108,7 @@ func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Fe // Work out when to next poll this feed nextPollTsSec := now + minPollingIntervalSeconds - if s.Feeds[feedURL].PollIntervalMins > 10 { + if s.Feeds[feedURL].PollIntervalMins > int(minPollingIntervalSeconds/60) { nextPollTsSec = now + int64(s.Feeds[feedURL].PollIntervalMins*60) } // TODO: Handle the 'sy' Syndication extension to control update interval. @@ -186,6 +189,23 @@ func (s *feedReaderService) Register(oldService types.Service, client *matrix.Cl if numOldFeeds == 0 { return errors.New("An RSS feed must be specified.") } + return nil + } + // Make sure we can parse the feed + for feedURL := range s.Feeds { + fp := gofeed.NewParser() + if _, err := fp.ParseURL(feedURL); err != nil { + return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error()) + } + } + // Make sure all feeds are accounted for (appear at least once) in the room map, AND make sure there + // are no weird new feeds in those rooms + for roomID, roomFeeds := range s.Rooms { + for _, f := range roomFeeds { + if _, exists := s.Feeds[f]; !exists { + return fmt.Errorf("Feed URL %s in room %s does not exist in the Feeds section", f, roomID) + } + } } return nil } From 7b4448957b17308507132c27660f11eac5f4f883 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 5 Oct 2016 14:06:23 +0100 Subject: [PATCH 7/7] Review comments --- .../go-neb/services/feedreader/feedreader.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index 8612c31..0819ce2 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -26,7 +26,7 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { frService, ok := s.(*feedReaderService) if !ok { - logger.Error("FeedReader: OnPoll called without an Feed Service instance") + logger.Error("FeedReader: OnPoll called without a Feed Service instance") return } now := time.Now().Unix() // Second resolution @@ -40,6 +40,10 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { } } + if len(pollFeeds) == 0 { + return + } + // Query each feed and send new items to subscribed rooms for _, u := range pollFeeds { feed, items, err := p.queryFeed(frService, u) @@ -60,10 +64,7 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { } } - // Persist the service to save the next poll times if we did some queries - if len(pollFeeds) == 0 { - return - } + // Persist the service to save the next poll times if _, err := database.GetServiceDB().StoreService(frService); err != nil { logger.WithError(err).Error("Failed to persist next poll times for service") } @@ -216,7 +217,7 @@ func (s *feedReaderService) PostRegister(oldService types.Service) { "service_id": s.ServiceID(), "service_type": s.ServiceType(), }) - logger.Info("Deleting service (0 feeds)") + logger.Info("Deleting service: No feeds remaining.") polling.StopPolling(s) if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { logger.WithError(err).Error("Failed to delete service")