|
@ -11,6 +11,7 @@ import ( |
|
|
"github.com/matrix-org/go-neb/types" |
|
|
"github.com/matrix-org/go-neb/types" |
|
|
"net/http" |
|
|
"net/http" |
|
|
"strings" |
|
|
"strings" |
|
|
|
|
|
"sync" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type heartbeatHandler struct{} |
|
|
type heartbeatHandler struct{} |
|
@ -202,8 +203,29 @@ func (s *configureClientHandler) OnIncomingRequest(req *http.Request) (interface |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type configureServiceHandler struct { |
|
|
type configureServiceHandler struct { |
|
|
db *database.ServiceDB |
|
|
|
|
|
clients *clients.Clients |
|
|
|
|
|
|
|
|
db *database.ServiceDB |
|
|
|
|
|
clients *clients.Clients |
|
|
|
|
|
mapMutex sync.Mutex |
|
|
|
|
|
mutexByServiceID map[string]*sync.Mutex |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func newConfigureServiceHandler(db *database.ServiceDB, clients *clients.Clients) *configureServiceHandler { |
|
|
|
|
|
return &configureServiceHandler{ |
|
|
|
|
|
db: db, |
|
|
|
|
|
clients: clients, |
|
|
|
|
|
mutexByServiceID: make(map[string]*sync.Mutex), |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *configureServiceHandler) getMutexForServiceID(serviceID string) *sync.Mutex { |
|
|
|
|
|
s.mapMutex.Lock() |
|
|
|
|
|
defer s.mapMutex.Unlock() |
|
|
|
|
|
m := s.mutexByServiceID[serviceID] |
|
|
|
|
|
if m == nil { |
|
|
|
|
|
m = &sync.Mutex{} |
|
|
|
|
|
s.mutexByServiceID[serviceID] = m |
|
|
|
|
|
} |
|
|
|
|
|
return m |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interface{}, *errors.HTTPError) { |
|
|
func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interface{}, *errors.HTTPError) { |
|
@ -216,7 +238,10 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac |
|
|
return nil, httpErr |
|
|
return nil, httpErr |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// TODO mutex lock keyed off service ID
|
|
|
|
|
|
|
|
|
// Have mutexes around each service to queue up multiple requests for the same service ID
|
|
|
|
|
|
mut := s.getMutexForServiceID(service.ServiceID()) |
|
|
|
|
|
mut.Lock() |
|
|
|
|
|
defer mut.Unlock() |
|
|
|
|
|
|
|
|
old, err := s.db.LoadService(service.ServiceID()) |
|
|
old, err := s.db.LoadService(service.ServiceID()) |
|
|
if err != nil && err != sql.ErrNoRows { |
|
|
if err != nil && err != sql.ErrNoRows { |
|
@ -237,8 +262,6 @@ func (s *configureServiceHandler) OnIncomingRequest(req *http.Request) (interfac |
|
|
return nil, &errors.HTTPError{err, "Error storing service", 500} |
|
|
return nil, &errors.HTTPError{err, "Error storing service", 500} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// TODO mutex unlock keyed off service ID
|
|
|
|
|
|
|
|
|
|
|
|
return &struct { |
|
|
return &struct { |
|
|
ID string |
|
|
ID string |
|
|
Type string |
|
|
Type string |
|
|