Browse Source

Add ability to update/stop polling services

All appears to be in working order now. Now time for the RSS feeds!
pull/76/head
Kegan Dougal 8 years ago
parent
commit
331bdc703f
  1. 12
      src/github.com/matrix-org/go-neb/api.go
  2. 77
      src/github.com/matrix-org/go-neb/polling/polling.go
  3. 53
      src/github.com/matrix-org/go-neb/services/rss/rss.go

12
src/github.com/matrix-org/go-neb/api.go

@ -8,6 +8,7 @@ import (
"github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/errors" "github.com/matrix-org/go-neb/errors"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"net/http" "net/http"
"strings" "strings"
@ -306,6 +307,17 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac
return nil, &errors.HTTPError{err, "Error storing service", 500} return nil, &errors.HTTPError{err, "Error storing service", 500}
} }
// Start any polling NOW because they may decide to stop it in PostRegister, and we want to make
// sure we'll actually stop.
if service.Poller() != nil {
if err := polling.StartPolling(service); err != nil {
log.WithFields(log.Fields{
"service_id": service.ServiceID(),
log.ErrorKey: err,
}).Error("Failed to start poll loop.")
}
}
service.PostRegister(old) service.PostRegister(old)
return &struct { return &struct {

77
src/github.com/matrix-org/go-neb/polling/polling.go

@ -1,17 +1,24 @@
package polling package polling
import ( import (
"fmt"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"sync"
"time" "time"
) )
var shouldPoll = make(map[string]bool) // Service ID => yay/nay
// Remember when we first started polling on this service ID. Polling routines will
// continually check this time. If the service gets updated, this will change, prompting
// older instances to die away. If this service gets removed, the time will be 0.
var (
pollMutex sync.Mutex
startPollTime = make(map[string]int64) // ServiceID => unix timestamp
)
// Start polling already existing services // Start polling already existing services
func Start() error { func Start() error {
log.Print("Start polling")
// Work out which service types require polling // Work out which service types require polling
for serviceType, poller := range types.PollersByType() { for serviceType, poller := range types.PollersByType() {
if poller == nil { if poller == nil {
@ -23,22 +30,74 @@ func Start() error {
return err return err
} }
for _, s := range srvs { for _, s := range srvs {
shouldPoll[s.ServiceID()] = true
go StartPolling(s, poller)
if err := StartPolling(s); err != nil {
return err
}
} }
} }
return nil return nil
} }
// StartPolling begins the polling loop for this service. Does not return, so call this
// StartPolling begins a polling loop for this service.
// If one already exists for this service, it will be instructed to die. The new poll will not wait for this to happen,
// so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate
// this poll.
func StartPolling(service types.Service) error {
p := types.PollersByType()[service.ServiceType()]
if p == nil {
return fmt.Errorf("Service %s (type=%s) doesn't have a Poller", service.ServiceID(), service.ServiceType())
}
// Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here,
// we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended.
ts := time.Now().UnixNano()
setPollStartTime(service, ts)
go pollLoop(service, p, ts)
return nil
}
// StopPolling stops all pollers for this service.
func StopPolling(service types.Service) {
log.WithFields(log.Fields{
"service_id": service.ServiceID(),
"service_type": service.ServiceType(),
}).Info("StopPolling")
setPollStartTime(service, 0)
}
// pollLoop begins the polling loop for this service. Does not return, so call this
// as a goroutine! // as a goroutine!
func StartPolling(service types.Service, poller types.Poller) {
func pollLoop(service types.Service, poller types.Poller, ts int64) {
logger := log.WithFields(log.Fields{
"timestamp": ts,
"service_id": service.ServiceID(),
"service_type": service.ServiceType(),
"interval_secs": poller.IntervalSecs(),
})
logger.Info("Starting polling loop")
for { for {
if !shouldPoll[service.ServiceID()] {
log.WithField("service_id", service.ServiceID()).Info("Terminating poll.")
poller.OnPoll(service)
if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.")
break break
} }
poller.OnPoll(service)
time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second) time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second)
if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.")
break
}
} }
} }
// setPollStartTime clobbers the current poll time
func setPollStartTime(service types.Service, startTs int64) {
pollMutex.Lock()
defer pollMutex.Unlock()
startPollTime[service.ServiceID()] = startTs
}
// pollTimeChanged returns true if the poll start time for this service ID is different to the one supplied.
func pollTimeChanged(service types.Service, ts int64) bool {
pollMutex.Lock()
defer pollMutex.Unlock()
return startPollTime[service.ServiceID()] != ts
}

53
src/github.com/matrix-org/go-neb/services/rss/rss.go

@ -3,6 +3,7 @@ package services
import ( import (
"errors" "errors"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
@ -53,26 +54,50 @@ func (s *rssService) Poller() types.Poller { return &rssPoller{} }
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { func (s *rssService) Register(oldService types.Service, client *matrix.Client) error {
urlSet := make(map[string]bool)
for _, roomInfo := range s.Rooms {
for u, feedInfo := range roomInfo.Feeds {
if feedInfo.PollIntervalMins == 0 {
feedInfo.PollIntervalMins = 1
log.Print("Set poll interval to 1 ", u)
}
urlSet[u] = true
}
}
if len(urlSet) == 0 {
log.Print(s.Rooms)
feeds := feedUrls(s)
if len(feeds) == 0 {
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
oldFeeds := feedUrls(oldService)
if len(oldFeeds) == 0 {
return errors.New("An RSS feed must be specified.") return errors.New("An RSS feed must be specified.")
} }
}
return nil return nil
} }
// PostRegister will start polling
func (s *rssService) PostRegister(oldService types.Service) { func (s *rssService) PostRegister(oldService types.Service) {
go polling.StartPolling(s, s.Poller())
if len(feedUrls(s)) == 0 { // bye-bye :(
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
})
logger.Info("Deleting service (0 feeds)")
polling.StopPolling(s)
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil {
logger.WithError(err).Error("Failed to delete service")
}
}
}
// feedUrls returns a list of feed urls for this service
func feedUrls(srv types.Service) []string {
var feeds []string
s, ok := srv.(*rssService)
if !ok {
return feeds
}
urlSet := make(map[string]bool)
for _, roomInfo := range s.Rooms {
for u := range roomInfo.Feeds {
urlSet[u] = true
}
}
for u := range urlSet {
feeds = append(feeds, u)
}
return feeds
} }
func init() { func init() {

Loading…
Cancel
Save