diff --git a/src/github.com/matrix-org/go-neb/goneb.go b/src/github.com/matrix-org/go-neb/goneb.go index e0811ad..3d339b9 100644 --- a/src/github.com/matrix-org/go-neb/goneb.go +++ b/src/github.com/matrix-org/go-neb/goneb.go @@ -5,6 +5,7 @@ import ( "github.com/matrix-org/dugong" "github.com/matrix-org/go-neb/clients" "github.com/matrix-org/go-neb/database" + "github.com/matrix-org/go-neb/polling" _ "github.com/matrix-org/go-neb/realms/github" _ "github.com/matrix-org/go-neb/realms/jira" "github.com/matrix-org/go-neb/server" @@ -12,6 +13,7 @@ import ( _ "github.com/matrix-org/go-neb/services/giphy" _ "github.com/matrix-org/go-neb/services/github" _ "github.com/matrix-org/go-neb/services/jira" + _ "github.com/matrix-org/go-neb/services/rss" "github.com/matrix-org/go-neb/types" _ "github.com/mattn/go-sqlite3" "net/http" @@ -69,5 +71,9 @@ func main() { rh := &realmRedirectHandler{db: db} http.HandleFunc("/realms/redirects/", rh.handle) + if err := polling.Start(); err != nil { + log.Panic(err) + } + http.ListenAndServe(bindAddress, nil) } diff --git a/src/github.com/matrix-org/go-neb/polling/polling.go b/src/github.com/matrix-org/go-neb/polling/polling.go new file mode 100644 index 0000000..6d69a22 --- /dev/null +++ b/src/github.com/matrix-org/go-neb/polling/polling.go @@ -0,0 +1,44 @@ +package polling + +import ( + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/go-neb/database" + "github.com/matrix-org/go-neb/types" + "time" +) + +var shouldPoll = make(map[string]bool) // Service ID => yay/nay + +// Start polling already existing services +func Start() error { + log.Print("Start polling") + // Work out which service types require polling + for serviceType, poller := range types.PollersByType() { + if poller == nil { + continue + } + // Query for all services with said service type + srvs, err := database.GetServiceDB().LoadServicesByType(serviceType) + if err != nil { + return err + } + for _, s := range srvs { + shouldPoll[s.ServiceID()] = true + go StartPolling(s, poller) + } + } + return nil +} + +// StartPolling begins the polling loop for this service. Does not return, so call this +// as a goroutine! +func StartPolling(service types.Service, poller types.Poller) { + for { + if !shouldPoll[service.ServiceID()] { + log.WithField("service_id", service.ServiceID()).Info("Terminating poll.") + break + } + poller.OnPoll(service) + time.Sleep(time.Duration(poller.IntervalSecs()) * time.Second) + } +} diff --git a/src/github.com/matrix-org/go-neb/services/rss/rss.go b/src/github.com/matrix-org/go-neb/services/rss/rss.go index 39c05ce..47c5fa4 100644 --- a/src/github.com/matrix-org/go-neb/services/rss/rss.go +++ b/src/github.com/matrix-org/go-neb/services/rss/rss.go @@ -1,28 +1,79 @@ package services import ( + "errors" + log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/matrix" + "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" + "time" ) +type rssPoller struct{} + +func (p *rssPoller) IntervalSecs() int64 { return 10 } +func (p *rssPoller) OnPoll(s types.Service) { + rsss, ok := s.(*rssService) + if !ok { + log.WithField("service_id", s.ServiceID()).Error("RSS: OnPoll called without an RSS Service") + return + } + now := time.Now().Unix() // Second resolution + // URL => [ RoomID ] + urlsToRooms := make(map[string][]string) + + for roomID, roomInfo := range rsss.Rooms { + for u, feedInfo := range roomInfo.Feeds { + if feedInfo.LastPollTimestampSecs == 0 || (feedInfo.LastPollTimestampSecs+(int64(feedInfo.PollIntervalMins)*60)) > now { + // re-query this feed + urlsToRooms[u] = append(urlsToRooms[u], roomID) + } + } + } + + log.Print(rsss.ServiceID()+" Polly poll poll ", rsss.Rooms) +} + type rssService struct { types.DefaultService id string serviceUserID string - ClientUserID string `json:"client_user_id"` Rooms map[string]struct { // room_id => {} Feeds map[string]struct { // URL => { } - PollIntervalMs int `json:"poll_interval_ms"` - } `json:"feeds"` - } `json:"rooms"` + PollIntervalMins int `json:"poll_interval_mins"` + LastPollTimestampSecs int64 + } + } } func (s *rssService) ServiceUserID() string { return s.serviceUserID } func (s *rssService) ServiceID() string { return s.id } func (s *rssService) ServiceType() string { return "rss" } +func (s *rssService) Poller() types.Poller { return &rssPoller{} } // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. -func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { return nil } +func (s *rssService) Register(oldService types.Service, client *matrix.Client) error { + urlSet := make(map[string]bool) + for _, roomInfo := range s.Rooms { + for u, feedInfo := range roomInfo.Feeds { + if feedInfo.PollIntervalMins == 0 { + feedInfo.PollIntervalMins = 1 + log.Print("Set poll interval to 1 ", u) + } + urlSet[u] = true + } + } + if len(urlSet) == 0 { + log.Print(s.Rooms) + return errors.New("An RSS feed must be specified.") + } + return nil +} + +// PostRegister will start polling +func (s *rssService) PostRegister(oldService types.Service) { + go polling.StartPolling(s, s.Poller()) +} func init() { types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { diff --git a/src/github.com/matrix-org/go-neb/types/types.go b/src/github.com/matrix-org/go-neb/types/types.go index 573cb79..a2cfea3 100644 --- a/src/github.com/matrix-org/go-neb/types/types.go +++ b/src/github.com/matrix-org/go-neb/types/types.go @@ -42,14 +42,17 @@ type BotOptions struct { // Poller represents a thing that can be polled at a given rate. type Poller interface { - IntervalSecs() int - OnPoll() + IntervalSecs() int64 + OnPoll(service Service) } // A Service is the configuration for a bot service. type Service interface { + // Return the user ID of this service. ServiceUserID() string + // Return an opaque ID used to identify this service. ServiceID() string + // Return the type of service. This string MUST NOT change. ServiceType() string Plugin(cli *matrix.Client, roomID string) plugin.Plugin OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) @@ -63,7 +66,10 @@ type Service interface { // concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean // up resources which are no longer needed (e.g. removing old webhooks). PostRegister(oldService Service) - // Return a Poller object if you wish to be invoked every N seconds. + // Return a Poller object if you wish to be invoked every N seconds. This struct MUST NOT conditionally change: either + // ALWAYS return a new Poller interface or NEVER return a Poller. The Poller will exist outside of the lifetime of the + // Service upon which it is being called on, so DO NOT wrap this inside a closure or else you will introduce a memory + // leak. An instantiated service will be passed into the `OnPoll(Service)` for you to extract state. Poller() Poller } @@ -110,10 +116,18 @@ func BaseURL(u string) error { } var servicesByType = map[string]func(string, string, string) Service{} +var pollersByType = map[string]Poller{} // RegisterService registers a factory for creating Service instances. func RegisterService(factory func(string, string, string) Service) { - servicesByType[factory("", "", "").ServiceType()] = factory + s := factory("", "", "") + servicesByType[s.ServiceType()] = factory + pollersByType[s.ServiceType()] = s.Poller() +} + +// PollersByType returns a map of service type to poller, which may be nil +func PollersByType() map[string]Poller { + return pollersByType } // CreateService creates a Service of the given type and serviceID.