Browse Source

Change how Services are notified for incoming !commands and expansions

Previously, we would notify `Services` based on matching the `room_id` of the
event with a list of `RoomIDs()` which the service returned.

Now we notify `Services` based on matching the `user_id` of the client listening
for events. This means that the service will receive more events because there
isn't a filter on a set of room IDs.

This is required in order to implement "auto-join on invite" semantics for
Services, as the room ID is not known at that point in time.
pull/36/head
Kegan Dougal 8 years ago
parent
commit
97a292c65f
  1. 21
      README.md
  2. 9
      src/github.com/matrix-org/go-neb/api.go
  3. 28
      src/github.com/matrix-org/go-neb/clients/clients.go
  4. 111
      src/github.com/matrix-org/go-neb/database/db.go
  5. 89
      src/github.com/matrix-org/go-neb/database/schema.go
  6. 3
      src/github.com/matrix-org/go-neb/goneb.go
  7. 12
      src/github.com/matrix-org/go-neb/services/echo/echo.go
  8. 19
      src/github.com/matrix-org/go-neb/services/github/github.go
  9. 19
      src/github.com/matrix-org/go-neb/services/jira/jira.go
  10. 11
      src/github.com/matrix-org/go-neb/types/types.go

21
README.md

@ -51,19 +51,16 @@ configured. To start an echo service:
curl -X POST localhost:4050/admin/configureService --data-binary '{
"Type": "echo",
"Id": "myserviceid",
"UserID": "@goneb:localhost:8448",
"Config": {
"UserID": "@goneb:localhost:8448",
"Rooms": ["!QkdpvTwGlrptdeViJx:localhost:8448"]
}
}'
{
"Type": "echo",
"Id": "myserviceid",
"UserID": "@goneb:localhost:8448",
"OldConfig": {},
"NewConfig": {
"UserID": "@goneb:localhost:8448",
"Rooms": ["!QkdpvTwGlrptdeViJx:localhost:8448"]
}
"NewConfig": {}
}
To retrieve an existing Service:
@ -74,10 +71,8 @@ To retrieve an existing Service:
{
"Type": "echo",
"Id": "myserviceid",
"Config": {
"UserID": "@goneb:localhost:8448",
"Rooms": ["!QkdpvTwGlrptdeViJx:localhost:8448"]
}
"UserID": "@goneb:localhost:8448",
"Config": {}
}
Go-neb has a heartbeat listener that returns 200 OK so that load balancers can
@ -179,10 +174,12 @@ Follow this link and grant access for NEB to act on your behalf.
curl -X POST localhost:4050/admin/configureService --data-binary '{
"Type": "github",
"Id": "mygithubserviceid",
"UserID": "@goneb:localhost",
"Config": {
"RealmID": "mygithubrealm",
"BotUserID": "@goneb:localhost",
"ClientUserID": "@example:localhost",
"HandleCommands": true,
"HandleExpansions": true,
"Rooms": {
"!EmwxeXCVubhskuWvaw:localhost": {
"Repos": {
@ -285,8 +282,8 @@ Follow this link and grant access for NEB to act on your behalf.
curl -X POST localhost:4050/admin/configureService --data-binary '{
"Type": "jira",
"Id": "jid",
"UserID": "@goneb:localhost",
"Config": {
"BotUserID": "@goneb:localhost",
"ClientUserID": "@example:localhost",
"Rooms": {
"!EmwxeXCVubhskuWvaw:localhost": {

9
src/github.com/matrix-org/go-neb/api.go

@ -214,17 +214,20 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac
var body struct {
ID string
Type string
UserID string
Config json.RawMessage
}
if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
return nil, &errors.HTTPError{err, "Error parsing request JSON", 400}
}
if body.ID == "" || body.Type == "" || body.Config == nil {
return nil, &errors.HTTPError{nil, `Must supply a "ID", a "Type" and a "Config"`, 400}
if body.ID == "" || body.Type == "" || body.UserID == "" || body.Config == nil {
return nil, &errors.HTTPError{
nil, `Must supply an "ID", a "Type", a "UserID" and a "Config"`, 400,
}
}
service, err := types.CreateService(body.ID, body.Type, body.Config)
service, err := types.CreateService(body.ID, body.Type, body.UserID, body.Config)
if err != nil {
return nil, &errors.HTTPError{err, "Error parsing config JSON", 400}
}

28
src/github.com/matrix-org/go-neb/clients/clients.go

@ -43,31 +43,6 @@ func (c *Clients) Update(config types.ClientConfig) (types.ClientConfig, error)
return old.config, err
}
// Start the clients in the database and join them to the rooms.
func (c *Clients) Start() error {
userIDsToRooms, err := c.db.LoadServiceUserIds()
if err != nil {
return err
}
for userID, roomIDs := range userIDsToRooms {
client, err := c.Client(userID)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"service_user_id": userID,
}).Warn("Error loading matrix client")
return err
}
for _, roomID := range roomIDs {
_, err := client.JoinRoom(roomID, "")
if err != nil {
return err
}
}
}
return nil
}
type clientEntry struct {
config types.ClientConfig
client *matrix.Client
@ -138,7 +113,6 @@ func (c *Clients) updateClientInDB(newConfig types.ClientConfig) (new clientEntr
}
func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
homeserverURL, err := url.Parse(config.HomeserverURL)
if err != nil {
return nil, err
@ -150,7 +124,7 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
// a request against the server.
client.Worker.OnEventType("m.room.message", func(event *matrix.Event) {
services, err := c.db.LoadServicesInRoom(client.UserID, event.RoomID)
services, err := c.db.LoadServicesForUser(client.UserID)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,

111
src/github.com/matrix-org/go-neb/database/db.go

@ -4,7 +4,6 @@ import (
"database/sql"
"github.com/matrix-org/go-neb/matrix"
"github.com/matrix-org/go-neb/types"
"sort"
"time"
)
@ -60,16 +59,6 @@ func (d *ServiceDB) StoreMatrixClientConfig(config types.ClientConfig) (oldConfi
return
}
// LoadServiceUserIds loads the user ids used by the bots in the database and
// the rooms those bots should be joined to.
func (d *ServiceDB) LoadServiceUserIds() (userIDsToRooms map[string][]string, err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
userIDsToRooms, err = selectServiceUserIDsTxn(txn)
return err
})
return
}
// LoadMatrixClientConfig loads a Matrix client config from the database.
// Returns sql.ErrNoRows if the client isn't in the database.
func (d *ServiceDB) LoadMatrixClientConfig(userID string) (config types.ClientConfig, err error) {
@ -90,21 +79,14 @@ func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err er
return
}
// LoadServicesInRoom loads all the bot services configured for a room.
// Returns the empty list if there aren't any services configured.
func (d *ServiceDB) LoadServicesInRoom(serviceUserID, roomID string) (services []types.Service, err error) {
// LoadServicesForUser loads all the bot services configured for a given user.
// Returns an empty list if there aren't any services configured.
func (d *ServiceDB) LoadServicesForUser(serviceUserID string) (services []types.Service, err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
serviceIDs, err := selectRoomServicesTxn(txn, serviceUserID, roomID)
services, err = selectServicesForUserTxn(txn, serviceUserID)
if err != nil {
return err
}
for _, serviceID := range serviceIDs {
service, err := selectServiceTxn(txn, serviceID)
if err != nil {
return err
}
services = append(services, service)
}
return nil
})
return
@ -116,60 +98,13 @@ func (d *ServiceDB) LoadServicesInRoom(serviceUserID, roomID string) (services [
func (d *ServiceDB) StoreService(service types.Service, client *matrix.Client) (oldService types.Service, err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
oldService, err = selectServiceTxn(txn, service.ServiceID())
if err != nil && err != sql.ErrNoRows {
if err == sql.ErrNoRows {
return insertServiceTxn(txn, time.Now(), service)
} else if err != nil {
return err
}
now := time.Now()
var newRoomIDs []string
var oldRoomIDs []string
if oldService == nil {
if err := insertServiceTxn(txn, now, service); err != nil {
return err
}
newRoomIDs = service.RoomIDs()
} else {
if err := updateServiceTxn(txn, now, service); err != nil {
return err
}
if service.ServiceUserID() == oldService.ServiceUserID() {
oldRoomIDs, newRoomIDs = difference(
oldService.RoomIDs(), service.RoomIDs(),
)
} else {
oldRoomIDs = oldService.RoomIDs()
newRoomIDs = service.RoomIDs()
}
return updateServiceTxn(txn, time.Now(), service)
}
for _, roomID := range oldRoomIDs {
if err := deleteRoomServiceTxn(
txn, oldService.ServiceUserID(), roomID, service.ServiceID(),
); err != nil {
return err
}
// TODO: Leave the old rooms.
}
for _, roomID := range newRoomIDs {
if err := insertRoomServiceTxn(
txn, now, service.ServiceUserID(), roomID, service.ServiceID(),
); err != nil {
return err
}
// TODO: Making HTTP requests inside the database transaction is unfortunate.
// But it is the easiest way of making sure that the changes we
// made to the database get rolled back if the requests fail.
if _, err := client.JoinRoom(roomID, ""); err != nil {
// TODO: What happens to the rooms that we successfully joined?
// Should we leave them now?
return err
}
}
return nil
})
return
}
@ -269,33 +204,3 @@ func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
err = fn(txn)
return
}
// difference returns the elements that are only in the first list and
// the elements that are only in the second. As a side-effect this sorts
// the input lists in-place.
func difference(a, b []string) (onlyA, onlyB []string) {
sort.Strings(a)
sort.Strings(b)
for {
if len(b) == 0 {
onlyA = append(onlyA, a...)
return
}
if len(a) == 0 {
onlyB = append(onlyB, b...)
return
}
xA := a[0]
xB := b[0]
if xA < xB {
onlyA = append(onlyA, xA)
a = a[1:]
} else if xA > xB {
onlyB = append(onlyB, xB)
b = b[1:]
} else {
a = a[1:]
b = b[1:]
}
}
}

89
src/github.com/matrix-org/go-neb/database/schema.go

@ -12,20 +12,13 @@ const schemaSQL = `
CREATE TABLE IF NOT EXISTS services (
service_id TEXT NOT NULL,
service_type TEXT NOT NULL,
service_user_id TEXT NOT NULL,
service_json TEXT NOT NULL,
time_added_ms BIGINT NOT NULL,
time_updated_ms BIGINT NOT NULL,
UNIQUE(service_id)
);
CREATE TABLE IF NOT EXISTS rooms_to_services (
service_user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
service_id TEXT NOT NULL,
time_added_ms BIGINT NOT NULL,
UNIQUE(service_user_id, room_id, service_id)
);
CREATE TABLE IF NOT EXISTS matrix_clients (
user_id TEXT NOT NULL,
client_json TEXT NOT NULL,
@ -56,28 +49,6 @@ CREATE TABLE IF NOT EXISTS auth_sessions (
);
`
const selectServiceUserIDsSQL = `
SELECT service_user_id, room_id FROM rooms_to_services
GROUP BY service_user_id, room_id
`
// selectServiceUserIDsTxn returns a map from userIDs to lists of roomIDs.
func selectServiceUserIDsTxn(txn *sql.Tx) (map[string][]string, error) {
rows, err := txn.Query(selectServiceUserIDsSQL)
if err != nil {
return nil, err
}
result := make(map[string][]string)
for rows.Next() {
var uID, rID string
if err = rows.Scan(&uID, &rID); err != nil {
return nil, err
}
result[uID] = append(result[uID], rID)
}
return result, nil
}
const selectMatrixClientConfigSQL = `
SELECT client_json FROM matrix_clients WHERE user_id = $1
`
@ -124,23 +95,24 @@ func updateMatrixClientConfigTxn(txn *sql.Tx, now time.Time, config types.Client
}
const selectServiceSQL = `
SELECT service_type, service_json FROM services
SELECT service_type, service_user_id, service_json FROM services
WHERE service_id = $1
`
func selectServiceTxn(txn *sql.Tx, serviceID string) (types.Service, error) {
var serviceType string
var serviceUserID string
var serviceJSON []byte
row := txn.QueryRow(selectServiceSQL, serviceID)
if err := row.Scan(&serviceType, &serviceJSON); err != nil {
if err := row.Scan(&serviceType, &serviceUserID, &serviceJSON); err != nil {
return nil, err
}
return types.CreateService(serviceID, serviceType, serviceJSON)
return types.CreateService(serviceID, serviceType, serviceUserID, serviceJSON)
}
const updateServiceSQL = `
UPDATE services SET service_type=$1, service_json=$2, time_updated_ms=$3
WHERE service_id=$4
UPDATE services SET service_type=$1, service_user_id=$2, service_json=$3, time_updated_ms=$4
WHERE service_id=$5
`
func updateServiceTxn(txn *sql.Tx, now time.Time, service types.Service) error {
@ -150,7 +122,7 @@ func updateServiceTxn(txn *sql.Tx, now time.Time, service types.Service) error {
}
t := now.UnixNano() / 1000000
_, err = txn.Exec(
updateServiceSQL, service.ServiceType(), serviceJSON, t,
updateServiceSQL, service.ServiceType(), service.ServiceUserID(), serviceJSON, t,
service.ServiceID(),
)
return err
@ -158,8 +130,8 @@ func updateServiceTxn(txn *sql.Tx, now time.Time, service types.Service) error {
const insertServiceSQL = `
INSERT INTO services(
service_id, service_type, service_json, time_added_ms, time_updated_ms
) VALUES ($1, $2, $3, $4, $5)
service_id, service_type, service_user_id, service_json, time_added_ms, time_updated_ms
) VALUES ($1, $2, $3, $4, $5, $6)
`
func insertServiceTxn(txn *sql.Tx, now time.Time, service types.Service) error {
@ -170,47 +142,34 @@ func insertServiceTxn(txn *sql.Tx, now time.Time, service types.Service) error {
t := now.UnixNano() / 1000000
_, err = txn.Exec(
insertServiceSQL,
service.ServiceID(), service.ServiceType(), serviceJSON, t, t,
service.ServiceID(), service.ServiceType(), service.ServiceUserID(), serviceJSON, t, t,
)
return err
}
const insertRoomServiceSQL = `
INSERT INTO rooms_to_services(service_user_id, room_id, service_id, time_added_ms)
VALUES ($1, $2, $3, $4)
`
func insertRoomServiceTxn(txn *sql.Tx, now time.Time, serviceUserID, roomID, serviceID string) error {
t := now.UnixNano() / 1000000
_, err := txn.Exec(insertRoomServiceSQL, serviceUserID, roomID, serviceID, t)
return err
}
const deleteRoomServiceSQL = `
DELETE FROM rooms_to_services WHERE service_user_id=$1 AND room_id = $2 AND service_id=$3
const selectServicesForUserSQL = `
SELECT service_id, service_type, service_json FROM services WHERE service_user_id=$1 ORDER BY service_id
`
func deleteRoomServiceTxn(txn *sql.Tx, serviceUserID, roomID, serviceID string) error {
_, err := txn.Exec(deleteRoomServiceSQL, serviceUserID, roomID, serviceID)
return err
}
const selectRoomServicesSQL = `
SELECT service_id FROM rooms_to_services WHERE service_user_id=$1 AND room_id=$2
`
func selectRoomServicesTxn(txn *sql.Tx, serviceUserID, roomID string) (serviceIDs []string, err error) {
rows, err := txn.Query(selectRoomServicesSQL, serviceUserID, roomID)
func selectServicesForUserTxn(txn *sql.Tx, userID string) (srvs []types.Service, err error) {
rows, err := txn.Query(selectServicesForUserSQL, userID)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var s types.Service
var serviceID string
if err = rows.Scan(&serviceID); err != nil {
var serviceType string
var serviceJSON []byte
if err = rows.Scan(&serviceID, &serviceType, &serviceJSON); err != nil {
return
}
s, err = types.CreateService(serviceID, serviceType, userID, serviceJSON)
if err != nil {
return
}
serviceIDs = append(serviceIDs, serviceID)
srvs = append(srvs, s)
}
return
}

3
src/github.com/matrix-org/go-neb/goneb.go

@ -51,9 +51,6 @@ func main() {
database.SetServiceDB(db)
clients := clients.New(db)
if err := clients.Start(); err != nil {
log.Panic(err)
}
http.Handle("/test", server.MakeJSONAPI(&heartbeatHandler{}))
http.Handle("/admin/getService", server.MakeJSONAPI(&getServiceHandler{db: db}))

12
src/github.com/matrix-org/go-neb/services/echo/echo.go

@ -9,15 +9,13 @@ import (
)
type echoService struct {
id string
UserID string
Rooms []string
id string
serviceUserID string
}
func (e *echoService) ServiceUserID() string { return e.UserID }
func (e *echoService) ServiceUserID() string { return e.serviceUserID }
func (e *echoService) ServiceID() string { return e.id }
func (e *echoService) ServiceType() string { return "echo" }
func (e *echoService) RoomIDs() []string { return e.Rooms }
func (e *echoService) Register() error { return nil }
func (e *echoService) PostRegister(old types.Service) {}
func (e *echoService) Plugin(roomID string) plugin.Plugin {
@ -37,7 +35,7 @@ func (e *echoService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request,
}
func init() {
types.RegisterService(func(serviceID, webhookEndpointURL string) types.Service {
return &echoService{id: serviceID}
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
return &echoService{id: serviceID, serviceUserID: serviceUserID}
})
}

19
src/github.com/matrix-org/go-neb/services/github/github.go

@ -24,8 +24,8 @@ var ownerRepoIssueRegex = regexp.MustCompile("([A-z0-9-_]+)/([A-z0-9-_]+)#([0-9]
type githubService struct {
id string
serviceUserID string
webhookEndpointURL string
BotUserID string
ClientUserID string
RealmID string
SecretToken string
@ -38,17 +38,9 @@ type githubService struct {
}
}
func (s *githubService) ServiceUserID() string { return s.BotUserID }
func (s *githubService) ServiceUserID() string { return s.serviceUserID }
func (s *githubService) ServiceID() string { return s.id }
func (s *githubService) ServiceType() string { return "github" }
func (s *githubService) RoomIDs() []string {
var keys []string
for k := range s.Rooms {
keys = append(keys, k)
}
return keys
}
func (s *githubService) cmdGithubCreate(roomID, userID string, args []string) (interface{}, error) {
if !s.HandleCommands {
return nil, nil
@ -201,8 +193,8 @@ func (s *githubService) OnReceiveWebhook(w http.ResponseWriter, req *http.Reques
w.WriteHeader(200)
}
func (s *githubService) Register() error {
if s.RealmID == "" || s.ClientUserID == "" || s.BotUserID == "" {
return fmt.Errorf("RealmID, BotUserID and ClientUserID are required")
if s.RealmID == "" || s.ClientUserID == "" {
return fmt.Errorf("RealmID and ClientUserID are required")
}
// check realm exists
realm, err := database.GetServiceDB().LoadAuthRealm(s.RealmID)
@ -420,9 +412,10 @@ func removeHook(logger *log.Entry, cli *github.Client, owner, repo, webhookEndpo
}
func init() {
types.RegisterService(func(serviceID, webhookEndpointURL string) types.Service {
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
return &githubService{
id: serviceID,
serviceUserID: serviceUserID,
webhookEndpointURL: webhookEndpointURL,
}
})

19
src/github.com/matrix-org/go-neb/services/jira/jira.go

@ -25,8 +25,8 @@ var projectKeyRegex = regexp.MustCompile("^[A-z]+$")
type jiraService struct {
id string
serviceUserID string
webhookEndpointURL string
BotUserID string
ClientUserID string
Rooms map[string]struct { // room_id => {}
Realms map[string]struct { // realm_id => {} Determines the JIRA endpoint
@ -38,16 +38,9 @@ type jiraService struct {
}
}
func (s *jiraService) ServiceUserID() string { return s.BotUserID }
func (s *jiraService) ServiceUserID() string { return s.serviceUserID }
func (s *jiraService) ServiceID() string { return s.id }
func (s *jiraService) ServiceType() string { return "jira" }
func (s *jiraService) RoomIDs() []string {
var keys []string
for k := range s.Rooms {
keys = append(keys, k)
}
return keys
}
func (s *jiraService) Register() error {
// We only ever make 1 JIRA webhook which listens for all projects and then filter
// on receive. So we simply need to know if we need to make a webhook or not. We
@ -408,7 +401,11 @@ func htmlForEvent(whe *webhook.Event, jiraBaseURL string) string {
}
func init() {
types.RegisterService(func(serviceID, webhookEndpointURL string) types.Service {
return &jiraService{id: serviceID, webhookEndpointURL: webhookEndpointURL}
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
return &jiraService{
id: serviceID,
serviceUserID: serviceUserID,
webhookEndpointURL: webhookEndpointURL,
}
})
}

11
src/github.com/matrix-org/go-neb/types/types.go

@ -34,7 +34,6 @@ type Service interface {
ServiceUserID() string
ServiceID() string
ServiceType() string
RoomIDs() []string
Plugin(roomID string) plugin.Plugin
OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client)
Register() error
@ -59,16 +58,16 @@ func BaseURL(u string) error {
return nil
}
var servicesByType = map[string]func(string, string) Service{}
var servicesByType = map[string]func(string, string, string) Service{}
// RegisterService registers a factory for creating Service instances.
func RegisterService(factory func(string, string) Service) {
servicesByType[factory("", "").ServiceType()] = factory
func RegisterService(factory func(string, string, string) Service) {
servicesByType[factory("", "", "").ServiceType()] = factory
}
// CreateService creates a Service of the given type and serviceID.
// Returns an error if the Service couldn't be created.
func CreateService(serviceID, serviceType string, serviceJSON []byte) (Service, error) {
func CreateService(serviceID, serviceType, serviceUserID string, serviceJSON []byte) (Service, error) {
f := servicesByType[serviceType]
if f == nil {
return nil, errors.New("Unknown service type: " + serviceType)
@ -76,7 +75,7 @@ func CreateService(serviceID, serviceType string, serviceJSON []byte) (Service,
base64ServiceID := base64.RawURLEncoding.EncodeToString([]byte(serviceID))
webhookEndpointURL := baseURL + "services/hooks/" + base64ServiceID
service := f(serviceID, webhookEndpointURL)
service := f(serviceID, serviceUserID, webhookEndpointURL)
if err := json.Unmarshal(serviceJSON, service); err != nil {
return nil, err
}

Loading…
Cancel
Save