Browse Source

Merge pull request #78 from matrix-org/kegan/feedreader

Implement feedreader service
pull/68/merge
Kegsay 8 years ago
committed by GitHub
parent
commit
7971b13368
  1. 10
      src/github.com/matrix-org/go-neb/goneb.go
  2. 14
      src/github.com/matrix-org/go-neb/polling/polling.go
  3. 236
      src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go
  4. 111
      src/github.com/matrix-org/go-neb/services/rss/rss.go
  5. 2
      src/github.com/matrix-org/go-neb/types/types.go

10
src/github.com/matrix-org/go-neb/goneb.go

@ -10,6 +10,7 @@ import (
_ "github.com/matrix-org/go-neb/realms/jira" _ "github.com/matrix-org/go-neb/realms/jira"
"github.com/matrix-org/go-neb/server" "github.com/matrix-org/go-neb/server"
_ "github.com/matrix-org/go-neb/services/echo" _ "github.com/matrix-org/go-neb/services/echo"
_ "github.com/matrix-org/go-neb/services/feedreader"
_ "github.com/matrix-org/go-neb/services/giphy" _ "github.com/matrix-org/go-neb/services/giphy"
_ "github.com/matrix-org/go-neb/services/github" _ "github.com/matrix-org/go-neb/services/github"
_ "github.com/matrix-org/go-neb/services/guggy" _ "github.com/matrix-org/go-neb/services/guggy"
@ -44,18 +45,18 @@ func main() {
err := types.BaseURL(baseURL) err := types.BaseURL(baseURL)
if err != nil { if err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to get base url")
} }
db, err := database.Open(databaseType, databaseURL) db, err := database.Open(databaseType, databaseURL)
if err != nil { if err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to open database")
} }
database.SetServiceDB(db) database.SetServiceDB(db)
clients := clients.New(db) clients := clients.New(db)
if err := clients.Start(); err != nil { if err := clients.Start(); err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to start up clients")
} }
http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{})) http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{}))
@ -71,8 +72,9 @@ func main() {
rh := &realmRedirectHandler{db: db} rh := &realmRedirectHandler{db: db}
http.HandleFunc("/realms/redirects/", rh.handle) http.HandleFunc("/realms/redirects/", rh.handle)
polling.SetClients(clients)
if err := polling.Start(); err != nil { if err := polling.Start(); err != nil {
log.Panic(err)
log.WithError(err).Panic("Failed to start polling")
} }
http.ListenAndServe(bindAddress, nil) http.ListenAndServe(bindAddress, nil)

14
src/github.com/matrix-org/go-neb/polling/polling.go

@ -3,6 +3,7 @@ package polling
import ( import (
"fmt" "fmt"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"sync" "sync"
@ -16,6 +17,12 @@ var (
pollMutex sync.Mutex pollMutex sync.Mutex
startPollTime = make(map[string]int64) // ServiceID => unix timestamp startPollTime = make(map[string]int64) // ServiceID => unix timestamp
) )
var clientPool *clients.Clients
// SetClients sets a pool of clients for passing into OnPoll
func SetClients(clis *clients.Clients) {
clientPool = clis
}
// Start polling already existing services // Start polling already existing services
func Start() error { func Start() error {
@ -74,8 +81,13 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) {
"interval_secs": poller.IntervalSecs(), "interval_secs": poller.IntervalSecs(),
}) })
logger.Info("Starting polling loop") logger.Info("Starting polling loop")
cli, err := clientPool.Client(service.ServiceUserID())
if err != nil {
logger.WithError(err).WithField("user_id", service.ServiceUserID()).Error("Poll setup failed: failed to load client")
return
}
for { for {
poller.OnPoll(service)
poller.OnPoll(service, cli)
if pollTimeChanged(service, ts) { if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.") logger.Info("Terminating poll.")
break break

236
src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go

@ -0,0 +1,236 @@
package services
import (
"errors"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types"
"github.com/mmcdole/gofeed"
"html"
"time"
)
// minPollingIntervalSeconds is the shortest gap, in seconds, allowed between two
// polls of the same feed: 600 seconds, i.e. ten minutes.
const minPollingIntervalSeconds = 10 * 60
// feedPoller is the stateless poller used by the feedreader service.
type feedPoller struct{}

// IntervalSecs returns this poller's interval in seconds, which is always 10.
// Individual feeds are throttled separately via their own next-poll timestamps.
func (p *feedPoller) IntervalSecs() int64 {
	const intervalSecs int64 = 10
	return intervalSecs
}
// OnPoll looks at every feed configured on the service, queries the ones that are
// due, forwards their new items to the subscribed rooms via cli, and then persists
// the service so that the updated poll timestamps are saved.
//
// It returns early without persisting when s is not a *feedReaderService or when
// no feed is due.
func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) {
	logger := log.WithFields(log.Fields{
		"service_id":   s.ServiceID(),
		"service_type": s.ServiceType(),
	})

	frService, isFeedService := s.(*feedReaderService)
	if !isFeedService {
		logger.Error("FeedReader: OnPoll called without a Feed Service instance")
		return
	}

	// Gather the due URLs up front; queryFeed writes back into frService.Feeds,
	// so the map is not mutated while it is being ranged over.
	nowSecs := time.Now().Unix() // Second resolution
	var dueURLs []string
	for feedURL, info := range frService.Feeds {
		neverPolled := info.NextPollTimestampSecs == 0
		if neverPolled || info.NextPollTimestampSecs <= nowSecs {
			dueURLs = append(dueURLs, feedURL)
		}
	}
	if len(dueURLs) == 0 {
		return
	}

	for _, feedURL := range dueURLs {
		feed, newItems, err := p.queryFeed(frService, feedURL)
		if err != nil {
			logger.WithField("feed_url", feedURL).WithError(err).Error("Failed to query feed")
			continue
		}
		// newItems[0] is the most recent entry, so walk from the tail in order to
		// post the items oldest-first.
		for remaining := len(newItems); remaining > 0; remaining-- {
			item := newItems[remaining-1]
			sendErr := p.sendToRooms(frService, cli, feedURL, feed, item)
			if sendErr == nil {
				continue
			}
			logger.WithFields(log.Fields{
				"feed_url":   feedURL,
				log.ErrorKey: sendErr,
				"item":       item,
			}).Error("Failed to send item to room")
		}
	}

	// Save the service so the next poll times survive.
	_, storeErr := database.GetServiceDB().StoreService(frService)
	if storeErr != nil {
		logger.WithError(storeErr).Error("Failed to persist next poll times for service")
	}
}
// queryFeed fetches and parses feedURL, records the feed's next poll time and its
// last-updated timestamp on s, and returns the parsed feed together with the items
// that are NEW relative to the previously recorded last-updated timestamp.
//
// If the recorded timestamp is 0 (the feed has never been polled successfully) no
// items are returned, so a freshly added feed does not flood the rooms.
//
// On a fetch or parse error nothing on s is changed and the error is returned.
func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) {
	log.WithField("feed_url", feedURL).Info("Querying feed")
	var items []gofeed.Item
	fp := gofeed.NewParser()
	feed, err := fp.ParseURL(feedURL)
	if err != nil {
		return nil, items, err
	}

	// Snapshot of the stored state for this feed (the zero value if feedURL is not configured).
	info := s.Feeds[feedURL]
	prevUpdatedTs := info.FeedUpdatedTimestampSecs

	// Work out which items are new, if any (based on the last updated TS we have)
	// If the TS is 0 then this is the first ever poll, so let's not send 10s of events
	// into the room and just do new ones from this point onwards.
	if prevUpdatedTs != 0 {
		for _, i := range feed.Items {
			if i == nil || i.PublishedParsed == nil {
				continue
			}
			if i.PublishedParsed.Unix() > prevUpdatedTs {
				items = append(items, *i)
			}
		}
	}

	now := time.Now().Unix() // Second resolution

	// Work out when this feed was last updated: prefer the feed-level timestamp and
	// fall back to the publish time of the first (most recent) item.
	var feedLastUpdatedTs int64
	if feed.UpdatedParsed != nil {
		feedLastUpdatedTs = feed.UpdatedParsed.Unix()
	} else if len(feed.Items) > 0 {
		i := feed.Items[0]
		if i != nil && i.PublishedParsed != nil {
			feedLastUpdatedTs = i.PublishedParsed.Unix()
		}
	}
	// If this response carried no usable timestamp, keep the one we already have.
	// Overwriting it with 0 would put the feed back into "first ever poll" mode and
	// silently drop whatever is published before the next poll.
	if feedLastUpdatedTs == 0 {
		feedLastUpdatedTs = prevUpdatedTs
	}

	// Work out when to next poll this feed: the configured interval, but never
	// sooner than minPollingIntervalSeconds.
	nextPollTsSec := now + minPollingIntervalSeconds
	if info.PollIntervalMins > int(minPollingIntervalSeconds/60) {
		// Widen to int64 before multiplying so the product cannot overflow a 32-bit int.
		nextPollTsSec = now + int64(info.PollIntervalMins)*60
	}
	// TODO: Handle the 'sy' Syndication extension to control update interval.
	// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/

	p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs)
	return feed, items, nil
}
// updateFeedInfo stores nextPollTs and feedUpdatedTs on the entry for feedURL in
// s.Feeds, leaving the entry's other fields untouched. It does nothing when feedURL
// is not a key of s.Feeds, so it never creates new entries.
func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) {
	f, ok := s.Feeds[feedURL]
	if !ok {
		return
	}
	f.NextPollTimestampSecs = nextPollTs
	f.FeedUpdatedTimestampSecs = feedUpdatedTs
	// Map values are copies, so the modified struct has to be written back.
	s.Feeds[feedURL] = f
}
// sendToRooms posts item, rendered by itemToHTML, as an "m.room.message" event to
// every room in s.Rooms whose feed list contains feedURL.
//
// A failed send is logged and does not stop delivery to the remaining rooms; the
// returned error is always nil.
func (p *feedPoller) sendToRooms(s *feedReaderService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error {
	logger := log.WithField("feed_url", feedURL).WithField("title", item.Title)
	logger.Info("New feed item")

	// subscribed reports whether feedURL appears in the given list of feed URLs.
	subscribed := func(roomFeeds []string) bool {
		for _, candidate := range roomFeeds {
			if candidate == feedURL {
				return true
			}
		}
		return false
	}

	var targets []string
	for roomID, roomFeeds := range s.Rooms {
		if subscribed(roomFeeds) {
			targets = append(targets, roomID)
		}
	}

	for _, roomID := range targets {
		_, err := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item))
		if err == nil {
			continue
		}
		logger.WithError(err).WithField("room_id", roomID).Error("Failed to send to room")
	}
	return nil
}
// itemToHTML builds the "m.notice" HTML message announcing a feed item. The text
// has the form:
//
//	<i>Feed Title</i> posted a new article: Title Of The Entry ( https://someurl.com/blag )
//
// The feed title, item title and item link are each HTML-escaped before being
// placed into the message.
func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage {
	feedTitle := html.EscapeString(feed.Title)
	itemTitle := html.EscapeString(item.Title)
	itemLink := html.EscapeString(item.Link)
	body := fmt.Sprintf("<i>%s</i> posted a new article: %s ( %s )", feedTitle, itemTitle, itemLink)
	return matrix.GetHTMLMessage("m.notice", body)
}
// feedReaderService holds the configuration and polling state of one feedreader
// service: the feeds to poll and, per room, which of those feeds are posted there.
type feedReaderService struct {
types.DefaultService
// id and serviceUserID are filled in by the factory registered in init and are
// exposed through ServiceID and ServiceUserID.
id string
serviceUserID string
// Feeds maps a feed URL to its settings and polling state.
// NOTE(review): the two "Internal" fields carry no json tag; presumably they are
// persisted under their Go field names when the service is stored. Confirm.
Feeds map[string]struct { // feed_url => { }
// Requested poll interval in minutes. Only honoured when it is greater than
// minPollingIntervalSeconds/60; otherwise the minimum interval is used.
PollIntervalMins int `json:"poll_interval_mins"`
// Unix time in seconds; 0 means the feed has not been polled yet and is due immediately.
NextPollTimestampSecs int64 // Internal: When we should poll again
// Unix time in seconds; while this is 0 no items are sent for the feed.
FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated
} `json:"feeds"`
// Rooms lists, for each room ID, the feed URLs whose new items are sent to that room.
Rooms map[string][]string `json:"rooms"` // room_id => [ feed_url ]
}
// ServiceUserID returns the user ID this service instance was created with.
func (s *feedReaderService) ServiceUserID() string {
	return s.serviceUserID
}

// ServiceID returns the ID this service instance was created with.
func (s *feedReaderService) ServiceID() string {
	return s.id
}

// ServiceType returns the fixed type name of this service, "feedreader".
func (s *feedReaderService) ServiceType() string {
	return "feedreader"
}

// Poller returns a new feedPoller for this service.
func (s *feedReaderService) Poller() types.Poller {
	return new(feedPoller)
}
// Register validates the service configuration.
//
// With no feeds configured the call is treated as a request to delete the service.
// That is only accepted when oldService is a feedReaderService that had at least one
// feed; otherwise (including when oldService is nil) an error is returned.
//
// With feeds configured, every feed URL must be fetchable and parseable, and every
// feed URL listed under Rooms must also be present in Feeds. The first failure is
// returned as an error. client is not used.
func (s *feedReaderService) Register(oldService types.Service, client *matrix.Client) error {
	if len(s.Feeds) == 0 {
		// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
		var numOldFeeds int
		oldFeedService, ok := oldService.(*feedReaderService)
		if ok && oldFeedService != nil {
			numOldFeeds = len(oldFeedService.Feeds)
		} else {
			// oldService may be nil here (nothing was registered before), so no method
			// may be called on it: doing so would panic. Log the value itself instead.
			log.WithField("service", oldService).Error("Old service isn't a FeedReaderService")
		}
		if numOldFeeds == 0 {
			return errors.New("An RSS feed must be specified.")
		}
		return nil
	}
	// Make sure we can parse the feed
	for feedURL := range s.Feeds {
		fp := gofeed.NewParser()
		if _, err := fp.ParseURL(feedURL); err != nil {
			return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error())
		}
	}
	// Make sure every feed a room refers to is declared in the Feeds section.
	// (A feed that no room subscribes to is not rejected.)
	for roomID, roomFeeds := range s.Rooms {
		for _, f := range roomFeeds {
			if _, exists := s.Feeds[f]; !exists {
				return fmt.Errorf("Feed URL %s in room %s does not exist in the Feeds section", f, roomID)
			}
		}
	}
	return nil
}
// PostRegister cleans up a service that was saved without any feeds: polling for it
// is stopped and the service is deleted from the database. A service that still has
// feeds is left alone. oldService is not used.
func (s *feedReaderService) PostRegister(oldService types.Service) {
	if len(s.Feeds) != 0 {
		return
	}

	// bye-bye :(
	logger := log.WithFields(log.Fields{
		"service_id":   s.ServiceID(),
		"service_type": s.ServiceType(),
	})
	logger.Info("Deleting service: No feeds remaining.")
	polling.StopPolling(s)

	err := database.GetServiceDB().DeleteService(s.ServiceID())
	if err != nil {
		logger.WithError(err).Error("Failed to delete service")
	}
}
// init registers the feedReaderService factory with the types package. The factory
// copies the service ID and service user ID into the new instance and ignores the
// webhook endpoint URL.
func init() {
	newFeedReader := func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
		return &feedReaderService{
			id:            serviceID,
			serviceUserID: serviceUserID,
		}
	}
	types.RegisterService(newFeedReader)
}

111
src/github.com/matrix-org/go-neb/services/rss/rss.go

@ -1,111 +0,0 @@
package services
import (
"errors"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types"
"time"
)
// rssPoller is the stateless poller used by the rss service.
type rssPoller struct{}

// IntervalSecs returns this poller's interval in seconds, which is always 10.
func (p *rssPoller) IntervalSecs() int64 {
	const intervalSecs int64 = 10
	return intervalSecs
}
// OnPoll works out which feeds of the given RSS service are due for a re-query and
// groups the rooms that want them by feed URL. A feed is due when it has never been
// polled (last poll timestamp 0) or when its poll interval has elapsed since the
// last poll.
//
// The actual fetching is not implemented yet, so the computed mapping is currently
// unused. If s is not an *rssService an error is logged and nothing else happens.
func (p *rssPoller) OnPoll(s types.Service) {
	rsss, ok := s.(*rssService)
	if !ok {
		log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service")
		return
	}
	now := time.Now().Unix() // Second resolution
	// URL => [ RoomID ]
	urlsToRooms := make(map[string][]string)
	for roomID, roomInfo := range rsss.Rooms {
		for u, feedInfo := range roomInfo.Feeds {
			nextPollTs := feedInfo.LastPollTimestampSecs + int64(feedInfo.PollIntervalMins)*60
			// Due once the next poll time has been reached. The comparison used to be
			// inverted, which selected exactly the feeds that were not due yet.
			if feedInfo.LastPollTimestampSecs == 0 || now >= nextPollTs {
				// re-query this feed
				urlsToRooms[u] = append(urlsToRooms[u], roomID)
			}
		}
	}
	// TODO: Some polling
}
// rssService holds the configuration of one rss service: for every room, the feeds
// that room is subscribed to.
type rssService struct {
types.DefaultService
// id and serviceUserID are filled in by the factory registered in init and are
// exposed through ServiceID and ServiceUserID.
id string
serviceUserID string
// Rooms maps a room ID to that room's feed subscriptions.
Rooms map[string]struct { // room_id => {}
// Feeds maps a feed URL to its polling settings and state for this room.
Feeds map[string]struct { // URL => { }
// Poll interval in minutes.
PollIntervalMins int `json:"poll_interval_mins"`
// Unix time in seconds of the last poll; 0 is treated as "never polled".
LastPollTimestampSecs int64
}
}
}
// ServiceUserID returns the user ID this service instance was created with.
func (s *rssService) ServiceUserID() string {
	return s.serviceUserID
}

// ServiceID returns the ID this service instance was created with.
func (s *rssService) ServiceID() string {
	return s.id
}

// ServiceType returns the fixed type name of this service, "rss".
func (s *rssService) ServiceType() string {
	return "rss"
}

// Poller returns a new rssPoller for this service.
func (s *rssService) Poller() types.Poller {
	return new(rssPoller)
}
// Register accepts the configuration when this service lists at least one feed, or
// when it lists none but oldService did (which is how a service gets deleted).
// If neither has any feeds an error is returned. The feeds themselves are not
// fetched, and client is not used.
func (s *rssService) Register(oldService types.Service, client *matrix.Client) error {
	if len(feedUrls(s)) > 0 {
		return nil
	}
	// No feeds now: only fine if the old service had some, i.e. they are deleting us :(
	if len(feedUrls(oldService)) > 0 {
		return nil
	}
	return errors.New("An RSS feed must be specified.")
}
// PostRegister cleans up a service that was saved without any feeds: polling for it
// is stopped and the service is deleted from the database. A service that still has
// feeds is left alone. oldService is not used.
func (s *rssService) PostRegister(oldService types.Service) {
	if len(feedUrls(s)) != 0 {
		return
	}

	// bye-bye :(
	logger := log.WithFields(log.Fields{
		"service_id":   s.ServiceID(),
		"service_type": s.ServiceType(),
	})
	logger.Info("Deleting service (0 feeds)")
	polling.StopPolling(s)

	err := database.GetServiceDB().DeleteService(s.ServiceID())
	if err != nil {
		logger.WithError(err).Error("Failed to delete service")
	}
}
// feedUrls collects the distinct feed URLs configured across all rooms of srv.
// The order of the result is unspecified. It returns nil when srv is not an
// *rssService or when no feeds are configured.
func feedUrls(srv types.Service) []string {
	rss, isRSS := srv.(*rssService)
	if !isRSS {
		return nil
	}

	var unique []string
	seen := make(map[string]bool)
	for _, room := range rss.Rooms {
		for feedURL := range room.Feeds {
			if seen[feedURL] {
				continue
			}
			seen[feedURL] = true
			unique = append(unique, feedURL)
		}
	}
	return unique
}
// init registers the rssService factory with the types package. The factory copies
// the service ID and service user ID into the new instance and ignores the webhook
// endpoint URL.
func init() {
	newRSS := func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
		return &rssService{
			id:            serviceID,
			serviceUserID: serviceUserID,
		}
	}
	types.RegisterService(newRSS)
}

2
src/github.com/matrix-org/go-neb/types/types.go

@ -43,7 +43,7 @@ type BotOptions struct {
// Poller represents a thing that can be polled at a given rate. // Poller represents a thing that can be polled at a given rate.
type Poller interface { type Poller interface {
IntervalSecs() int64 IntervalSecs() int64
OnPoll(service Service)
OnPoll(service Service, client *matrix.Client)
} }
// A Service is the configuration for a bot service. // A Service is the configuration for a bot service.

Loading…
Cancel
Save