diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index a715aae..8fe0633 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -10,6 +10,7 @@ import ( _ "github.com/matrix-org/go-neb/realms/jira" "github.com/matrix-org/go-neb/server" _ "github.com/matrix-org/go-neb/services/echo" + _ "github.com/matrix-org/go-neb/services/feedreader" _ "github.com/matrix-org/go-neb/services/giphy" _ "github.com/matrix-org/go-neb/services/github" _ "github.com/matrix-org/go-neb/services/guggy" @@ -44,18 +45,18 @@ func main() { err := types.BaseURL(baseURL) if err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to get base url") } db, err := database.Open(databaseType, databaseURL) if err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to open database") } database.SetServiceDB(db) clients := clients.New(db) if err := clients.Start(); err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to start up clients") } http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{})) @@ -71,8 +72,9 @@ func main() { rh := &realmRedirectHandler{db: db} http.HandleFunc("/realms/redirects/", rh.handle) + polling.SetClients(clients) if err := polling.Start(); err != nil { - log.Panic(err) + log.WithError(err).Panic("Failed to start polling") } http.ListenAndServe(bindAddress, nil) diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go index 7f0c49e..26f9094 100644 --- a/src/github.com/matrix-org/go-neb/polling/polling.go +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -3,6 +3,7 @@ package polling import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/types" "sync" @@ -16,6 +17,12 @@ var ( pollMutex sync.Mutex startPollTime = make(map[string]int64) // ServiceID => unix timestamp ) +var clientPool *clients.Clients + +// SetClients sets a pool of clients for passing into OnPoll +func SetClients(clis *clients.Clients) { + clientPool = clis +} // Start polling already existing services func Start() error { @@ -74,8 +81,13 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) { "interval_secs": poller.IntervalSecs(), }) logger.Info("Starting polling loop") + cli, err := clientPool.Client(service.ServiceUserID()) + if err != nil { + logger.WithError(err).WithField("user_id", service.ServiceUserID()).Error("Poll setup failed: failed to load client") + return + } for { - poller.OnPoll(service) + poller.OnPoll(service, cli) if pollTimeChanged(service, ts) { logger.Info("Terminating poll.") break diff --git a/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go new file mode 100644 index 0000000..0819ce2 --- /dev/null +++ b/src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go @@ -0,0 +1,236 @@ +package services + +import ( + "errors" + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/go-neb/database" + "github.com/matrix-org/go-neb/matrix" + "github.com/matrix-org/go-neb/polling" + "github.com/matrix-org/go-neb/types" + "github.com/mmcdole/gofeed" + "html" + "time" +) + +const minPollingIntervalSeconds = (10 * 60) // 10min + +type feedPoller struct{} + +func (p *feedPoller) IntervalSecs() int64 { return 10 } +func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { + logger := log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "service_type": s.ServiceType(), + }) + + frService, ok := s.(*feedReaderService) + if !ok { + logger.Error("FeedReader: OnPoll called without a Feed Service instance") + return + } + now := time.Now().Unix() // Second resolution + + // Work out which feeds should be polled + var pollFeeds []string + for u, feedInfo := range frService.Feeds { + if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { + // re-query this feed + pollFeeds = append(pollFeeds, u) + } + } + + if len(pollFeeds) == 0 { + return + } + + // Query each feed and send new items to subscribed rooms + for _, u := range pollFeeds { + feed, items, err := p.queryFeed(frService, u) + if err != nil { + logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") + continue + } + // Loop backwards since [0] is the most recent and we want to send in chronological order + for i := len(items) - 1; i >= 0; i-- { + item := items[i] + if err := p.sendToRooms(frService, cli, u, feed, item); err != nil { + logger.WithFields(log.Fields{ + "feed_url": u, + log.ErrorKey: err, + "item": item, + }).Error("Failed to send item to room") + } + } + } + + // Persist the service to save the next poll times + if _, err := database.GetServiceDB().StoreService(frService); err != nil { + logger.WithError(err).Error("Failed to persist next poll times for service") + } +} + +// Query the given feed, update relevant timestamps and return NEW items +func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) { + log.WithField("feed_url", feedURL).Info("Querying feed") + var items []gofeed.Item + fp := gofeed.NewParser() + feed, err := fp.ParseURL(feedURL) + if err != nil { + return nil, items, err + } + + // Work out which items are new, if any (based on the last updated TS we have) + // If the TS is 0 then this is the first ever poll, so let's not send 10s of events + // into the room and just do new ones from this point onwards. + if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 { + for _, i := range feed.Items { + if i == nil || i.PublishedParsed == nil { + continue + } + if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs { + items = append(items, *i) + } + } + } + + now := time.Now().Unix() // Second resolution + + // Work out when this feed was last updated + var feedLastUpdatedTs int64 + if feed.UpdatedParsed != nil { + feedLastUpdatedTs = feed.UpdatedParsed.Unix() + } else if len(feed.Items) > 0 { + i := feed.Items[0] + if i != nil && i.PublishedParsed != nil { + feedLastUpdatedTs = i.PublishedParsed.Unix() + } + } + + // Work out when to next poll this feed + nextPollTsSec := now + minPollingIntervalSeconds + if s.Feeds[feedURL].PollIntervalMins > int(minPollingIntervalSeconds/60) { + nextPollTsSec = now + int64(s.Feeds[feedURL].PollIntervalMins*60) + } + // TODO: Handle the 'sy' Syndication extension to control update interval. + // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ + + p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) + return feed, items, nil +} + +func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) { + for u := range s.Feeds { + if u != feedURL { + continue + } + f := s.Feeds[u] + f.NextPollTimestampSecs = nextPollTs + f.FeedUpdatedTimestampSecs = feedUpdatedTs + s.Feeds[u] = f + } +} + +func (p *feedPoller) sendToRooms(s *feedReaderService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { + logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) + logger.Info("New feed item") + var rooms []string + for roomID, urls := range s.Rooms { + for _, u := range urls { + if u == feedURL { + rooms = append(rooms, roomID) + break + } + } + } + for _, roomID := range rooms { + if _, err := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item)); err != nil { + logger.WithError(err).WithField("room_id", roomID).Error("Failed to send to room") + } + } + return nil +} + +// SomeOne posted a new article: Title Of The Entry ( https://someurl.com/blag ) +func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage { + return matrix.GetHTMLMessage("m.notice", fmt.Sprintf( + "%s posted a new article: %s ( %s )", + html.EscapeString(feed.Title), html.EscapeString(item.Title), html.EscapeString(item.Link), + )) +} + +type feedReaderService struct { + types.DefaultService + id string + serviceUserID string + Feeds map[string]struct { // feed_url => { } + PollIntervalMins int `json:"poll_interval_mins"` + NextPollTimestampSecs int64 // Internal: When we should poll again + FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated + } `json:"feeds"` + Rooms map[string][]string `json:"rooms"` // room_id => [ feed_url ] +} + +func (s *feedReaderService) ServiceUserID() string { return s.serviceUserID } +func (s *feedReaderService) ServiceID() string { return s.id } +func (s *feedReaderService) ServiceType() string { return "feedreader" } +func (s *feedReaderService) Poller() types.Poller { return &feedPoller{} } + +// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. +func (s *feedReaderService) Register(oldService types.Service, client *matrix.Client) error { + if len(s.Feeds) == 0 { + // this is an error UNLESS the old service had some feeds in which case they are deleting us :( + var numOldFeeds int + oldFeedService, ok := oldService.(*feedReaderService) + if !ok { + log.WithField("service_id", oldService.ServiceID()).Error("Old service isn't a FeedReaderService") + } else { + numOldFeeds = len(oldFeedService.Feeds) + } + if numOldFeeds == 0 { + return errors.New("An RSS feed must be specified.") + } + return nil + } + // Make sure we can parse the feed + for feedURL := range s.Feeds { + fp := gofeed.NewParser() + if _, err := fp.ParseURL(feedURL); err != nil { + return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error()) + } + } + // Make sure all feeds are accounted for (appear at least once) in the room map, AND make sure there + // are no weird new feeds in those rooms + for roomID, roomFeeds := range s.Rooms { + for _, f := range roomFeeds { + if _, exists := s.Feeds[f]; !exists { + return fmt.Errorf("Feed URL %s in room %s does not exist in the Feeds section", f, roomID) + } + } + } + return nil +} + +func (s *feedReaderService) PostRegister(oldService types.Service) { + if len(s.Feeds) == 0 { // bye-bye :( + logger := log.WithFields(log.Fields{ + "service_id": s.ServiceID(), + "service_type": s.ServiceType(), + }) + logger.Info("Deleting service: No feeds remaining.") + polling.StopPolling(s) + if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { + logger.WithError(err).Error("Failed to delete service") + } + } +} + +func init() { + types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { + r := &feedReaderService{ + id: serviceID, + serviceUserID: serviceUserID, + } + return r + }) +} diff --git a/src/github.com/matrix-org/go-neb/services/rss/rss.go b/src/github.com/matrix-org/go-neb/services/rss/rss.go deleted file mode 100644 index 25e36fa..0000000 --- a/src/github.com/matrix-org/go-neb/services/rss/rss.go +++ /dev/null @@ -1,111 +0,0 @@ -package services - -import ( - "errors" - log "github.com/Sirupsen/logrus" - "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" - "github.com/matrix-org/go-neb/polling" - "github.com/matrix-org/go-neb/types" - "time" -) - -type rssPoller struct{} - -func (p *rssPoller) IntervalSecs() int64 { return 10 } -func (p *rssPoller) OnPoll(s types.Service) { - rsss, ok := s.(*rssService) - if !ok { - log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service") - return - } - now := time.Now().Unix() // Second resolution - // URL => [ RoomID ] - urlsToRooms := make(map[string][]string) - - for roomID, roomInfo := range rsss.Rooms { - for u, feedInfo := range roomInfo.Feeds { - if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now { - // re-query this feed - urlsToRooms[u] = append(urlsToRooms[u], roomID) - } - } - } - - // TODO: Some polling -} - -type rssService struct { - types.DefaultService - id string - serviceUserID string - Rooms map[string]struct { // room_id => {} - Feeds map[string]struct { // URL => { } - PollIntervalMins int `json:"poll_interval_mins"` - LastPollTimestampSecs int64 - } - } -} - -func (s *rssService) ServiceUserID() string { return s.serviceUserID } -func (s *rssService) ServiceID() string { return s.id } -func (s *rssService) ServiceType() string { return "rss" } -func (s *rssService) Poller() types.Poller { return &rssPoller{} } - -// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. -func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { - feeds := feedUrls(s) - if len(feeds) == 0 { - // this is an error UNLESS the old service had some feeds in which case they are deleting us :( - oldFeeds := feedUrls(oldService) - if len(oldFeeds) == 0 { - return errors.New("An RSS feed must be specified.") - } - } - return nil -} - -func (s *rssService) PostRegister(oldService types.Service) { - if len(feedUrls(s)) == 0 { // bye-bye :( - logger := log.WithFields(log.Fields{ - "service_id": s.ServiceID(), - "service_type": s.ServiceType(), - }) - logger.Info("Deleting service (0 feeds)") - polling.StopPolling(s) - if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { - logger.WithError(err).Error("Failed to delete service") - } - } -} - -// feedUrls returns a list of feed urls for this service -func feedUrls(srv types.Service) []string { - var feeds []string - s, ok := srv.(*rssService) - if !ok { - return feeds - } - - urlSet := make(map[string]bool) - for _, roomInfo := range s.Rooms { - for u := range roomInfo.Feeds { - urlSet[u] = true - } - } - - for u := range urlSet { - feeds = append(feeds, u) - } - return feeds -} - -func init() { - types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { - r := &rssService{ - id: serviceID, - serviceUserID: serviceUserID, - } - return r - }) -} diff --git a/src/github.com/matrix-org/go-neb/types/types.go b/src/github.com/matrix-org/go-neb/types/types.go index a2cfea3..1f0c0d4 100644 --- a/src/github.com/matrix-org/go-neb/types/types.go +++ b/src/github.com/matrix-org/go-neb/types/types.go @@ -43,7 +43,7 @@ type BotOptions struct { // Poller represents a thing that can be polled at a given rate. type Poller interface { IntervalSecs() int64 - OnPoll(service Service) + OnPoll(service Service, client *matrix.Client) } // A Service is the configuration for a bot service.