From 0b09fcd86d9b3aeff0e2f882ec9e30f7aa9e6da2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 5 Oct 2016 11:20:32 +0100 Subject: [PATCH] First working feed reader version --- src/github.com/matrix-org/go-neb/goneb.go | 1 + .../matrix-org/go-neb/polling/polling.go | 13 ++++- .../go-neb/services/feedreader/feedreader.go | 51 ++++++++++++------- .../matrix-org/go-neb/types/types.go | 2 +- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index 92c43f7..8fe0633 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -72,6 +72,7 @@ func main() { rh := &realmRedirectHandler{db: db} http.HandleFunc("/realms/redirects/", rh.handle) + polling.SetClients(clients) if err := polling.Start(); err != nil { log.WithError(err).Panic("Failed to start polling") } diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go index 7f0c49e..f679b80 100644 --- a/src/github.com/matrix-org/go-neb/polling/polling.go +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -3,6 +3,7 @@ package polling import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/types" "sync" @@ -16,6 +17,11 @@ var ( pollMutex sync.Mutex startPollTime = make(map[string]int64) // ServiceID => unix timestamp ) +var clientPool *clients.Clients + +func SetClients(clis *clients.Clients) { + clientPool = clis +} // Start polling already existing services func Start() error { @@ -74,8 +80,13 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) { "interval_secs": poller.IntervalSecs(), }) logger.Info("Starting polling loop") + cli, err := clientPool.Client(service.ServiceUserID()) + if err != nil { + logger.WithError(err).WithField("user_id", service.ServiceUserID()).Error("Poll setup failed: failed to load client") + return + } for { - poller.OnPoll(service) + poller.OnPoll(service, cli) if pollTimeChanged(service, ts) { logger.Info("Terminating poll.") break diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go index c370e7a..3a4e03a 100644 --- a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -2,12 +2,14 @@ package services import ( "errors" + "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" "github.com/mmcdole/gofeed" + "html" "time" ) @@ -16,7 +18,7 @@ const minPollingIntervalSeconds = (10 * 60) // 10min type feedPoller struct{} func (p *feedPoller) IntervalSecs() int64 { return 10 } -func (p *feedPoller) OnPoll(s types.Service) { +func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), "service_type": s.ServiceType(), @@ -40,13 +42,13 @@ func (p *feedPoller) OnPoll(s types.Service) { // Query each feed and send new items to subscribed rooms for _, u := range pollFeeds { - items, err := p.queryFeed(frService, u) + feed, items, err := p.queryFeed(frService, u) if err != nil { logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") continue } for _, i := range items { - if err := p.sendToRooms(frService, u, i); err != nil { + if err := p.sendToRooms(frService, cli, u, feed, i); err != nil { logger.WithFields(log.Fields{ "feed_url": u, log.ErrorKey: err, @@ -66,30 +68,27 @@ func (p *feedPoller) OnPoll(s types.Service) { } // Query the given feed, update relevant timestamps and return NEW items -func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.Item, error) { +func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) { var items []gofeed.Item fp := gofeed.NewParser() feed, err := fp.ParseURL(feedURL) if err != nil { - return items, err + return nil, items, err } // Work out which items are new, if any (based on the last updated TS we have) // If the TS is 0 then this is the first ever poll, so let's not send 10s of events // into the room and just do new ones from this point onwards. - // if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { - for _, i := range feed.Items { - if i == nil || i.PublishedParsed == nil { - continue - } - if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { - items = append(items, *i) + if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { + for _, i := range feed.Items { + if i == nil || i.PublishedParsed == nil { + continue + } + if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { + items = append(items, *i) + } } } - // } - if s.Feeds[feedURL].FeedUpdatedTimestampSecs == 0 { - log.Debug("ts is 0") - } now := time.Now().Unix() // Second resolution @@ -113,7 +112,7 @@ func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.I // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) - return items, nil + return feed, items, nil } func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) { @@ -128,8 +127,9 @@ func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPo } } -func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofeed.Item) error { - log.WithField("feed_url", feedURL).WithField("title", item.Title).Info("New feed item") +func (p *feedPoller) sendToRooms(s *feedReaderService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { + logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) + logger.Info("New feed item") var rooms []string for roomID, urls := range s.Rooms { for _, u := range urls { @@ -139,9 +139,22 @@ func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofe } } } + for _, roomID := range rooms { + if _, err := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item)); err != nil { + logger.WithError(err).WithField("room_id", roomID).Error("Failed to send to room") + } + } return nil } +// SomeOne posted a new article: Title Of The Entry ( https://someurl.com/blag ) +func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage { + return matrix.GetHTMLMessage("m.notice", fmt.Sprintf( + "%s posted a new article: %s ( %s )", + html.EscapeString(feed.Title), html.EscapeString(item.Title), html.EscapeString(item.Link), + )) +} + type feedReaderService struct { types.DefaultService id string diff --git a/src/github.com/matrix-org/go-neb/types/types.go b/src/github.com/matrix-org/go-neb/types/types.go index a2cfea3..1f0c0d4 100644 --- a/src/github.com/matrix-org/go-neb/types/types.go +++ b/src/github.com/matrix-org/go-neb/types/types.go @@ -43,7 +43,7 @@ type BotOptions struct { // Poller represents a thing that can be polled at a given rate. type Poller interface { IntervalSecs() int64 - OnPoll(service Service) + OnPoll(service Service, client *matrix.Client) } // A Service is the configuration for a bot service.