Browse Source

Merge pull request #44 from matrix-org/kegan/github-mutex-on-register

Mutex-guard services with the same ID being modified concurrently
pull/47/head
Kegsay 8 years ago
committed by GitHub
parent
commit
324231cf00
  1. 32
      src/github.com/matrix-org/go-neb/api.go
  2. 2
      src/github.com/matrix-org/go-neb/goneb.go
  3. 17
      src/github.com/matrix-org/go-neb/services/github/github.go

32
src/github.com/matrix-org/go-neb/api.go

@ -11,6 +11,7 @@ import (
"github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/types"
"net/http" "net/http"
"strings" "strings"
"sync"
) )
type heartbeatHandler struct{} type heartbeatHandler struct{}
@ -204,6 +205,30 @@ func (s *configureClientHandler) OnIncomingRequest(req *http.Request) (interface
type configureServiceHandler struct { type configureServiceHandler struct {
db *database.ServiceDB db *database.ServiceDB
clients *clients.Clients clients *clients.Clients
mapMutex sync.Mutex
mutexByServiceID map[string]*sync.Mutex
}
func newConfigureServiceHandler(db *database.ServiceDB, clients *clients.Clients) *configureServiceHandler {
return &configureServiceHandler{
db: db,
clients: clients,
mutexByServiceID: make(map[string]*sync.Mutex),
}
}
func (s *configureServiceHandler) getMutexForServiceID(serviceID string) *sync.Mutex {
s.mapMutex.Lock()
defer s.mapMutex.Unlock()
m := s.mutexByServiceID[serviceID]
if m == nil {
// XXX TODO: There's a memory leak here. The amount of mutexes created is unbounded, as there will be 1 per service which are never deleted.
// A better solution would be to have a striped hash map with a bounded pool of mutexes. We can't live with a single global mutex because the Register()
// function this is protecting does many many HTTP requests which can take a long time on bad networks and will head of line block other services.
m = &sync.Mutex{}
s.mutexByServiceID[serviceID] = m
}
return m
} }
func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interface{}, *errors.HTTPError) { func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interface{}, *errors.HTTPError) {
@ -216,7 +241,10 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac
return nil, httpErr return nil, httpErr
} }
// TODO mutex lock keyed off service ID
// Have mutexes around each service to queue up multiple requests for the same service ID
mut := s.getMutexForServiceID(service.ServiceID())
mut.Lock()
defer mut.Unlock()
old, err := s.db.LoadService(service.ServiceID()) old, err := s.db.LoadService(service.ServiceID())
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
@ -237,8 +265,6 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac
return nil, &errors.HTTPError{err, "Error storing service", 500} return nil, &errors.HTTPError{err, "Error storing service", 500}
} }
// TODO mutex unlock keyed off service ID
return &struct { return &struct {
ID string ID string
Type string Type string

2
src/github.com/matrix-org/go-neb/goneb.go

@ -59,7 +59,7 @@ func main() {
http.Handle("/admin/getService", server.MakeJSONAPI(&getServiceHandler{db: db})) http.Handle("/admin/getService", server.MakeJSONAPI(&getServiceHandler{db: db}))
http.Handle("/admin/getSession", server.MakeJSONAPI(&getSessionHandler{db: db})) http.Handle("/admin/getSession", server.MakeJSONAPI(&getSessionHandler{db: db}))
http.Handle("/admin/configureClient", server.MakeJSONAPI(&configureClientHandler{db: db, clients: clients})) http.Handle("/admin/configureClient", server.MakeJSONAPI(&configureClientHandler{db: db, clients: clients}))
http.Handle("/admin/configureService", server.MakeJSONAPI(&configureServiceHandler{db: db, clients: clients}))
http.Handle("/admin/configureService", server.MakeJSONAPI(newConfigureServiceHandler(db, clients)))
http.Handle("/admin/configureAuthRealm", server.MakeJSONAPI(&configureAuthRealmHandler{db: db})) http.Handle("/admin/configureAuthRealm", server.MakeJSONAPI(&configureAuthRealmHandler{db: db}))
http.Handle("/admin/requestAuthSession", server.MakeJSONAPI(&requestAuthSessionHandler{db: db})) http.Handle("/admin/requestAuthSession", server.MakeJSONAPI(&requestAuthSessionHandler{db: db}))
wh := &webhookHandler{db: db, clients: clients} wh := &webhookHandler{db: db, clients: clients}

17
src/github.com/matrix-org/go-neb/services/github/github.go

@ -302,11 +302,26 @@ func (s *githubService) createHook(cli *github.Client, ownerRepo string) error {
cfg["secret"] = s.SecretToken cfg["secret"] = s.SecretToken
} }
events := []string{"push", "pull_request", "issues", "issue_comment", "pull_request_review_comment"} events := []string{"push", "pull_request", "issues", "issue_comment", "pull_request_review_comment"}
_, _, err := cli.Repositories.CreateHook(owner, repo, &github.Hook{
_, res, err := cli.Repositories.CreateHook(owner, repo, &github.Hook{
Name: &name, Name: &name,
Config: cfg, Config: cfg,
Events: events, Events: events,
}) })
if res.StatusCode == 422 {
errResponse, ok := err.(*github.ErrorResponse)
if !ok {
return err
}
for _, ghErr := range errResponse.Errors {
if strings.Contains(ghErr.Message, "already exists") {
log.WithField("repo", ownerRepo).Print("422 : Hook already exists")
return nil
}
}
return err
}
return err return err
} }

Loading…
Cancel
Save