Browse Source

Merge pull request #76 from matrix-org/kegan/rss

Implement polling for Services
pull/68/merge
Kegsay 8 years ago
committed by GitHub
parent
commit
c6a15801eb
  1. 12
      src/github.com/matrix-org/go-neb/api.go
  2. 13
      src/github.com/matrix-org/go-neb/database/db.go
  3. 27
      src/github.com/matrix-org/go-neb/database/schema.go
  4. 5
      src/github.com/matrix-org/go-neb/goneb.go
  5. 103
      src/github.com/matrix-org/go-neb/polling/polling.go
  6. 7
      src/github.com/matrix-org/go-neb/services/echo/echo.go
  7. 5
      src/github.com/matrix-org/go-neb/services/giphy/giphy.go
  8. 7
      src/github.com/matrix-org/go-neb/services/github/github.go
  9. 7
      src/github.com/matrix-org/go-neb/services/github/github_webhook.go
  10. 2
      src/github.com/matrix-org/go-neb/services/jira/jira.go
  11. 111
      src/github.com/matrix-org/go-neb/services/rss/rss.go
  12. 48
      src/github.com/matrix-org/go-neb/types/types.go

12
src/github.com/matrix-org/go-neb/api.go

@ -8,6 +8,7 @@ import (
"github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/errors" "github.com/matrix-org/go-neb/errors"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"net/http" "net/http"
"strings" "strings"
@ -306,6 +307,17 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac
return nil, &errors.HTTPError{err, "Error storing service", 500} return nil, &errors.HTTPError{err, "Error storing service", 500}
} }
// Start any polling NOW because they may decide to stop it in PostRegister, and we want to make
// sure we'll actually stop.
if service.Poller() != nil {
if err := polling.StartPolling(service); err != nil {
log.WithFields(log.Fields{
"service_id": service.ServiceID(),
log.ErrorKey: err,
}).Error("Failed to start poll loop.")
}
}
service.PostRegister(old) service.PostRegister(old)
return &struct { return &struct {

13
src/github.com/matrix-org/go-neb/database/db.go

@ -125,6 +125,19 @@ func (d *ServiceDB) LoadServicesForUser(serviceUserID string) (services []types.
return return
} }
// LoadServicesByType loads all the bot services configured for a given type.
// Returns an empty list if there aren't any services configured.
func (d *ServiceDB) LoadServicesByType(serviceType string) (services []types.Service, err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
services, err = selectServicesByTypeTxn(txn, serviceType)
if err != nil {
return err
}
return nil
})
return
}
// StoreService stores a service into the database either by inserting a new // StoreService stores a service into the database either by inserting a new
// service or updating an existing service. Returns the old service if there // service or updating an existing service. Returns the old service if there
// was one. // was one.

27
src/github.com/matrix-org/go-neb/database/schema.go

@ -231,6 +231,33 @@ func selectServicesForUserTxn(txn *sql.Tx, userID string) (srvs []types.Service,
return return
} }
const selectServicesByTypeSQL = `
SELECT service_id, service_user_id, service_json FROM services WHERE service_type=$1 ORDER BY service_id
`
func selectServicesByTypeTxn(txn *sql.Tx, serviceType string) (srvs []types.Service, err error) {
rows, err := txn.Query(selectServicesByTypeSQL, serviceType)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var s types.Service
var serviceID string
var serviceUserID string
var serviceJSON []byte
if err = rows.Scan(&serviceID, &serviceUserID, &serviceJSON); err != nil {
return
}
s, err = types.CreateService(serviceID, serviceType, serviceUserID, serviceJSON)
if err != nil {
return
}
srvs = append(srvs, s)
}
return
}
const deleteServiceSQL = ` const deleteServiceSQL = `
DELETE FROM services WHERE service_id = $1 DELETE FROM services WHERE service_id = $1
` `

5
src/github.com/matrix-org/go-neb/goneb.go

@ -5,6 +5,7 @@ import (
"github.com/matrix-org/dugong" "github.com/matrix-org/dugong"
"github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/clients"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/polling"
_ "github.com/matrix-org/go-neb/realms/github" _ "github.com/matrix-org/go-neb/realms/github"
_ "github.com/matrix-org/go-neb/realms/jira" _ "github.com/matrix-org/go-neb/realms/jira"
"github.com/matrix-org/go-neb/server" "github.com/matrix-org/go-neb/server"
@ -70,5 +71,9 @@ func main() {
rh := &realmRedirectHandler{db: db} rh := &realmRedirectHandler{db: db}
http.HandleFunc("/realms/redirects/", rh.handle) http.HandleFunc("/realms/redirects/", rh.handle)
if err := polling.Start(); err != nil {
log.Panic(err)
}
http.ListenAndServe(bindAddress, nil) http.ListenAndServe(bindAddress, nil)
} }

103
src/github.com/matrix-org/go-neb/polling/polling.go

@ -0,0 +1,103 @@
package polling
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/types"
"sync"
"time"
)
// Remember when we first started polling on this service ID. Polling routines will
// continually check this time. If the service gets updated, this will change, prompting
// older instances to die away. If this service gets removed, the time will be 0.
var (
pollMutex sync.Mutex
startPollTime = make(map[string]int64) // ServiceID => unix timestamp
)
// Start polling already existing services
func Start() error {
// Work out which service types require polling
for serviceType, poller := range types.PollersByType() {
if poller == nil {
continue
}
// Query for all services with said service type
srvs, err := database.GetServiceDB().LoadServicesByType(serviceType)
if err != nil {
return err
}
for _, s := range srvs {
if err := StartPolling(s); err != nil {
return err
}
}
}
return nil
}
// StartPolling begins a polling loop for this service.
// If one already exists for this service, it will be instructed to die. The new poll will not wait for this to happen,
// so there may be a brief period of overlap. It is safe to immediately call `StopPolling(service)` to immediately terminate
// this poll.
func StartPolling(service types.Service) error {
p := types.PollersByType()[service.ServiceType()]
if p == nil {
return fmt.Errorf("Service %s (type=%s) doesn't have a Poller", service.ServiceID(), service.ServiceType())
}
// Set the poll time BEFORE spinning off the goroutine in case the caller immediately stops us. If we don't do this here,
// we risk them setting the ts to 0 BEFORE we've set the start time, resulting in a poll when one was not intended.
ts := time.Now().UnixNano()
setPollStartTime(service, ts)
go pollLoop(service, p, ts)
return nil
}
// StopPolling stops all pollers for this service.
func StopPolling(service types.Service) {
log.WithFields(log.Fields{
"service_id": service.ServiceID(),
"service_type": service.ServiceType(),
}).Info("StopPolling")
setPollStartTime(service, 0)
}
// pollLoop begins the polling loop for this service. Does not return, so call this
// as a goroutine!
func pollLoop(service types.Service, poller types.Poller, ts int64) {
logger := log.WithFields(log.Fields{
"timestamp": ts,
"service_id": service.ServiceID(),
"service_type": service.ServiceType(),
"interval_secs": poller.IntervalSecs(),
})
logger.Info("Starting polling loop")
for {
poller.OnPoll(service)
if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.")
break
}
time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second)
if pollTimeChanged(service, ts) {
logger.Info("Terminating poll.")
break
}
}
}
// setPollStartTime clobbers the current poll time
func setPollStartTime(service types.Service, startTs int64) {
pollMutex.Lock()
defer pollMutex.Unlock()
startPollTime[service.ServiceID()] = startTs
}
// pollTimeChanged returns true if the poll start time for this service ID is different to the one supplied.
func pollTimeChanged(service types.Service, ts int64) bool {
pollMutex.Lock()
defer pollMutex.Unlock()
return startPollTime[service.ServiceID()] != ts
}

7
src/github.com/matrix-org/go-neb/services/echo/echo.go

@ -4,11 +4,11 @@ import (
"github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/plugin" "github.com/matrix-org/go-neb/plugin"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"net/http"
"strings" "strings"
) )
type echoService struct { type echoService struct {
types.DefaultService
id string id string
serviceUserID string serviceUserID string
} }
@ -16,8 +16,6 @@ type echoService struct {
func (e *echoService) ServiceUserID() string { return e.serviceUserID } func (e *echoService) ServiceUserID() string { return e.serviceUserID }
func (e *echoService) ServiceID() string { return e.id } func (e *echoService) ServiceID() string { return e.id }
func (e *echoService) ServiceType() string { return "echo" } func (e *echoService) ServiceType() string { return "echo" }
func (e *echoService) Register(oldService types.Service, client *matrix.Client) error { return nil }
func (e *echoService) PostRegister(oldService types.Service) {}
func (e *echoService) Plugin(cli *matrix.Client, roomID string) plugin.Plugin { func (e *echoService) Plugin(cli *matrix.Client, roomID string) plugin.Plugin {
return plugin.Plugin{ return plugin.Plugin{
Commands: []plugin.Command{ Commands: []plugin.Command{
@ -30,9 +28,6 @@ func (e *echoService) Plugin(cli *matrix.Client, roomID string) plugin.Plugin {
}, },
} }
} }
func (e *echoService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) {
w.WriteHeader(200) // Do nothing
}
func init() { func init() {
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {

5
src/github.com/matrix-org/go-neb/services/giphy/giphy.go

@ -31,6 +31,7 @@ type giphySearch struct {
} }
type giphyService struct { type giphyService struct {
types.DefaultService
id string id string
serviceUserID string serviceUserID string
APIKey string `json:"api_key"`// beta key is dc6zaTOxFJmzC APIKey string `json:"api_key"`// beta key is dc6zaTOxFJmzC
@ -39,10 +40,6 @@ type giphyService struct {
func (s *giphyService) ServiceUserID() string { return s.serviceUserID } func (s *giphyService) ServiceUserID() string { return s.serviceUserID }
func (s *giphyService) ServiceID() string { return s.id } func (s *giphyService) ServiceID() string { return s.id }
func (s *giphyService) ServiceType() string { return "giphy" } func (s *giphyService) ServiceType() string { return "giphy" }
func (s *giphyService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) {
}
func (s *giphyService) Register(oldService types.Service, client *matrix.Client) error { return nil }
func (s *giphyService) PostRegister(oldService types.Service) {}
func (s *giphyService) Plugin(client *matrix.Client, roomID string) plugin.Plugin { func (s *giphyService) Plugin(client *matrix.Client, roomID string) plugin.Plugin {
return plugin.Plugin{ return plugin.Plugin{

7
src/github.com/matrix-org/go-neb/services/github/github.go

@ -11,7 +11,6 @@ import (
"github.com/matrix-org/go-neb/realms/github" "github.com/matrix-org/go-neb/realms/github"
"github.com/matrix-org/go-neb/services/github/client" "github.com/matrix-org/go-neb/services/github/client"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"net/http"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@ -23,6 +22,7 @@ var ownerRepoIssueRegex = regexp.MustCompile(`(([A-z0-9-_]+)/([A-z0-9-_]+))?#([0
var ownerRepoRegex = regexp.MustCompile(`^([A-z0-9-_]+)/([A-z0-9-_]+)$`) var ownerRepoRegex = regexp.MustCompile(`^([A-z0-9-_]+)/([A-z0-9-_]+)$`)
type githubService struct { type githubService struct {
types.DefaultService
id string id string
serviceUserID string serviceUserID string
RealmID string RealmID string
@ -180,9 +180,6 @@ func (s *githubService) Plugin(cli *matrix.Client, roomID string) plugin.Plugin
}, },
} }
} }
func (s *githubService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) {
w.WriteHeader(400)
}
// Register will create webhooks for the repos specified in Rooms // Register will create webhooks for the repos specified in Rooms
// //
@ -212,8 +209,6 @@ func (s *githubService) Register(oldService types.Service, client *matrix.Client
return nil return nil
} }
func (s *githubService) PostRegister(oldService types.Service) {}
// defaultRepo returns the default repo for the given room, or an empty string. // defaultRepo returns the default repo for the given room, or an empty string.
func (s *githubService) defaultRepo(roomID string) string { func (s *githubService) defaultRepo(roomID string) string {
logger := log.WithFields(log.Fields{ logger := log.WithFields(log.Fields{

7
src/github.com/matrix-org/go-neb/services/github/github_webhook.go

@ -6,7 +6,6 @@ import (
"github.com/google/go-github/github" "github.com/google/go-github/github"
"github.com/matrix-org/go-neb/database" "github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/plugin"
"github.com/matrix-org/go-neb/services/github/client" "github.com/matrix-org/go-neb/services/github/client"
"github.com/matrix-org/go-neb/services/github/webhook" "github.com/matrix-org/go-neb/services/github/webhook"
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
@ -17,10 +16,11 @@ import (
) )
type githubWebhookService struct { type githubWebhookService struct {
types.DefaultService
id string id string
serviceUserID string serviceUserID string
webhookEndpointURL string webhookEndpointURL string
ClientUserID string // optional; required for webhooks
ClientUserID string
RealmID string RealmID string
SecretToken string SecretToken string
Rooms map[string]struct { // room_id => {} Rooms map[string]struct { // room_id => {}
@ -33,9 +33,6 @@ type githubWebhookService struct {
func (s *githubWebhookService) ServiceUserID() string { return s.serviceUserID } func (s *githubWebhookService) ServiceUserID() string { return s.serviceUserID }
func (s *githubWebhookService) ServiceID() string { return s.id } func (s *githubWebhookService) ServiceID() string { return s.id }
func (s *githubWebhookService) ServiceType() string { return "github-webhook" } func (s *githubWebhookService) ServiceType() string { return "github-webhook" }
func (s *githubWebhookService) Plugin(cli *matrix.Client, roomID string) plugin.Plugin {
return plugin.Plugin{}
}
func (s *githubWebhookService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { func (s *githubWebhookService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) {
evType, repo, msg, err := webhook.OnReceiveRequest(req, s.SecretToken) evType, repo, msg, err := webhook.OnReceiveRequest(req, s.SecretToken)
if err != nil { if err != nil {

2
src/github.com/matrix-org/go-neb/services/jira/jira.go

@ -24,6 +24,7 @@ var issueKeyRegex = regexp.MustCompile("([A-z]+)-([0-9]+)")
var projectKeyRegex = regexp.MustCompile("^[A-z]+$") var projectKeyRegex = regexp.MustCompile("^[A-z]+$")
type jiraService struct { type jiraService struct {
types.DefaultService
id string id string
serviceUserID string serviceUserID string
webhookEndpointURL string webhookEndpointURL string
@ -41,7 +42,6 @@ type jiraService struct {
func (s *jiraService) ServiceUserID() string { return s.serviceUserID } func (s *jiraService) ServiceUserID() string { return s.serviceUserID }
func (s *jiraService) ServiceID() string { return s.id } func (s *jiraService) ServiceID() string { return s.id }
func (s *jiraService) ServiceType() string { return "jira" } func (s *jiraService) ServiceType() string { return "jira" }
func (s *jiraService) PostRegister(oldService types.Service) {}
func (s *jiraService) Register(oldService types.Service, client *matrix.Client) error { func (s *jiraService) Register(oldService types.Service, client *matrix.Client) error {
// We only ever make 1 JIRA webhook which listens for all projects and then filter // We only ever make 1 JIRA webhook which listens for all projects and then filter
// on receive. So we simply need to know if we need to make a webhook or not. We // on receive. So we simply need to know if we need to make a webhook or not. We

111
src/github.com/matrix-org/go-neb/services/rss/rss.go

@ -0,0 +1,111 @@
package services
import (
"errors"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types"
"time"
)
type rssPoller struct{}
func (p *rssPoller) IntervalSecs() int64 { return 10 }
func (p *rssPoller) OnPoll(s types.Service) {
rsss, ok := s.(*rssService)
if !ok {
log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service")
return
}
now := time.Now().Unix() // Second resolution
// URL => [ RoomID ]
urlsToRooms := make(map[string][]string)
for roomID, roomInfo := range rsss.Rooms {
for u, feedInfo := range roomInfo.Feeds {
if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now {
// re-query this feed
urlsToRooms[u] = append(urlsToRooms[u], roomID)
}
}
}
// TODO: Some polling
}
type rssService struct {
types.DefaultService
id string
serviceUserID string
Rooms map[string]struct { // room_id => {}
Feeds map[string]struct { // URL => { }
PollIntervalMins int `json:"poll_interval_mins"`
LastPollTimestampSecs int64
}
}
}
func (s *rssService) ServiceUserID() string { return s.serviceUserID }
func (s *rssService) ServiceID() string { return s.id }
func (s *rssService) ServiceType() string { return "rss" }
func (s *rssService) Poller() types.Poller { return &rssPoller{} }
// Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
func (s *rssService) Register(oldService types.Service, client *matrix.Client) error {
feeds := feedUrls(s)
if len(feeds) == 0 {
// this is an error UNLESS the old service had some feeds in which case they are deleting us :(
oldFeeds := feedUrls(oldService)
if len(oldFeeds) == 0 {
return errors.New("An RSS feed must be specified.")
}
}
return nil
}
func (s *rssService) PostRegister(oldService types.Service) {
if len(feedUrls(s)) == 0 { // bye-bye :(
logger := log.WithFields(log.Fields{
"service_id": s.ServiceID(),
"service_type": s.ServiceType(),
})
logger.Info("Deleting service (0 feeds)")
polling.StopPolling(s)
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil {
logger.WithError(err).Error("Failed to delete service")
}
}
}
// feedUrls returns a list of feed urls for this service
func feedUrls(srv types.Service) []string {
var feeds []string
s, ok := srv.(*rssService)
if !ok {
return feeds
}
urlSet := make(map[string]bool)
for _, roomInfo := range s.Rooms {
for u := range roomInfo.Feeds {
urlSet[u] = true
}
}
for u := range urlSet {
feeds = append(feeds, u)
}
return feeds
}
func init() {
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
r := &rssService{
id: serviceID,
serviceUserID: serviceUserID,
}
return r
})
}

48
src/github.com/matrix-org/go-neb/types/types.go

@ -40,10 +40,19 @@ type BotOptions struct {
Options map[string]interface{} Options map[string]interface{}
} }
// Poller represents a thing that can be polled at a given rate.
type Poller interface {
IntervalSecs() int64
OnPoll(service Service)
}
// A Service is the configuration for a bot service. // A Service is the configuration for a bot service.
type Service interface { type Service interface {
// Return the user ID of this service.
ServiceUserID() string ServiceUserID() string
// Return an opaque ID used to identify this service.
ServiceID() string ServiceID() string
// Return the type of service. This string MUST NOT change.
ServiceType() string ServiceType() string
Plugin(cli *matrix.Client, roomID string) plugin.Plugin Plugin(cli *matrix.Client, roomID string) plugin.Plugin
OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client)
@ -57,6 +66,35 @@ type Service interface {
// concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean // concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean
// up resources which are no longer needed (e.g. removing old webhooks). // up resources which are no longer needed (e.g. removing old webhooks).
PostRegister(oldService Service) PostRegister(oldService Service)
// Return a Poller object if you wish to be invoked every N seconds. This struct MUST NOT conditionally change: either
// ALWAYS return a new Poller interface or NEVER return a Poller. The Poller will exist outside of the lifetime of the
// Service upon which it is being called on, so DO NOT wrap this inside a closure or else you will introduce a memory
// leak. An instantiated service will be passed into the `OnPoll(Service)` for you to extract state.
Poller() Poller
}
// DefaultService NO-OPs the implementation of optional Service interface methods. Feel free to override them.
type DefaultService struct {
Service
}
// Plugin returns no plugins.
func (s *DefaultService) Plugin(cli *matrix.Client, roomID string) plugin.Plugin {
return plugin.Plugin{}
}
// Register does nothing and returns no error.
func (s *DefaultService) Register(oldService Service, client *matrix.Client) error { return nil }
// PostRegister does nothing.
func (s *DefaultService) PostRegister(oldService Service) {}
// Poller returns no poller.
func (s *DefaultService) Poller() Poller { return nil }
// OnReceiveWebhook does nothing but 200 OK the request.
func (s *DefaultService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) {
w.WriteHeader(200) // Do nothing
} }
var baseURL = "" var baseURL = ""
@ -78,10 +116,18 @@ func BaseURL(u string) error {
} }
var servicesByType = map[string]func(string, string, string) Service{} var servicesByType = map[string]func(string, string, string) Service{}
var pollersByType = map[string]Poller{}
// RegisterService registers a factory for creating Service instances. // RegisterService registers a factory for creating Service instances.
func RegisterService(factory func(string, string, string) Service) { func RegisterService(factory func(string, string, string) Service) {
servicesByType[factory("", "", "").ServiceType()] = factory
s := factory("", "", "")
servicesByType[s.ServiceType()] = factory
pollersByType[s.ServiceType()] = s.Poller()
}
// PollersByType returns a map of service type to poller, which may be nil
func PollersByType() map[string]Poller {
return pollersByType
} }
// CreateService creates a Service of the given type and serviceID. // CreateService creates a Service of the given type and serviceID.

Loading…
Cancel
Save