Browse Source

First working feed reader version

kegan/feedreader
Kegan Dougal 8 years ago
parent
commit
0b09fcd86d
  1. 1
      src/github.com/matrix-org/go-neb/goneb.go
  2. 13
      src/github.com/matrix-org/go-neb/polling/polling.go
  3. 51
      src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go
  4. 2
      src/github.com/matrix-org/go-neb/types/types.go

1
src/github.com/matrix-org/go-neb/goneb.go

@ -72,6 +72,7 @@ func main() {
rh := &realmRedirectHandler{db: db}
http.HandleFunc("/realms/redirects/", rh.handle)
polling.SetClients(clients)
if err := polling.Start(); err != nil {
log.WithError(err).Panic("Failed to start polling")
}

13
src/github.com/matrix-org/go-neb/polling/polling.go

@ -3,6 +3,7 @@ package polling
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/types"
"sync"
@ -16,6 +17,11 @@ var (
pollMutex sync.Mutex
startPollTime = make(map[string]int64) // ServiceID => unix timestamp
)
var clientPool *clients.Clients
func SetClients(clis *clients.Clients) {
clientPool = clis
}
// Start polling already existing services
func Start() error {
@ -74,8 +80,13 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) {
"interval_secs": poller.IntervalSecs(),
})
logger.Info("Starting polling loop")
cli, err := clientPool.Client(service.ServiceUserID())
if err != nil {
logger.WithError(err).WithField("user_id", service.ServiceUserID()).Error("Poll setup failed: failed to load client")
return
}
for {
poller.OnPoll(service)
poller.OnPoll(service, cli)
if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.")
break

51
src/github.com/matrix-org/go-neb/services/feedreader/feedreader.go

@ -2,12 +2,14 @@ package services
import (
"errors"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types"
"github.com/mmcdole/gofeed"
"html"
"time"
)
@ -16,7 +18,7 @@ const minPollingIntervalSeconds = (10 * 60) // 10min
type feedPoller struct{}
func (p *feedPoller) IntervalSecs() int64 { return 10 }
func (p *feedPoller) OnPoll(s types.Service) {
func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) {
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
@ -40,13 +42,13 @@ func (p *feedPoller) OnPoll(s types.Service) {
// Query each feed and send new items to subscribed rooms
for _, u := range pollFeeds {
items, err := p.queryFeed(frService, u)
feed, items, err := p.queryFeed(frService, u)
if err != nil {
logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed")
continue
}
for _, i := range items {
if err := p.sendToRooms(frService, u, i); err != nil {
if err := p.sendToRooms(frService, cli, u, feed, i); err != nil {
logger.WithFields(log.Fields{
"feed_url": u,
log.ErrorKey: err,
@ -66,30 +68,27 @@ func (p *feedPoller) OnPoll(s types.Service) {
}
// Query the given feed, update relevant timestamps and return NEW items
func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.Item, error) {
func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) {
var items []gofeed.Item
fp := gofeed.NewParser()
feed, err := fp.ParseURL(feedURL)
if err != nil {
return items, err
return nil, items, err
}
// Work out which items are new, if any (based on the last updated TS we have)
// If the TS is 0 then this is the first ever poll, so let's not send 10s of events
// into the room and just do new ones from this point onwards.
// if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 {
for _, i := range feed.Items {
if i == nil || i.PublishedParsed == nil {
continue
}
if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs {
items = append(items, *i)
if s.Feeds[feedURL].FeedUpdatedTimestampSecs != 0 {
for _, i := range feed.Items {
if i == nil || i.PublishedParsed == nil {
continue
}
if i.PublishedParsed.Unix() > s.Feeds[feedURL].FeedUpdatedTimestampSecs {
items = append(items, *i)
}
}
}
// }
if s.Feeds[feedURL].FeedUpdatedTimestampSecs == 0 {
log.Debug("ts is 0")
}
now := time.Now().Unix() // Second resolution
@ -113,7 +112,7 @@ func (p *feedPoller) queryFeed(s *feedReaderService, feedURL string) ([]gofeed.I
// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs)
return items, nil
return feed, items, nil
}
func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPollTs, feedUpdatedTs int64) {
@ -128,8 +127,9 @@ func (p *feedPoller) updateFeedInfo(s *feedReaderService, feedURL string, nextPo
}
}
func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofeed.Item) error {
log.WithField("feed_url", feedURL).WithField("title", item.Title).Info("New feed item")
func (p *feedPoller) sendToRooms(s *feedReaderService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error {
logger := log.WithField("feed_url", feedURL).WithField("title", item.Title)
logger.Info("New feed item")
var rooms []string
for roomID, urls := range s.Rooms {
for _, u := range urls {
@ -139,9 +139,22 @@ func (p *feedPoller) sendToRooms(s *feedReaderService, feedURL string, item gofe
}
}
}
for _, roomID := range rooms {
if _, err := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item)); err != nil {
logger.WithError(err).WithField("room_id", roomID).Error("Failed to send to room")
}
}
return nil
}
// SomeOne posted a new article: Title Of The Entry ( https://someurl.com/blag )
func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage {
return matrix.GetHTMLMessage("m.notice", fmt.Sprintf(
"<i>%s</i> posted a new article: %s ( %s )",
html.EscapeString(feed.Title), html.EscapeString(item.Title), html.EscapeString(item.Link),
))
}
type feedReaderService struct {
types.DefaultService
id string

2
src/github.com/matrix-org/go-neb/types/types.go

@ -43,7 +43,7 @@ type BotOptions struct {
// Poller represents a thing that can be polled at a given rate.
type Poller interface {
IntervalSecs() int64
OnPoll(service Service)
OnPoll(service Service, client *matrix.Client)
}
// A Service is the configuration for a bot service.

Loading…
Cancel
Save