|
@ -15,25 +15,75 @@ import ( |
|
|
|
|
|
|
|
|
const minPollingIntervalSeconds = 60 // 1 min (News feeds can be genuinely spammy)
|
|
|
const minPollingIntervalSeconds = 60 // 1 min (News feeds can be genuinely spammy)
|
|
|
|
|
|
|
|
|
type feedPoller struct{} |
|
|
|
|
|
|
|
|
type rssBotService struct { |
|
|
|
|
|
types.DefaultService |
|
|
|
|
|
id string |
|
|
|
|
|
serviceUserID string |
|
|
|
|
|
Feeds map[string]struct { // feed_url => { }
|
|
|
|
|
|
PollIntervalMins int `json:"poll_interval_mins"` |
|
|
|
|
|
Rooms []string `json:"rooms"` |
|
|
|
|
|
NextPollTimestampSecs int64 // Internal: When we should poll again
|
|
|
|
|
|
FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated
|
|
|
|
|
|
} `json:"feeds"` |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *rssBotService) ServiceUserID() string { return s.serviceUserID } |
|
|
|
|
|
func (s *rssBotService) ServiceID() string { return s.id } |
|
|
|
|
|
func (s *rssBotService) ServiceType() string { return "rssbot" } |
|
|
|
|
|
|
|
|
|
|
|
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
|
|
|
|
|
|
func (s *rssBotService) Register(oldService types.Service, client *matrix.Client) error { |
|
|
|
|
|
if len(s.Feeds) == 0 { |
|
|
|
|
|
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
|
|
|
|
|
|
var numOldFeeds int |
|
|
|
|
|
oldFeedService, ok := oldService.(*rssBotService) |
|
|
|
|
|
if !ok { |
|
|
|
|
|
log.WithField("service", oldService).Error("Old service isn't a rssBotService") |
|
|
|
|
|
} else { |
|
|
|
|
|
numOldFeeds = len(oldFeedService.Feeds) |
|
|
|
|
|
} |
|
|
|
|
|
if numOldFeeds == 0 { |
|
|
|
|
|
return errors.New("An RSS feed must be specified.") |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
// Make sure we can parse the feed
|
|
|
|
|
|
for feedURL, feedInfo := range s.Feeds { |
|
|
|
|
|
fp := gofeed.NewParser() |
|
|
|
|
|
if _, err := fp.ParseURL(feedURL); err != nil { |
|
|
|
|
|
return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error()) |
|
|
|
|
|
} |
|
|
|
|
|
if len(feedInfo.Rooms) == 0 { |
|
|
|
|
|
return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func (p *feedPoller) IntervalSecs() int64 { return 10 } |
|
|
|
|
|
func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { |
|
|
|
|
|
|
|
|
func (s *rssBotService) PostRegister(oldService types.Service) { |
|
|
|
|
|
if len(s.Feeds) == 0 { // bye-bye :(
|
|
|
logger := log.WithFields(log.Fields{ |
|
|
logger := log.WithFields(log.Fields{ |
|
|
"service_id": s.ServiceID(), |
|
|
"service_id": s.ServiceID(), |
|
|
"service_type": s.ServiceType(), |
|
|
"service_type": s.ServiceType(), |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
frService, ok := s.(*rssBotService) |
|
|
|
|
|
if !ok { |
|
|
|
|
|
logger.Error("RSSBot: OnPoll called without a Feed Service instance") |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
logger.Info("Deleting service: No feeds remaining.") |
|
|
|
|
|
polling.StopPolling(s) |
|
|
|
|
|
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { |
|
|
|
|
|
logger.WithError(err).Error("Failed to delete service") |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *rssBotService) OnPoll(cli *matrix.Client) time.Time { |
|
|
|
|
|
logger := log.WithFields(log.Fields{ |
|
|
|
|
|
"service_id": s.ServiceID(), |
|
|
|
|
|
"service_type": s.ServiceType(), |
|
|
|
|
|
}) |
|
|
now := time.Now().Unix() // Second resolution
|
|
|
now := time.Now().Unix() // Second resolution
|
|
|
|
|
|
|
|
|
// Work out which feeds should be polled
|
|
|
// Work out which feeds should be polled
|
|
|
var pollFeeds []string |
|
|
var pollFeeds []string |
|
|
for u, feedInfo := range frService.Feeds { |
|
|
|
|
|
|
|
|
for u, feedInfo := range s.Feeds { |
|
|
if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { |
|
|
if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { |
|
|
// re-query this feed
|
|
|
// re-query this feed
|
|
|
pollFeeds = append(pollFeeds, u) |
|
|
pollFeeds = append(pollFeeds, u) |
|
@ -41,12 +91,12 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if len(pollFeeds) == 0 { |
|
|
if len(pollFeeds) == 0 { |
|
|
return |
|
|
|
|
|
|
|
|
return s.nextTimestamp() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Query each feed and send new items to subscribed rooms
|
|
|
// Query each feed and send new items to subscribed rooms
|
|
|
for _, u := range pollFeeds { |
|
|
for _, u := range pollFeeds { |
|
|
feed, items, err := p.queryFeed(frService, u) |
|
|
|
|
|
|
|
|
feed, items, err := s.queryFeed(u) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") |
|
|
logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") |
|
|
continue |
|
|
continue |
|
@ -54,7 +104,7 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { |
|
|
// Loop backwards since [0] is the most recent and we want to send in chronological order
|
|
|
// Loop backwards since [0] is the most recent and we want to send in chronological order
|
|
|
for i := len(items) - 1; i >= 0; i-- { |
|
|
for i := len(items) - 1; i >= 0; i-- { |
|
|
item := items[i] |
|
|
item := items[i] |
|
|
if err := p.sendToRooms(frService, cli, u, feed, item); err != nil { |
|
|
|
|
|
|
|
|
if err := s.sendToRooms(cli, u, feed, item); err != nil { |
|
|
logger.WithFields(log.Fields{ |
|
|
logger.WithFields(log.Fields{ |
|
|
"feed_url": u, |
|
|
"feed_url": u, |
|
|
log.ErrorKey: err, |
|
|
log.ErrorKey: err, |
|
@ -65,13 +115,26 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Persist the service to save the next poll times
|
|
|
// Persist the service to save the next poll times
|
|
|
if _, err := database.GetServiceDB().StoreService(frService); err != nil { |
|
|
|
|
|
|
|
|
if _, err := database.GetServiceDB().StoreService(s); err != nil { |
|
|
logger.WithError(err).Error("Failed to persist next poll times for service") |
|
|
logger.WithError(err).Error("Failed to persist next poll times for service") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return s.nextTimestamp() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *rssBotService) nextTimestamp() time.Time { |
|
|
|
|
|
// return the earliest next poll ts
|
|
|
|
|
|
var earliestNextTs int64 |
|
|
|
|
|
for _, feedInfo := range s.Feeds { |
|
|
|
|
|
if earliestNextTs == 0 || feedInfo.NextPollTimestampSecs < earliestNextTs { |
|
|
|
|
|
earliestNextTs = feedInfo.NextPollTimestampSecs |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return time.Unix(earliestNextTs, 0) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Query the given feed, update relevant timestamps and return NEW items
|
|
|
// Query the given feed, update relevant timestamps and return NEW items
|
|
|
func (p *feedPoller) queryFeed(s *rssBotService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) { |
|
|
|
|
|
|
|
|
func (s *rssBotService) queryFeed(feedURL string) (*gofeed.Feed, []gofeed.Item, error) { |
|
|
log.WithField("feed_url", feedURL).Info("Querying feed") |
|
|
log.WithField("feed_url", feedURL).Info("Querying feed") |
|
|
var items []gofeed.Item |
|
|
var items []gofeed.Item |
|
|
fp := gofeed.NewParser() |
|
|
fp := gofeed.NewParser() |
|
@ -115,11 +178,11 @@ func (p *feedPoller) queryFeed(s *rssBotService, feedURL string) (*gofeed.Feed, |
|
|
// TODO: Handle the 'sy' Syndication extension to control update interval.
|
|
|
// TODO: Handle the 'sy' Syndication extension to control update interval.
|
|
|
// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
|
|
|
// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
|
|
|
|
|
|
|
|
|
p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs) |
|
|
|
|
|
|
|
|
s.updateFeedInfo(feedURL, nextPollTsSec, feedLastUpdatedTs) |
|
|
return feed, items, nil |
|
|
return feed, items, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (p *feedPoller) updateFeedInfo(s *rssBotService, feedURL string, nextPollTs, feedUpdatedTs int64) { |
|
|
|
|
|
|
|
|
func (s *rssBotService) updateFeedInfo(feedURL string, nextPollTs, feedUpdatedTs int64) { |
|
|
for u := range s.Feeds { |
|
|
for u := range s.Feeds { |
|
|
if u != feedURL { |
|
|
if u != feedURL { |
|
|
continue |
|
|
continue |
|
@ -131,7 +194,7 @@ func (p *feedPoller) updateFeedInfo(s *rssBotService, feedURL string, nextPollTs |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (p *feedPoller) sendToRooms(s *rssBotService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { |
|
|
|
|
|
|
|
|
func (s *rssBotService) sendToRooms(cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { |
|
|
logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) |
|
|
logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) |
|
|
logger.Info("New feed item") |
|
|
logger.Info("New feed item") |
|
|
for _, roomID := range s.Feeds[feedURL].Rooms { |
|
|
for _, roomID := range s.Feeds[feedURL].Rooms { |
|
@ -150,66 +213,6 @@ func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage { |
|
|
)) |
|
|
)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type rssBotService struct { |
|
|
|
|
|
types.DefaultService |
|
|
|
|
|
id string |
|
|
|
|
|
serviceUserID string |
|
|
|
|
|
Feeds map[string]struct { // feed_url => { }
|
|
|
|
|
|
PollIntervalMins int `json:"poll_interval_mins"` |
|
|
|
|
|
Rooms []string `json:"rooms"` |
|
|
|
|
|
NextPollTimestampSecs int64 // Internal: When we should poll again
|
|
|
|
|
|
FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated
|
|
|
|
|
|
} `json:"feeds"` |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *rssBotService) ServiceUserID() string { return s.serviceUserID } |
|
|
|
|
|
func (s *rssBotService) ServiceID() string { return s.id } |
|
|
|
|
|
func (s *rssBotService) ServiceType() string { return "rssbot" } |
|
|
|
|
|
func (s *rssBotService) Poller() types.Poller { return &feedPoller{} } |
|
|
|
|
|
|
|
|
|
|
|
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
|
|
|
|
|
|
func (s *rssBotService) Register(oldService types.Service, client *matrix.Client) error { |
|
|
|
|
|
if len(s.Feeds) == 0 { |
|
|
|
|
|
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
|
|
|
|
|
|
var numOldFeeds int |
|
|
|
|
|
oldFeedService, ok := oldService.(*rssBotService) |
|
|
|
|
|
if !ok { |
|
|
|
|
|
log.WithField("service", oldService).Error("Old service isn't a rssBotService") |
|
|
|
|
|
} else { |
|
|
|
|
|
numOldFeeds = len(oldFeedService.Feeds) |
|
|
|
|
|
} |
|
|
|
|
|
if numOldFeeds == 0 { |
|
|
|
|
|
return errors.New("An RSS feed must be specified.") |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
// Make sure we can parse the feed
|
|
|
|
|
|
for feedURL, feedInfo := range s.Feeds { |
|
|
|
|
|
fp := gofeed.NewParser() |
|
|
|
|
|
if _, err := fp.ParseURL(feedURL); err != nil { |
|
|
|
|
|
return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error()) |
|
|
|
|
|
} |
|
|
|
|
|
if len(feedInfo.Rooms) == 0 { |
|
|
|
|
|
return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *rssBotService) PostRegister(oldService types.Service) { |
|
|
|
|
|
if len(s.Feeds) == 0 { // bye-bye :(
|
|
|
|
|
|
logger := log.WithFields(log.Fields{ |
|
|
|
|
|
"service_id": s.ServiceID(), |
|
|
|
|
|
"service_type": s.ServiceType(), |
|
|
|
|
|
}) |
|
|
|
|
|
logger.Info("Deleting service: No feeds remaining.") |
|
|
|
|
|
polling.StopPolling(s) |
|
|
|
|
|
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { |
|
|
|
|
|
logger.WithError(err).Error("Failed to delete service") |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func init() { |
|
|
func init() { |
|
|
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { |
|
|
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { |
|
|
r := &rssBotService{ |
|
|
r := &rssBotService{ |
|
|