Browse Source

Get polling working

Currently never stops polling so clobbering service IDs does the wrong thing,
and deleting services doesn't stop polling at all!
pull/76/head
Kegan Dougal 8 years ago
parent
commit
fe2ab0c8bd
  1. 6
      src/github.com/matrix-org/go-neb/goneb.go
  2. 44
      src/github.com/matrix-org/go-neb/polling/polling.go
  3. 61
      src/github.com/matrix-org/go-neb/services/rss/rss.go
  4. 22
      src/github.com/matrix-org/go-neb/types/types.go

6
src/github.com/matrix-org/go-neb/goneb.go

@ -5,6 +5,7 @@ import (
"github.com/matrix-org/dugong" "github.com/matrix-org/dugong"
"github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/polling"
_ "github.com/matrix-org/go-neb/realms/github" _ "github.com/matrix-org/go-neb/realms/github"
_ "github.com/matrix-org/go-neb/realms/jira" _ "github.com/matrix-org/go-neb/realms/jira"
"github.com/matrix-org/go-neb/server" "github.com/matrix-org/go-neb/server"
@ -12,6 +13,7 @@ import (
_ "github.com/matrix-org/go-neb/services/giphy" _ "github.com/matrix-org/go-neb/services/giphy"
_ "github.com/matrix-org/go-neb/services/github" _ "github.com/matrix-org/go-neb/services/github"
_ "github.com/matrix-org/go-neb/services/jira" _ "github.com/matrix-org/go-neb/services/jira"
_ "github.com/matrix-org/go-neb/services/rss"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"net/http" "net/http"
@ -69,5 +71,9 @@ func main() {
rh := &realmRedirectHandler{db: db} rh := &realmRedirectHandler{db: db}
http.HandleFunc("/realms/redirects/", rh.handle) http.HandleFunc("/realms/redirects/", rh.handle)
if err := polling.Start(); err != nil {
log.Panic(err)
}
http.ListenAndServe(bindAddress, nil) http.ListenAndServe(bindAddress, nil)
} }

44
src/github.com/matrix-org/go-neb/polling/polling.go

@ -0,0 +1,44 @@
package polling
import (
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/types"
"time"
)
var shouldPoll = make(map[string]bool) // Service ID => yay/nay
// Start polling already existing services
func Start() error {
log.Print("Start polling")
// Work out which service types require polling
for serviceType, poller := range types.PollersByType() {
if poller == nil {
continue
}
// Query for all services with said service type
srvs, err := database.GetServiceDB().LoadServicesByType(serviceType)
if err != nil {
return err
}
for _, s := range srvs {
shouldPoll[s.ServiceID()] = true
go StartPolling(s, poller)
}
}
return nil
}
// StartPolling begins the polling loop for this service. Does not return, so call this
// as a goroutine!
func StartPolling(service types.Service, poller types.Poller) {
for {
if !shouldPoll[service.ServiceID()] {
log.WithField("service_id", service.ServiceID()).Info("Terminating poll.")
break
}
poller.OnPoll(service)
time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second)
}
}

61
src/github.com/matrix-org/go-neb/services/rss/rss.go

@ -1,28 +1,79 @@
package services package services
import ( import (
"errors"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"time"
) )
type rssPoller struct{}
func (p *rssPoller) IntervalSecs() int64 { return 10 }
func (p *rssPoller) OnPoll(s types.Service) {
rsss, ok := s.(*rssService)
if !ok {
log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service")
return
}
now := time.Now().Unix() // Second resolution
// URL => [ RoomID ]
urlsToRooms := make(map[string][]string)
for roomID, roomInfo := range rsss.Rooms {
for u, feedInfo := range roomInfo.Feeds {
if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now {
// re-query this feed
urlsToRooms[u] = append(urlsToRooms[u], roomID)
}
}
}
log.Print(rsss.ServiceID()+" Polly poll poll ", rsss.Rooms)
}
type rssService struct { type rssService struct {
types.DefaultService types.DefaultService
id string id string
serviceUserID string serviceUserID string
ClientUserID string `json:"client_user_id"`
Rooms map[string]struct { // room_id => {} Rooms map[string]struct { // room_id => {}
Feeds map[string]struct { // URL => { } Feeds map[string]struct { // URL => { }
PollIntervalMs int `json:"poll_interval_ms"`
} `json:"feeds"`
} `json:"rooms"`
PollIntervalMins int `json:"poll_interval_mins"`
LastPollTimestampSecs int64
}
}
} }
func (s *rssService) ServiceUserID() string { return s.serviceUserID } func (s *rssService) ServiceUserID() string { return s.serviceUserID }
func (s *rssService) ServiceID() string { return s.id } func (s *rssService) ServiceID() string { return s.id }
func (s *rssService) ServiceType() string { return "rss" } func (s *rssService) ServiceType() string { return "rss" }
func (s *rssService) Poller() types.Poller { return &rssPoller{} }
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { return nil }
func (s *rssService) Register(oldService types.Service, client *matrix.Client) error {
urlSet := make(map[string]bool)
for _, roomInfo := range s.Rooms {
for u, feedInfo := range roomInfo.Feeds {
if feedInfo.PollIntervalMins == 0 {
feedInfo.PollIntervalMins = 1
log.Print("Set poll interval to 1 ", u)
}
urlSet[u] = true
}
}
if len(urlSet) == 0 {
log.Print(s.Rooms)
return errors.New("An RSS feed must be specified.")
}
return nil
}
// PostRegister will start polling
func (s *rssService) PostRegister(oldService types.Service) {
go polling.StartPolling(s, s.Poller())
}
func init() { func init() {
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {

22
src/github.com/matrix-org/go-neb/types/types.go

@ -42,14 +42,17 @@ type BotOptions struct {
// Poller represents a thing that can be polled at a given rate. // Poller represents a thing that can be polled at a given rate.
type Poller interface { type Poller interface {
IntervalSecs() int
OnPoll()
IntervalSecs() int64
OnPoll(service Service)
} }
// A Service is the configuration for a bot service. // A Service is the configuration for a bot service.
type Service interface { type Service interface {
// Return the user ID of this service.
ServiceUserID() string ServiceUserID() string
// Return an opaque ID used to identify this service.
ServiceID() string ServiceID() string
// Return the type of service. This string MUST NOT change.
ServiceType() string ServiceType() string
Plugin(cli *matrix.Client, roomID string) plugin.Plugin Plugin(cli *matrix.Client, roomID string) plugin.Plugin
OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client)
@ -63,7 +66,10 @@ type Service interface {
// concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean // concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean
// up resources which are no longer needed (e.g. removing old webhooks). // up resources which are no longer needed (e.g. removing old webhooks).
PostRegister(oldService Service) PostRegister(oldService Service)
// Return a Poller object if you wish to be invoked every N seconds.
// Return a Poller object if you wish to be invoked every N seconds. This struct MUST NOT conditionally change: either
// ALWAYS return a new Poller interface or NEVER return a Poller. The Poller will exist outside of the lifetime of the
// Service upon which it is being called on, so DO NOT wrap this inside a closure or else you will introduce a memory
// leak. An instantiated service will be passed into the `OnPoll(Service)` for you to extract state.
Poller() Poller Poller() Poller
} }
@ -110,10 +116,18 @@ func BaseURL(u string) error {
} }
var servicesByType = map[string]func(string, string, string) Service{} var servicesByType = map[string]func(string, string, string) Service{}
var pollersByType = map[string]Poller{}
// RegisterService registers a factory for creating Service instances. // RegisterService registers a factory for creating Service instances.
func RegisterService(factory func(string, string, string) Service) { func RegisterService(factory func(string, string, string) Service) {
servicesByType[factory("", "", "").ServiceType()] = factory
s := factory("", "", "")
servicesByType[s.ServiceType()] = factory
pollersByType[s.ServiceType()] = s.Poller()
}
// PollersByType returns a map of service type to poller, which may be nil
func PollersByType() map[string]Poller {
return pollersByType
} }
// CreateService creates a Service of the given type and serviceID. // CreateService creates a Service of the given type and serviceID.

Loading…
Cancel
Save