Browse Source

Flesh out the feedreader service

Just need to send messages into rooms now for a first cut to be done. Notable
improvements to make:
 - We currently do 1 goroutine per service. This could be bad if we have lots of these things running around.
 - We do not cache the response to RSS feeds. If we have 10 independent services on the same feed URL, we will
   hit the URL 10 times. This is similar to how we currently do 1 webhook/service, so it's plausible that in
   the future we will want to have some kind of generic caching layer.
 - We don't send messages to Matrix yet. We need a `Clients` instance but can't get at one. There's only ever
   one, so I wonder if we should global it like we do with `GetServiceDB()` for ease of use?
 - The polling interval is divorced from the actual feed repoll time. Ideally we would schedule the goroutine
   only when we need it, rather than checking frequently, determining we have nothing to do, and going back
   to sleep.
kegan/feedreader
Kegan Dougal 8 years ago
parent
commit
2d6b3cd6e9
  1. 8
      src/github.com/matrix-org/go-neb/goneb.go
  2. 182
      src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go

8
src/github.com/matrix-org/go-neb/goneb.go

@ -45,18 +45,18 @@ func main() {
err := types.BaseURL(baseURL)
if err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to get base url")
}
db, err := database.Open(databaseType, databaseURL)
if err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to open database")
}
database.SetServiceDB(db)
clients := clients.New(db)
if err := clients.Start(); err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to start up clients")
}
http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{}))
@ -73,7 +73,7 @@ func main() {
http.HandleFunc("/realms/redirects/", rh.handle)
if err := polling.Start(); err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to start polling")
}
http.ListenAndServe(bindAddress, nil)

182
src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go

@ -11,59 +11,147 @@ import (
"time"
)
const minPollingIntervalSeconds = (10 * 60) // 10min
type feedPoller struct{}
func (p *feedPoller) IntervalSecs() int64 { return 10 }
func (p *feedPoller) OnPoll(s types.Service) {
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
})
frService, ok := s.(*feedReaderService)
if !ok {
log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service")
logger.Error("FeedReader: OnPoll called without an Feed Service instance")
return
}
now := time.Now().Unix() // Second resolution
// URL => [ RoomID ]
urlsToRooms := make(map[string][]string)
for roomID, roomInfo := range frService.Rooms {
for u, feedInfo := range roomInfo.Feeds {
if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now {
// re-query this feed
urlsToRooms[u] = append(urlsToRooms[u], roomID)
// Work out which feeds should be polled
var pollFeeds []string
for u, feedInfo := range frService.Feeds {
if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs {
// re-query this feed
pollFeeds = append(pollFeeds, u)
}
}
// Query each feed and send new items to subscribed rooms
for _, u := range pollFeeds {
items, err := p.queryFeed(frService, u)
if err != nil {
logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed")
continue
}
for _, i := range items {
if err := p.sendToRooms(frService, u, i); err != nil {
logger.WithFields(log.Fields{
"feed_url": u,
log.ErrorKey: err,
"item": i,
}).Error("Failed to send item to room")
}
}
}
// TODO: Keep a "next poll ts" value (default 0)
// If ts is 0 or now > ts, then poll and work out next poll ts.
// Worked out by looking at the chosen interval period (prioritise the feed retry time where it exists)
// Persist the next poll ts to the database.
// Persist the service to save the next poll times if we did some queries
if len(pollFeeds) == 0 {
return
}
if _, err := database.GetServiceDB().StoreService(frService); err != nil {
logger.WithError(err).Error("Failed to persist next poll times for service")
}
}
for u := range urlsToRooms {
fp := gofeed.NewParser()
feed, err := fp.ParseURL(u)
if err != nil {
log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"url": u,
log.ErrorKey: err,
}).Error("Failed to parse feed")
// Query the given feed, update relevant timestamps and return NEW items
func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.Item, error) {
var items []gofeed.Item
fp := gofeed.NewParser()
feed, err := fp.ParseURL(feedURL)
if err != nil {
return items, err
}
// Work out which items are new, if any (based on the last updated TS we have)
// If the TS is 0 then this is the first ever poll, so let's not send 10s of events
// into the room and just do new ones from this point onwards.
// if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 {
for _, i := range feed.Items {
if i == nil || i.PublishedParsed == nil {
continue
}
log.Print(feed)
if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs {
items = append(items, *i)
}
}
// }
if s.Feeds[feedURL].FeedUpdatedTimestampSecs == 0 {
log.Debug("ts is 0")
}
now := time.Now().Unix() // Second resolution
// Work out when this feed was last updated
var feedLastUpdatedTs int64
if feed.UpdatedParsed != nil {
feedLastUpdatedTs = feed.UpdatedParsed.Unix()
} else if len(feed.Items) > 0 {
i := feed.Items[0]
if i != nil && i.PublishedParsed != nil {
feedLastUpdatedTs = i.PublishedParsed.Unix()
}
}
// Work out when to next poll this feed
nextPollTsSec := now + minPollingIntervalSeconds
if s.Feeds[feedURL].PollIntervalMins > 10 {
nextPollTsSec = now + int64(s.Feeds[feedURL].PollIntervalMins*60)
}
// TODO: Handle the 'sy' Syndication extension to control update interval.
// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs)
return items, nil
}
func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) {
for u := range s.Feeds {
if u != feedURL {
continue
}
f := s.Feeds[u]
f.NextPollTimestampSecs = nextPollTs
f.FeedUpdatedTimestampSecs = feedUpdatedTs
s.Feeds[u] = f
}
}
func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofeed.Item) error {
log.WithField("feed_url", feedURL).WithField("title", item.Title).Info("New feed item")
var rooms []string
for roomID, urls := range s.Rooms {
for _, u := range urls {
if u == feedURL {
rooms = append(rooms, roomID)
break
}
}
}
return nil
}
type feedReaderService struct {
types.DefaultService
id string
serviceUserID string
Rooms map[string]struct { // room_id => {}
Feeds map[string]struct { // URL => { }
PollIntervalMins int `json:"poll_interval_mins"`
LastPollTimestampSecs int64
}
}
Feeds map[string]struct { // feed_url => { }
PollIntervalMins int `json:"poll_interval_mins"`
NextPollTimestampSecs int64 // Internal: When we should poll again
FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated
} `json:"feeds"`
Rooms map[string][]string `json:"rooms"` // room_id => [ feed_url ]
}
func (s *feedReaderService) ServiceUserID() string { return s.serviceUserID }
@ -73,11 +161,16 @@ func (s *feedReaderService) Poller() types.Poller { return &feedPoller{} }
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
func (s *feedReaderService) Register(oldService types.Service, client *matrix.Client) error {
feeds := feedUrls(s)
if len(feeds) == 0 {
if len(s.Feeds) == 0 {
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
oldFeeds := feedUrls(oldService)
if len(oldFeeds) == 0 {
var numOldFeeds int
oldFeedService, ok := oldService.(*feedReaderService)
if !ok {
log.WithField("service_id", oldService.ServiceID()).Error("Old service isn't a FeedReaderService")
} else {
numOldFeeds = len(oldFeedService.Feeds)
}
if numOldFeeds == 0 {
return errors.New("An RSS feed must be specified.")
}
}
@ -85,7 +178,7 @@ func (s *feedReaderService) Register(oldService types.Service, client *matrix.Cl
}
func (s *feedReaderService) PostRegister(oldService types.Service) {
if len(feedUrls(s)) == 0 { // bye-bye :(
if len(s.Feeds) == 0 { // bye-bye :(
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
@ -98,27 +191,6 @@ func (s *feedReaderService) PostRegister(oldService types.Service) {
}
}
// feedUrls returns a list of feed urls for this service
func feedUrls(srv types.Service) []string {
var feeds []string
s, ok := srv.(*feedReaderService)
if !ok {
return feeds
}
urlSet := make(map[string]bool)
for _, roomInfo := range s.Rooms {
for u := range roomInfo.Feeds {
urlSet[u] = true
}
}
for u := range urlSet {
feeds = append(feeds, u)
}
return feeds
}
func init() {
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
r := &feedReaderService{

Loading…
Cancel
Save