Browse Source

Persist next_batch tokens

Indirect this through a `NextBatchStore` type to get around circular dependencies
which are formed if you try to inline it in the `Sync()` while loop.

Fork out event handlers to separate functions in `Clients` to make gocyclo happy.

Add `UPDATE` query for `next_batch`.
pull/60/head
Kegan Dougal 8 years ago
parent
commit
5f9a065d83
  1. 133
      src/github.com/matrix-org/go-neb/clients/clients.go
  2. 8
      src/github.com/matrix-org/go-neb/database/db.go
  3. 9
      src/github.com/matrix-org/go-neb/database/schema.go
  4. 43
      src/github.com/matrix-org/go-neb/matrix/matrix.go

133
src/github.com/matrix-org/go-neb/clients/clients.go

@ -141,6 +141,71 @@ func (c *Clients) updateClientInDB(newConfig types.ClientConfig) (new clientEntr
return return
} }
func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) {
services, err := c.db.LoadServicesForUser(client.UserID)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"room_id": event.RoomID,
"service_user_id": client.UserID,
}).Warn("Error loading services")
}
var plugins []plugin.Plugin
for _, service := range services {
plugins = append(plugins, service.Plugin(client, event.RoomID))
}
plugin.OnMessage(plugins, client, event)
}
func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) {
// see if these options are for us. The state key is the user ID with a leading _
// to get around restrictions in the HS about having user IDs as state keys.
targetUserID := strings.TrimPrefix(event.StateKey, "_")
if targetUserID != client.UserID {
return
}
// these options fully clobber what was there previously.
opts := types.BotOptions{
UserID: client.UserID,
RoomID: event.RoomID,
SetByUserID: event.Sender,
Options: event.Content,
}
if _, err := c.db.StoreBotOptions(opts); err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"room_id": event.RoomID,
"bot_user_id": client.UserID,
"set_by_user_id": event.Sender,
}).Error("Failed to persist bot options")
}
}
func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) {
if event.StateKey != client.UserID {
return // not our member event
}
m := event.Content["membership"]
membership, ok := m.(string)
if !ok {
return
}
if membership == "invite" {
logger := log.WithFields(log.Fields{
"room_id": event.RoomID,
"service_user_id": client.UserID,
"inviter": event.Sender,
})
logger.Print("Accepting invite from user")
if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil {
logger.WithError(err).Print("Failed to join room")
} else {
logger.Print("Joined room")
}
}
}
func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
homeserverURL, err := url.Parse(config.HomeserverURL) homeserverURL, err := url.Parse(config.HomeserverURL)
if err != nil { if err != nil {
@ -149,73 +214,29 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID)
client.OnSaveNextBatch(func(nextBatch string) {
if err := c.db.UpdateNextBatch(client.UserID, nextBatch); err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"next_batch": nextBatch,
}).Error("Failed to persist next_batch token")
}
})
// TODO: Check that the access token is valid for the userID by peforming // TODO: Check that the access token is valid for the userID by peforming
// a request against the server. // a request against the server.
client.Worker.OnEventType("m.room.message", func(event *matrix.Event) { client.Worker.OnEventType("m.room.message", func(event *matrix.Event) {
services, err := c.db.LoadServicesForUser(client.UserID)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"room_id": event.RoomID,
"service_user_id": client.UserID,
}).Warn("Error loading services")
}
var plugins []plugin.Plugin
for _, service := range services {
plugins = append(plugins, service.Plugin(client, event.RoomID))
}
plugin.OnMessage(plugins, client, event)
c.onMessageEvent(client, event)
}) })
client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) { client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) {
// see if these options are for us. The state key is the user ID with a leading _
// to get around restrictions in the HS about having user IDs as state keys.
targetUserID := strings.TrimPrefix(event.StateKey, "_")
if targetUserID != client.UserID {
return
}
// these options fully clobber what was there previously.
opts := types.BotOptions{
UserID: client.UserID,
RoomID: event.RoomID,
SetByUserID: event.Sender,
Options: event.Content,
}
if _, err := c.db.StoreBotOptions(opts); err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"room_id": event.RoomID,
"bot_user_id": client.UserID,
"set_by_user_id": event.Sender,
}).Error("Failed to persist bot options")
}
c.onBotOptionsEvent(client, event)
}) })
if config.AutoJoinRooms { if config.AutoJoinRooms {
client.Worker.OnEventType("m.room.member", func(event *matrix.Event) { client.Worker.OnEventType("m.room.member", func(event *matrix.Event) {
if event.StateKey != config.UserID {
return // not our member event
}
m := event.Content["membership"]
membership, ok := m.(string)
if !ok {
return
}
if membership == "invite" {
logger := log.WithFields(log.Fields{
"room_id": event.RoomID,
"service_user_id": config.UserID,
"inviter": event.Sender,
})
logger.Print("Accepting invite from user")
if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil {
logger.WithError(err).Print("Failed to join room")
} else {
logger.Print("Joined room")
}
}
c.onRoomMemberEvent(client, event)
}) })
} }

8
src/github.com/matrix-org/go-neb/database/db.go

@ -77,6 +77,14 @@ func (d *ServiceDB) LoadMatrixClientConfig(userID string) (config types.ClientCo
return return
} }
// UpdateNextBatch updates the next_batch token for the given user.
func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
return updateNextBatchTxn(txn, userID, nextBatch)
})
return
}
// LoadService loads a service from the database. // LoadService loads a service from the database.
// Returns sql.ErrNoRows if the service isn't in the database. // Returns sql.ErrNoRows if the service isn't in the database.
func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) {

9
src/github.com/matrix-org/go-neb/database/schema.go

@ -129,6 +129,15 @@ func updateMatrixClientConfigTxn(txn *sql.Tx, now time.Time, config types.Client
return err return err
} }
const updateNextBatchSQL = `
UPDATE matrix_clients SET next_batch = $1 WHERE user_id = $2
`
func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error {
_, err := txn.Exec(updateNextBatchSQL, nextBatch, userID)
return err
}
const selectServiceSQL = ` const selectServiceSQL = `
SELECT service_type, service_user_id, service_json FROM services SELECT service_type, service_user_id, service_json FROM services
WHERE service_id = $1 WHERE service_id = $1

43
src/github.com/matrix-org/go-neb/matrix/matrix.go

@ -28,21 +28,22 @@ import (
) )
var ( var (
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":0}}}`)
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`)
) )
// Client represents a Matrix client. // Client represents a Matrix client.
type Client struct { type Client struct {
HomeserverURL *url.URL
Prefix string
UserID string
AccessToken string
Rooms map[string]*Room
Worker *Worker
syncingMutex sync.Mutex
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
httpClient *http.Client
filterID string
HomeserverURL *url.URL
Prefix string
UserID string
AccessToken string
Rooms map[string]*Room
Worker *Worker
syncingMutex sync.Mutex
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
httpClient *http.Client
filterID string
nextBatchSaver func(string)
} }
func (cli *Client) buildURL(urlPath ...string) string { func (cli *Client) buildURL(urlPath ...string) string {
@ -239,13 +240,30 @@ func (cli *Client) Sync() {
return return
} }
isFirstSync := nextToken == ""
// Update client state // Update client state
nextToken = syncResponse.NextBatch nextToken = syncResponse.NextBatch
logger.WithField("next_batch", nextToken).Print("Received sync response") logger.WithField("next_batch", nextToken).Print("Received sync response")
channel <- syncResponse
// Save the token now *before* passing it through to the worker. This means it's possible
// to not process some events, but it means that we won't get constantly stuck processing
// a malformed/buggy event which keeps making us panic.
if cli.nextBatchSaver != nil {
cli.nextBatchSaver(nextToken)
}
if !isFirstSync {
channel <- syncResponse
}
} }
} }
// OnSaveNextBatch is a function which can save the next_batch token passed to it
func (cli *Client) OnSaveNextBatch(fn func(string)) {
cli.nextBatchSaver = fn
}
func (cli *Client) incrementSyncingID() uint32 { func (cli *Client) incrementSyncingID() uint32 {
cli.syncingMutex.Lock() cli.syncingMutex.Lock()
defer cli.syncingMutex.Unlock() defer cli.syncingMutex.Unlock()
@ -344,6 +362,7 @@ func (cli *Client) doSync(timeout int, since string) ([]byte, error) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"since": since, "since": since,
"timeout": timeout, "timeout": timeout,
"user_id": cli.UserID,
}).Print("Syncing") }).Print("Syncing")
res, err := http.Get(urlPath) res, err := http.Get(urlPath)
if err != nil { if err != nil {

Loading…
Cancel
Save