Browse Source

Move OnPoll into the Service method set

This feels a lot better because now `OnPoll` works in a similar way to
`OnReceiveWebhook` (called on a `Service`) rather than have this strange
global-per-service-type struct.
kegan/per-service-poll
Kegan Dougal 8 years ago
parent
commit
fd54eb1a55
  1. 2
      src/github.com/matrix-org/go-neb/api.go
  2. 39
      src/github.com/matrix-org/go-neb/polling/polling.go
  3. 159
      src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go
  4. 32
      src/github.com/matrix-org/go-neb/types/types.go

2
src/github.com/matrix-org/go-neb/api.go

@ -309,7 +309,7 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac
// Start any polling NOW because they may decide to stop it in PostRegister, and we want to make // Start any polling NOW because they may decide to stop it in PostRegister, and we want to make
// sure we'll actually stop. // sure we'll actually stop.
if service.Poller() != nil {
if _, ok := service.(types.Poller); ok {
if err := polling.StartPolling(service); err != nil { if err := polling.StartPolling(service); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"service_id": service.ServiceID(), "service_id": service.ServiceID(),

39
src/github.com/matrix-org/go-neb/polling/polling.go

@ -1,7 +1,6 @@
package polling package polling
import ( import (
"fmt"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
@ -27,10 +26,7 @@ func SetClients(clis *clients.Clients) {
// Start polling already existing services // Start polling already existing services
func Start() error { func Start() error {
// Work out which service types require polling // Work out which service types require polling
for serviceType, poller := range types.PollersByType() {
if poller == nil {
continue
}
for _, serviceType := range types.PollingServiceTypes() {
// Query for all services with said service type // Query for all services with said service type
srvs, err := database.GetServiceDB().LoadServicesByType(serviceType) srvs, err := database.GetServiceDB().LoadServicesByType(serviceType)
if err != nil { if err != nil {
@ -50,15 +46,11 @@ func Start() error {
// so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate // so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate
// this poll. // this poll.
func StartPolling(service types.Service) error { func StartPolling(service types.Service) error {
p := types.PollersByType()[service.ServiceType()]
if p == nil {
return fmt.Errorf("Service %s (type=%s) doesn't have a Poller", service.ServiceID(), service.ServiceType())
}
// Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here, // Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here,
// we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended. // we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended.
ts := time.Now().UnixNano() ts := time.Now().UnixNano()
setPollStartTime(service, ts) setPollStartTime(service, ts)
go pollLoop(service, p, ts)
go pollLoop(service, ts)
return nil return nil
} }
@ -73,13 +65,17 @@ func StopPolling(service types.Service) {
// pollLoop begins the polling loop for this service. Does not return, so call this // pollLoop begins the polling loop for this service. Does not return, so call this
// as a goroutine! // as a goroutine!
func pollLoop(service types.Service, poller types.Poller, ts int64) {
func pollLoop(service types.Service, ts int64) {
logger := log.WithFields(log.Fields{ logger := log.WithFields(log.Fields{
"timestamp": ts,
"service_id": service.ServiceID(),
"service_type": service.ServiceType(),
"interval_secs": poller.IntervalSecs(),
"timestamp": ts,
"service_id": service.ServiceID(),
"service_type": service.ServiceType(),
}) })
poller, ok := service.(types.Poller)
if !ok {
logger.Error("Service is not a Poller.")
return
}
logger.Info("Starting polling loop") logger.Info("Starting polling loop")
cli, err := clientPool.Client(service.ServiceUserID()) cli, err := clientPool.Client(service.ServiceUserID())
if err != nil { if err != nil {
@ -87,12 +83,21 @@ func pollLoop(service types.Service, poller types.Poller, ts int64) {
return return
} }
for { for {
poller.OnPoll(service, cli)
logger.Info("OnPoll")
nextTime := poller.OnPoll(cli)
if pollTimeChanged(service, ts) { if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.") logger.Info("Terminating poll.")
break break
} }
time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second)
// work out how long to sleep
if nextTime.Unix() == 0 {
logger.Info("Terminating poll - OnPoll returned 0")
break
}
now := time.Now()
logger.Info("Sleeping for ", nextTime.Sub(now))
time.Sleep(nextTime.Sub(now))
if pollTimeChanged(service, ts) { if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.") logger.Info("Terminating poll.")
break break

159
src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go

@ -15,25 +15,75 @@ import (
const minPollingIntervalSeconds = 60 // 1 min (News feeds can be genuinely spammy) const minPollingIntervalSeconds = 60 // 1 min (News feeds can be genuinely spammy)
type feedPoller struct{}
type rssBotService struct {
types.DefaultService
id string
serviceUserID string
Feeds map[string]struct { // feed_url => { }
PollIntervalMins int `json:"poll_interval_mins"`
Rooms []string `json:"rooms"`
NextPollTimestampSecs int64 // Internal: When we should poll again
FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated
} `json:"feeds"`
}
func (s *rssBotService) ServiceUserID() string { return s.serviceUserID }
func (s *rssBotService) ServiceID() string { return s.id }
func (s *rssBotService) ServiceType() string { return "rssbot" }
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
func (s *rssBotService) Register(oldService types.Service, client *matrix.Client) error {
if len(s.Feeds) == 0 {
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
var numOldFeeds int
oldFeedService, ok := oldService.(*rssBotService)
if !ok {
log.WithField("service", oldService).Error("Old service isn't a rssBotService")
} else {
numOldFeeds = len(oldFeedService.Feeds)
}
if numOldFeeds == 0 {
return errors.New("An RSS feed must be specified.")
}
return nil
}
// Make sure we can parse the feed
for feedURL, feedInfo := range s.Feeds {
fp := gofeed.NewParser()
if _, err := fp.ParseURL(feedURL); err != nil {
return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error())
}
if len(feedInfo.Rooms) == 0 {
return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL)
}
}
return nil
}
func (p *feedPoller) IntervalSecs() int64 { return 10 }
func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) {
func (s *rssBotService) PostRegister(oldService types.Service) {
if len(s.Feeds) == 0 { // bye-bye :(
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
})
logger.Info("Deleting service: No feeds remaining.")
polling.StopPolling(s)
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil {
logger.WithError(err).Error("Failed to delete service")
}
}
}
func (s *rssBotService) OnPoll(cli *matrix.Client) time.Time {
logger := log.WithFields(log.Fields{ logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(), "service_id": s.ServiceID(),
"service_type": s.ServiceType(), "service_type": s.ServiceType(),
}) })
frService, ok := s.(*rssBotService)
if !ok {
logger.Error("RSSBot: OnPoll called without a Feed Service instance")
return
}
now := time.Now().Unix() // Second resolution now := time.Now().Unix() // Second resolution
// Work out which feeds should be polled // Work out which feeds should be polled
var pollFeeds []string var pollFeeds []string
for u, feedInfo := range frService.Feeds {
for u, feedInfo := range s.Feeds {
if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs { if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs {
// re-query this feed // re-query this feed
pollFeeds = append(pollFeeds, u) pollFeeds = append(pollFeeds, u)
@ -41,12 +91,12 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) {
} }
if len(pollFeeds) == 0 { if len(pollFeeds) == 0 {
return
return s.nextTimestamp()
} }
// Query each feed and send new items to subscribed rooms // Query each feed and send new items to subscribed rooms
for _, u := range pollFeeds { for _, u := range pollFeeds {
feed, items, err := p.queryFeed(frService, u)
feed, items, err := s.queryFeed(u)
if err != nil { if err != nil {
logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed") logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed")
continue continue
@ -54,7 +104,7 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) {
// Loop backwards since [0] is the most recent and we want to send in chronological order // Loop backwards since [0] is the most recent and we want to send in chronological order
for i := len(items) - 1; i >= 0; i-- { for i := len(items) - 1; i >= 0; i-- {
item := items[i] item := items[i]
if err := p.sendToRooms(frService, cli, u, feed, item); err != nil {
if err := s.sendToRooms(cli, u, feed, item); err != nil {
logger.WithFields(log.Fields{ logger.WithFields(log.Fields{
"feed_url": u, "feed_url": u,
log.ErrorKey: err, log.ErrorKey: err,
@ -65,13 +115,26 @@ func (p *feedPoller) OnPoll(s types.Service, cli *matrix.Client) {
} }
// Persist the service to save the next poll times // Persist the service to save the next poll times
if _, err := database.GetServiceDB().StoreService(frService); err != nil {
if _, err := database.GetServiceDB().StoreService(s); err != nil {
logger.WithError(err).Error("Failed to persist next poll times for service") logger.WithError(err).Error("Failed to persist next poll times for service")
} }
return s.nextTimestamp()
}
func (s *rssBotService) nextTimestamp() time.Time {
// return the earliest next poll ts
var earliestNextTs int64
for _, feedInfo := range s.Feeds {
if earliestNextTs == 0 || feedInfo.NextPollTimestampSecs < earliestNextTs {
earliestNextTs = feedInfo.NextPollTimestampSecs
}
}
return time.Unix(earliestNextTs, 0)
} }
// Query the given feed, update relevant timestamps and return NEW items // Query the given feed, update relevant timestamps and return NEW items
func (p *feedPoller) queryFeed(s *rssBotService, feedURL string) (*gofeed.Feed, []gofeed.Item, error) {
func (s *rssBotService) queryFeed(feedURL string) (*gofeed.Feed, []gofeed.Item, error) {
log.WithField("feed_url", feedURL).Info("Querying feed") log.WithField("feed_url", feedURL).Info("Querying feed")
var items []gofeed.Item var items []gofeed.Item
fp := gofeed.NewParser() fp := gofeed.NewParser()
@ -115,11 +178,11 @@ func (p *feedPoller) queryFeed(s *rssBotService, feedURL string) (*gofeed.Feed,
// TODO: Handle the 'sy' Syndication extension to control update interval. // TODO: Handle the 'sy' Syndication extension to control update interval.
// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/ // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
p.updateFeedInfo(s, feedURL, nextPollTsSec, feedLastUpdatedTs)
s.updateFeedInfo(feedURL, nextPollTsSec, feedLastUpdatedTs)
return feed, items, nil return feed, items, nil
} }
func (p *feedPoller) updateFeedInfo(s *rssBotService, feedURL string, nextPollTs, feedUpdatedTs int64) {
func (s *rssBotService) updateFeedInfo(feedURL string, nextPollTs, feedUpdatedTs int64) {
for u := range s.Feeds { for u := range s.Feeds {
if u != feedURL { if u != feedURL {
continue continue
@ -131,7 +194,7 @@ func (p *feedPoller) updateFeedInfo(s *rssBotService, feedURL string, nextPollTs
} }
} }
func (p *feedPoller) sendToRooms(s *rssBotService, cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error {
func (s *rssBotService) sendToRooms(cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error {
logger := log.WithField("feed_url", feedURL).WithField("title", item.Title) logger := log.WithField("feed_url", feedURL).WithField("title", item.Title)
logger.Info("New feed item") logger.Info("New feed item")
for _, roomID := range s.Feeds[feedURL].Rooms { for _, roomID := range s.Feeds[feedURL].Rooms {
@ -150,66 +213,6 @@ func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage {
)) ))
} }
type rssBotService struct {
types.DefaultService
id string
serviceUserID string
Feeds map[string]struct { // feed_url => { }
PollIntervalMins int `json:"poll_interval_mins"`
Rooms []string `json:"rooms"`
NextPollTimestampSecs int64 // Internal: When we should poll again
FeedUpdatedTimestampSecs int64 // Internal: The last time the feed was updated
} `json:"feeds"`
}
func (s *rssBotService) ServiceUserID() string { return s.serviceUserID }
func (s *rssBotService) ServiceID() string { return s.id }
func (s *rssBotService) ServiceType() string { return "rssbot" }
func (s *rssBotService) Poller() types.Poller { return &feedPoller{} }
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
func (s *rssBotService) Register(oldService types.Service, client *matrix.Client) error {
if len(s.Feeds) == 0 {
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
var numOldFeeds int
oldFeedService, ok := oldService.(*rssBotService)
if !ok {
log.WithField("service", oldService).Error("Old service isn't a rssBotService")
} else {
numOldFeeds = len(oldFeedService.Feeds)
}
if numOldFeeds == 0 {
return errors.New("An RSS feed must be specified.")
}
return nil
}
// Make sure we can parse the feed
for feedURL, feedInfo := range s.Feeds {
fp := gofeed.NewParser()
if _, err := fp.ParseURL(feedURL); err != nil {
return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error())
}
if len(feedInfo.Rooms) == 0 {
return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL)
}
}
return nil
}
func (s *rssBotService) PostRegister(oldService types.Service) {
if len(s.Feeds) == 0 { // bye-bye :(
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
})
logger.Info("Deleting service: No feeds remaining.")
polling.StopPolling(s)
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil {
logger.WithError(err).Error("Failed to delete service")
}
}
}
func init() { func init() {
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
r := &rssBotService{ r := &rssBotService{

32
src/github.com/matrix-org/go-neb/types/types.go

@ -9,6 +9,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"time"
) )
// A ClientConfig is the configuration for a matrix client for a bot to use. // A ClientConfig is the configuration for a matrix client for a bot to use.
@ -40,10 +41,11 @@ type BotOptions struct {
Options map[string]interface{} Options map[string]interface{}
} }
// Poller represents a thing that can be polled at a given rate.
// Poller represents a thing which can poll. Services should implement this method signature to support polling.
type Poller interface { type Poller interface {
IntervalSecs() int64
OnPoll(service Service, client *matrix.Client)
// OnPoll is called when the poller should poll. Return the timestamp when you want to be polled again.
// Return 0 to never be polled again.
OnPoll(client *matrix.Client) time.Time
} }
// A Service is the configuration for a bot service. // A Service is the configuration for a bot service.
@ -66,11 +68,6 @@ type Service interface {
// concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean // concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean
// up resources which are no longer needed (e.g. removing old webhooks). // up resources which are no longer needed (e.g. removing old webhooks).
PostRegister(oldService Service) PostRegister(oldService Service)
// Return a Poller object if you wish to be invoked every N seconds. This struct MUST NOT conditionally change: either
// ALWAYS return a new Poller interface or NEVER return a Poller. The Poller will exist outside of the lifetime of the
// Service upon which it is being called on, so DO NOT wrap this inside a closure or else you will introduce a memory
// leak. An instantiated service will be passed into the `OnPoll(Service)` for you to extract state.
Poller() Poller
} }
// DefaultService NO-OPs the implementation of optional Service interface methods. Feel free to override them. // DefaultService NO-OPs the implementation of optional Service interface methods. Feel free to override them.
@ -89,9 +86,6 @@ func (s *DefaultService) Register(oldService Service, client *matrix.Client) err
// PostRegister does nothing. // PostRegister does nothing.
func (s *DefaultService) PostRegister(oldService Service) {} func (s *DefaultService) PostRegister(oldService Service) {}
// Poller returns no poller.
func (s *DefaultService) Poller() Poller { return nil }
// OnReceiveWebhook does nothing but 200 OK the request. // OnReceiveWebhook does nothing but 200 OK the request.
func (s *DefaultService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { func (s *DefaultService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) {
w.WriteHeader(200) // Do nothing w.WriteHeader(200) // Do nothing
@ -116,18 +110,24 @@ func BaseURL(u string) error {
} }
var servicesByType = map[string]func(string, string, string) Service{} var servicesByType = map[string]func(string, string, string) Service{}
var pollersByType = map[string]Poller{}
var serviceTypesWhichPoll = map[string]bool{}
// RegisterService registers a factory for creating Service instances. // RegisterService registers a factory for creating Service instances.
func RegisterService(factory func(string, string, string) Service) { func RegisterService(factory func(string, string, string) Service) {
s := factory("", "", "") s := factory("", "", "")
servicesByType[s.ServiceType()] = factory servicesByType[s.ServiceType()] = factory
pollersByType[s.ServiceType()] = s.Poller()
if _, ok := s.(Poller); ok {
serviceTypesWhichPoll[s.ServiceType()] = true
}
} }
// PollersByType returns a map of service type to poller, which may be nil
func PollersByType() map[string]Poller {
return pollersByType
// PollingServiceTypes returns a list of service types which meet the Poller interface
func PollingServiceTypes() (types []string) {
for t := range serviceTypesWhichPoll {
types = append(types, t)
}
return
} }
// CreateService creates a Service of the given type and serviceID. // CreateService creates a Service of the given type and serviceID.

Loading…
Cancel
Save