Browse Source

Merge pull request #60 from matrix-org/kegan/persist-next-batch

Persist next_batch tokens
kegan/ignore-events-on-join
Kegsay 8 years ago
committed by GitHub
parent
commit
2bb04b3b6d
  1. 76
      src/github.com/matrix-org/go-neb/clients/clients.go
  2. 17
      src/github.com/matrix-org/go-neb/database/db.go
  3. 22
      src/github.com/matrix-org/go-neb/database/schema.go
  4. 40
      src/github.com/matrix-org/go-neb/matrix/matrix.go

76
src/github.com/matrix-org/go-neb/clients/clients.go

@ -11,6 +11,31 @@ import (
"sync" "sync"
) )
type nextBatchStore struct {
db *database.ServiceDB
}
func (s nextBatchStore) Save(userID, nextBatch string) {
if err := s.db.UpdateNextBatch(userID, nextBatch); err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"user_id": userID,
"next_batch": nextBatch,
}).Error("Failed to persist next_batch token")
}
}
func (s nextBatchStore) Load(userID string) string {
token, err := s.db.LoadNextBatch(userID)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"user_id": userID,
}).Error("Failed to load next_batch token")
return ""
}
return token
}
// A Clients is a collection of clients used for bot services. // A Clients is a collection of clients used for bot services.
type Clients struct { type Clients struct {
db *database.ServiceDB db *database.ServiceDB
@ -141,18 +166,7 @@ func (c *Clients) updateClientInDB(newConfig types.ClientConfig) (new clientEntr
return return
} }
func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
homeserverURL, err := url.Parse(config.HomeserverURL)
if err != nil {
return nil, err
}
client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID)
// TODO: Check that the access token is valid for the userID by peforming
// a request against the server.
client.Worker.OnEventType("m.room.message", func(event *matrix.Event) {
func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) {
services, err := c.db.LoadServicesForUser(client.UserID) services, err := c.db.LoadServicesForUser(client.UserID)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -166,9 +180,9 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
plugins = append(plugins, service.Plugin(client, event.RoomID)) plugins = append(plugins, service.Plugin(client, event.RoomID))
} }
plugin.OnMessage(plugins, client, event) plugin.OnMessage(plugins, client, event)
})
}
client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) {
func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) {
// see if these options are for us. The state key is the user ID with a leading _ // see if these options are for us. The state key is the user ID with a leading _
// to get around restrictions in the HS about having user IDs as state keys. // to get around restrictions in the HS about having user IDs as state keys.
targetUserID := strings.TrimPrefix(event.StateKey, "_") targetUserID := strings.TrimPrefix(event.StateKey, "_")
@ -190,11 +204,10 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
"set_by_user_id": event.Sender, "set_by_user_id": event.Sender,
}).Error("Failed to persist bot options") }).Error("Failed to persist bot options")
} }
})
}
if config.AutoJoinRooms {
client.Worker.OnEventType("m.room.member", func(event *matrix.Event) {
if event.StateKey != config.UserID {
func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) {
if event.StateKey != client.UserID {
return // not our member event return // not our member event
} }
m := event.Content["membership"] m := event.Content["membership"]
@ -205,7 +218,7 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
if membership == "invite" { if membership == "invite" {
logger := log.WithFields(log.Fields{ logger := log.WithFields(log.Fields{
"room_id": event.RoomID, "room_id": event.RoomID,
"service_user_id": config.UserID,
"service_user_id": client.UserID,
"inviter": event.Sender, "inviter": event.Sender,
}) })
logger.Print("Accepting invite from user") logger.Print("Accepting invite from user")
@ -216,6 +229,31 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
logger.Print("Joined room") logger.Print("Joined room")
} }
} }
}
func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
homeserverURL, err := url.Parse(config.HomeserverURL)
if err != nil {
return nil, err
}
client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID)
client.NextBatchStorer = nextBatchStore{c.db}
// TODO: Check that the access token is valid for the userID by peforming
// a request against the server.
client.Worker.OnEventType("m.room.message", func(event *matrix.Event) {
c.onMessageEvent(client, event)
})
client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) {
c.onBotOptionsEvent(client, event)
})
if config.AutoJoinRooms {
client.Worker.OnEventType("m.room.member", func(event *matrix.Event) {
c.onRoomMemberEvent(client, event)
}) })
} }

17
src/github.com/matrix-org/go-neb/database/db.go

@ -77,6 +77,23 @@ func (d *ServiceDB) LoadMatrixClientConfig(userID string) (config types.ClientCo
return return
} }
// UpdateNextBatch updates the next_batch token for the given user.
func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
return updateNextBatchTxn(txn, userID, nextBatch)
})
return
}
// LoadNextBatch loads the next_batch token for the given user.
func (d *ServiceDB) LoadNextBatch(userID string) (nextBatch string, err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
nextBatch, err = selectNextBatchTxn(txn, userID)
return err
})
return
}
// LoadService loads a service from the database. // LoadService loads a service from the database.
// Returns sql.ErrNoRows if the service isn't in the database. // Returns sql.ErrNoRows if the service isn't in the database.
func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) {

22
src/github.com/matrix-org/go-neb/database/schema.go

@ -129,6 +129,28 @@ func updateMatrixClientConfigTxn(txn *sql.Tx, now time.Time, config types.Client
return err return err
} }
const updateNextBatchSQL = `
UPDATE matrix_clients SET next_batch = $1 WHERE user_id = $2
`
func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error {
_, err := txn.Exec(updateNextBatchSQL, nextBatch, userID)
return err
}
const selectNextBatchSQL = `
SELECT next_batch FROM matrix_clients WHERE user_id = $1
`
func selectNextBatchTxn(txn *sql.Tx, userID string) (string, error) {
var nextBatch string
row := txn.QueryRow(selectNextBatchSQL, userID)
if err := row.Scan(&nextBatch); err != nil {
return "", err
}
return nextBatch, nil
}
const selectServiceSQL = ` const selectServiceSQL = `
SELECT service_type, service_user_id, service_json FROM services SELECT service_type, service_user_id, service_json FROM services
WHERE service_id = $1 WHERE service_id = $1

40
src/github.com/matrix-org/go-neb/matrix/matrix.go

@ -28,9 +28,23 @@ import (
) )
var ( var (
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":0}}}`)
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`)
) )
// NextBatchStorer controls loading/saving of next_batch tokens for users
type NextBatchStorer interface {
// Save a next_batch token for a given user. Best effort.
Save(userID, nextBatch string)
// Load a next_batch token for a given user. Return an empty string if no token exists.
Load(userID string) string
}
// noopNextBatchStore does not load or save next_batch tokens.
type noopNextBatchStore struct{}
func (s noopNextBatchStore) Save(userID, nextBatch string) {}
func (s noopNextBatchStore) Load(userID string) string { return "" }
// Client represents a Matrix client. // Client represents a Matrix client.
type Client struct { type Client struct {
HomeserverURL *url.URL HomeserverURL *url.URL
@ -43,6 +57,7 @@ type Client struct {
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
httpClient *http.Client httpClient *http.Client
filterID string filterID string
NextBatchStorer NextBatchStorer
} }
func (cli *Client) buildURL(urlPath ...string) string { func (cli *Client) buildURL(urlPath ...string) string {
@ -191,7 +206,7 @@ func (cli *Client) Sync() {
"user_id": cli.UserID, "user_id": cli.UserID,
}) })
// TODO: Store the filter ID and sync token in the database
// TODO: Store the filter ID in the database
filterID, err := cli.createFilter() filterID, err := cli.createFilter()
if err != nil { if err != nil {
logger.WithError(err).Fatal("Failed to create filter") logger.WithError(err).Fatal("Failed to create filter")
@ -199,9 +214,9 @@ func (cli *Client) Sync() {
} }
cli.filterID = filterID cli.filterID = filterID
logger.WithField("filter", filterID).Print("Got filter ID") logger.WithField("filter", filterID).Print("Got filter ID")
nextToken := ""
nextToken := cli.NextBatchStorer.Load(cli.UserID)
logger.Print("Starting sync")
logger.WithField("next_batch", nextToken).Print("Starting sync")
channel := make(chan syncHTTPResponse, 5) channel := make(chan syncHTTPResponse, 5)
@ -232,18 +247,26 @@ func (cli *Client) Sync() {
// Check that the syncing state hasn't changed // Check that the syncing state hasn't changed
// Either because we've stopped syncing or another sync has been started. // Either because we've stopped syncing or another sync has been started.
// We discard the response from our sync. // We discard the response from our sync.
// TODO: Store the next_batch token so that the next sync can resume
// from where this sync left off.
if cli.getSyncingID() != syncingID { if cli.getSyncingID() != syncingID {
logger.Print("Stopping sync") logger.Print("Stopping sync")
return return
} }
isFirstSync := nextToken == ""
// Update client state // Update client state
nextToken = syncResponse.NextBatch nextToken = syncResponse.NextBatch
logger.WithField("next_batch", nextToken).Print("Received sync response") logger.WithField("next_batch", nextToken).Print("Received sync response")
// Save the token now *before* passing it through to the worker. This means it's possible
// to not process some events, but it means that we won't get constantly stuck processing
// a malformed/buggy event which keeps making us panic.
cli.NextBatchStorer.Save(cli.UserID, nextToken)
if !isFirstSync {
channel <- syncResponse channel <- syncResponse
} }
}
} }
func (cli *Client) incrementSyncingID() uint32 { func (cli *Client) incrementSyncingID() uint32 {
@ -344,6 +367,7 @@ func (cli *Client) doSync(timeout int, since string) ([]byte, error) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"since": since, "since": since,
"timeout": timeout, "timeout": timeout,
"user_id": cli.UserID,
}).Print("Syncing") }).Print("Syncing")
res, err := http.Get(urlPath) res, err := http.Get(urlPath)
if err != nil { if err != nil {
@ -366,6 +390,10 @@ func NewClient(homeserverURL *url.URL, accessToken string, userID string) *Clien
Prefix: "/_matrix/client/r0", Prefix: "/_matrix/client/r0",
} }
cli.Worker = newWorker(&cli) cli.Worker = newWorker(&cli)
// By default, use a no-op next_batch storer which will never save tokens and always
// "load" the empty string as a token. The client will work with this storer: it just won't
// remember the token across restarts. In practice, a database backend should be used.
cli.NextBatchStorer = noopNextBatchStore{}
cli.Rooms = make(map[string]*Room) cli.Rooms = make(map[string]*Room)
cli.httpClient = &http.Client{} cli.httpClient = &http.Client{}

Loading…
Cancel
Save